]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/column_family_test.cc
bdc832bd235aae03c66054e894031f4ccdf0ae88
[ceph.git] / ceph / src / rocksdb / db / column_family_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include <algorithm>
11 #include <vector>
12 #include <string>
13 #include <thread>
14
15 #include "db/db_impl.h"
16 #include "db/db_test_util.h"
17 #include "memtable/hash_skiplist_rep.h"
18 #include "options/options_parser.h"
19 #include "port/port.h"
20 #include "rocksdb/db.h"
21 #include "rocksdb/env.h"
22 #include "rocksdb/iterator.h"
23 #include "util/coding.h"
24 #include "util/fault_injection_test_env.h"
25 #include "util/string_util.h"
26 #include "util/sync_point.h"
27 #include "util/testharness.h"
28 #include "util/testutil.h"
29 #include "utilities/merge_operators.h"
30
31 namespace rocksdb {
32
// Length, in bytes, of every value produced by ColumnFamilyTestBase::Value().
static constexpr int kValueSize = 1000;
34
35 namespace {
36 std::string RandomString(Random* rnd, int len) {
37 std::string r;
38 test::RandomString(rnd, len, &r);
39 return r;
40 }
41 } // anonymous namespace
42
43 // counts how many operations were performed
44 class EnvCounter : public EnvWrapper {
45 public:
46 explicit EnvCounter(Env* base)
47 : EnvWrapper(base), num_new_writable_file_(0) {}
48 int GetNumberOfNewWritableFileCalls() {
49 return num_new_writable_file_;
50 }
51 Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
52 const EnvOptions& soptions) override {
53 ++num_new_writable_file_;
54 return EnvWrapper::NewWritableFile(f, r, soptions);
55 }
56
57 private:
58 std::atomic<int> num_new_writable_file_;
59 };
60
61 class ColumnFamilyTestBase : public testing::Test {
62 public:
63 ColumnFamilyTestBase(uint32_t format) : rnd_(139), format_(format) {
64 env_ = new EnvCounter(Env::Default());
65 dbname_ = test::PerThreadDBPath("column_family_test");
66 db_options_.create_if_missing = true;
67 db_options_.fail_if_options_file_error = true;
68 db_options_.env = env_;
69 DestroyDB(dbname_, Options(db_options_, column_family_options_));
70 }
71
72 ~ColumnFamilyTestBase() override {
73 std::vector<ColumnFamilyDescriptor> column_families;
74 for (auto h : handles_) {
75 ColumnFamilyDescriptor cfdescriptor;
76 h->GetDescriptor(&cfdescriptor);
77 column_families.push_back(cfdescriptor);
78 }
79 Close();
80 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
81 Destroy(column_families);
82 delete env_;
83 }
84
85 BlockBasedTableOptions GetBlockBasedTableOptions() {
86 BlockBasedTableOptions options;
87 options.format_version = format_;
88 return options;
89 }
90
91 // Return the value to associate with the specified key
92 Slice Value(int k, std::string* storage) {
93 if (k == 0) {
94 // Ugh. Random seed of 0 used to produce no entropy. This code
95 // preserves the implementation that was in place when all of the
96 // magic values in this file were picked.
97 *storage = std::string(kValueSize, ' ');
98 return Slice(*storage);
99 } else {
100 Random r(k);
101 return test::RandomString(&r, kValueSize, storage);
102 }
103 }
104
105 void Build(int base, int n, int flush_every = 0) {
106 std::string key_space, value_space;
107 WriteBatch batch;
108
109 for (int i = 0; i < n; i++) {
110 if (flush_every != 0 && i != 0 && i % flush_every == 0) {
111 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
112 dbi->TEST_FlushMemTable();
113 }
114
115 int keyi = base + i;
116 Slice key(DBTestBase::Key(keyi));
117
118 batch.Clear();
119 batch.Put(handles_[0], key, Value(keyi, &value_space));
120 batch.Put(handles_[1], key, Value(keyi, &value_space));
121 batch.Put(handles_[2], key, Value(keyi, &value_space));
122 ASSERT_OK(db_->Write(WriteOptions(), &batch));
123 }
124 }
125
126 void CheckMissed() {
127 uint64_t next_expected = 0;
128 uint64_t missed = 0;
129 int bad_keys = 0;
130 int bad_values = 0;
131 int correct = 0;
132 std::string value_space;
133 for (int cf = 0; cf < 3; cf++) {
134 next_expected = 0;
135 Iterator* iter = db_->NewIterator(ReadOptions(false, true), handles_[cf]);
136 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
137 uint64_t key;
138 Slice in(iter->key());
139 in.remove_prefix(3);
140 if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
141 key < next_expected) {
142 bad_keys++;
143 continue;
144 }
145 missed += (key - next_expected);
146 next_expected = key + 1;
147 if (iter->value() != Value(static_cast<int>(key), &value_space)) {
148 bad_values++;
149 } else {
150 correct++;
151 }
152 }
153 delete iter;
154 }
155
156 ASSERT_EQ(0, bad_keys);
157 ASSERT_EQ(0, bad_values);
158 ASSERT_EQ(0, missed);
159 (void)correct;
160 }
161
162 void Close() {
163 for (auto h : handles_) {
164 if (h) {
165 db_->DestroyColumnFamilyHandle(h);
166 }
167 }
168 handles_.clear();
169 names_.clear();
170 delete db_;
171 db_ = nullptr;
172 }
173
174 Status TryOpen(std::vector<std::string> cf,
175 std::vector<ColumnFamilyOptions> options = {}) {
176 std::vector<ColumnFamilyDescriptor> column_families;
177 names_.clear();
178 for (size_t i = 0; i < cf.size(); ++i) {
179 column_families.push_back(ColumnFamilyDescriptor(
180 cf[i], options.size() == 0 ? column_family_options_ : options[i]));
181 names_.push_back(cf[i]);
182 }
183 return DB::Open(db_options_, dbname_, column_families, &handles_, &db_);
184 }
185
186 Status OpenReadOnly(std::vector<std::string> cf,
187 std::vector<ColumnFamilyOptions> options = {}) {
188 std::vector<ColumnFamilyDescriptor> column_families;
189 names_.clear();
190 for (size_t i = 0; i < cf.size(); ++i) {
191 column_families.push_back(ColumnFamilyDescriptor(
192 cf[i], options.size() == 0 ? column_family_options_ : options[i]));
193 names_.push_back(cf[i]);
194 }
195 return DB::OpenForReadOnly(db_options_, dbname_, column_families, &handles_,
196 &db_);
197 }
198
199 #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
200 void AssertOpenReadOnly(std::vector<std::string> cf,
201 std::vector<ColumnFamilyOptions> options = {}) {
202 ASSERT_OK(OpenReadOnly(cf, options));
203 }
204 #endif // !ROCKSDB_LITE
205
206
207 void Open(std::vector<std::string> cf,
208 std::vector<ColumnFamilyOptions> options = {}) {
209 ASSERT_OK(TryOpen(cf, options));
210 }
211
212 void Open() {
213 Open({"default"});
214 }
215
216 DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }
217
218 int GetProperty(int cf, std::string property) {
219 std::string value;
220 EXPECT_TRUE(dbfull()->GetProperty(handles_[cf], property, &value));
221 #ifndef CYGWIN
222 return std::stoi(value);
223 #else
224 return std::strtol(value.c_str(), 0 /* off */, 10 /* base */);
225 #endif
226 }
227
228 bool IsDbWriteStopped() {
229 #ifndef ROCKSDB_LITE
230 uint64_t v;
231 EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.is-write-stopped", &v));
232 return (v == 1);
233 #else
234 return dbfull()->TEST_write_controler().IsStopped();
235 #endif // !ROCKSDB_LITE
236 }
237
238 uint64_t GetDbDelayedWriteRate() {
239 #ifndef ROCKSDB_LITE
240 uint64_t v;
241 EXPECT_TRUE(
242 dbfull()->GetIntProperty("rocksdb.actual-delayed-write-rate", &v));
243 return v;
244 #else
245 if (!dbfull()->TEST_write_controler().NeedsDelay()) {
246 return 0;
247 }
248 return dbfull()->TEST_write_controler().delayed_write_rate();
249 #endif // !ROCKSDB_LITE
250 }
251
252 void Destroy(const std::vector<ColumnFamilyDescriptor>& column_families =
253 std::vector<ColumnFamilyDescriptor>()) {
254 Close();
255 ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_),
256 column_families));
257 }
258
259 void CreateColumnFamilies(
260 const std::vector<std::string>& cfs,
261 const std::vector<ColumnFamilyOptions> options = {}) {
262 int cfi = static_cast<int>(handles_.size());
263 handles_.resize(cfi + cfs.size());
264 names_.resize(cfi + cfs.size());
265 for (size_t i = 0; i < cfs.size(); ++i) {
266 const auto& current_cf_opt =
267 options.size() == 0 ? column_family_options_ : options[i];
268 ASSERT_OK(
269 db_->CreateColumnFamily(current_cf_opt, cfs[i], &handles_[cfi]));
270 names_[cfi] = cfs[i];
271
272 #ifndef ROCKSDB_LITE // RocksDBLite does not support GetDescriptor
273 // Verify the CF options of the returned CF handle.
274 ColumnFamilyDescriptor desc;
275 ASSERT_OK(handles_[cfi]->GetDescriptor(&desc));
276 RocksDBOptionsParser::VerifyCFOptions(desc.options, current_cf_opt);
277 #endif // !ROCKSDB_LITE
278 cfi++;
279 }
280 }
281
282 void Reopen(const std::vector<ColumnFamilyOptions> options = {}) {
283 std::vector<std::string> names;
284 for (auto name : names_) {
285 if (name != "") {
286 names.push_back(name);
287 }
288 }
289 Close();
290 assert(options.size() == 0 || names.size() == options.size());
291 Open(names, options);
292 }
293
294 void CreateColumnFamiliesAndReopen(const std::vector<std::string>& cfs) {
295 CreateColumnFamilies(cfs);
296 Reopen();
297 }
298
299 void DropColumnFamilies(const std::vector<int>& cfs) {
300 for (auto cf : cfs) {
301 ASSERT_OK(db_->DropColumnFamily(handles_[cf]));
302 db_->DestroyColumnFamilyHandle(handles_[cf]);
303 handles_[cf] = nullptr;
304 names_[cf] = "";
305 }
306 }
307
308 void PutRandomData(int cf, int num, int key_value_size, bool save = false) {
309 if (cf >= static_cast<int>(keys_.size())) {
310 keys_.resize(cf + 1);
311 }
312 for (int i = 0; i < num; ++i) {
313 // 10 bytes for key, rest is value
314 if (!save) {
315 ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11),
316 RandomString(&rnd_, key_value_size - 10)));
317 } else {
318 std::string key = test::RandomKey(&rnd_, 11);
319 keys_[cf].insert(key);
320 ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10)));
321 }
322 }
323 db_->FlushWAL(false);
324 }
325
326 #ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite
327 void WaitForFlush(int cf) {
328 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
329 }
330
331 void WaitForCompaction() {
332 ASSERT_OK(dbfull()->TEST_WaitForCompact());
333 }
334
335 uint64_t MaxTotalInMemoryState() {
336 return dbfull()->TEST_MaxTotalInMemoryState();
337 }
338
339 void AssertMaxTotalInMemoryState(uint64_t value) {
340 ASSERT_EQ(value, MaxTotalInMemoryState());
341 }
342 #endif // !ROCKSDB_LITE
343
344 Status Put(int cf, const std::string& key, const std::string& value) {
345 return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
346 }
347 Status Merge(int cf, const std::string& key, const std::string& value) {
348 return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value));
349 }
350 Status Flush(int cf) {
351 return db_->Flush(FlushOptions(), handles_[cf]);
352 }
353
354 std::string Get(int cf, const std::string& key) {
355 ReadOptions options;
356 options.verify_checksums = true;
357 std::string result;
358 Status s = db_->Get(options, handles_[cf], Slice(key), &result);
359 if (s.IsNotFound()) {
360 result = "NOT_FOUND";
361 } else if (!s.ok()) {
362 result = s.ToString();
363 }
364 return result;
365 }
366
367 void CompactAll(int cf) {
368 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
369 nullptr));
370 }
371
372 void Compact(int cf, const Slice& start, const Slice& limit) {
373 ASSERT_OK(
374 db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
375 }
376
377 int NumTableFilesAtLevel(int level, int cf) {
378 return GetProperty(cf,
379 "rocksdb.num-files-at-level" + ToString(level));
380 }
381
382 #ifndef ROCKSDB_LITE
383 // Return spread of files per level
384 std::string FilesPerLevel(int cf) {
385 std::string result;
386 int last_non_zero_offset = 0;
387 for (int level = 0; level < dbfull()->NumberLevels(handles_[cf]); level++) {
388 int f = NumTableFilesAtLevel(level, cf);
389 char buf[100];
390 snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
391 result += buf;
392 if (f > 0) {
393 last_non_zero_offset = static_cast<int>(result.size());
394 }
395 }
396 result.resize(last_non_zero_offset);
397 return result;
398 }
399 #endif
400
401 void AssertFilesPerLevel(const std::string& value, int cf) {
402 #ifndef ROCKSDB_LITE
403 ASSERT_EQ(value, FilesPerLevel(cf));
404 #else
405 (void) value;
406 (void) cf;
407 #endif
408 }
409
410 #ifndef ROCKSDB_LITE // GetLiveFilesMetaData is not supported
411 int CountLiveFiles() {
412 std::vector<LiveFileMetaData> metadata;
413 db_->GetLiveFilesMetaData(&metadata);
414 return static_cast<int>(metadata.size());
415 }
416 #endif // !ROCKSDB_LITE
417
418 void AssertCountLiveFiles(int expected_value) {
419 #ifndef ROCKSDB_LITE
420 ASSERT_EQ(expected_value, CountLiveFiles());
421 #else
422 (void) expected_value;
423 #endif
424 }
425
426 // Do n memtable flushes, each of which produces an sstable
427 // covering the range [small,large].
428 void MakeTables(int cf, int n, const std::string& small,
429 const std::string& large) {
430 for (int i = 0; i < n; i++) {
431 ASSERT_OK(Put(cf, small, "begin"));
432 ASSERT_OK(Put(cf, large, "end"));
433 ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
434 }
435 }
436
437 #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
438 int CountLiveLogFiles() {
439 int micros_wait_for_log_deletion = 20000;
440 env_->SleepForMicroseconds(micros_wait_for_log_deletion);
441 int ret = 0;
442 VectorLogPtr wal_files;
443 Status s;
444 // GetSortedWalFiles is a flakey function -- it gets all the wal_dir
445 // children files and then later checks for their existence. if some of the
446 // log files doesn't exist anymore, it reports an error. it does all of this
447 // without DB mutex held, so if a background process deletes the log file
448 // while the function is being executed, it returns an error. We retry the
449 // function 10 times to avoid the error failing the test
450 for (int retries = 0; retries < 10; ++retries) {
451 wal_files.clear();
452 s = db_->GetSortedWalFiles(wal_files);
453 if (s.ok()) {
454 break;
455 }
456 }
457 EXPECT_OK(s);
458 for (const auto& wal : wal_files) {
459 if (wal->Type() == kAliveLogFile) {
460 ++ret;
461 }
462 }
463 return ret;
464 return 0;
465 }
466 #endif // !ROCKSDB_LITE
467
468 void AssertCountLiveLogFiles(int value) {
469 #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
470 ASSERT_EQ(value, CountLiveLogFiles());
471 #else
472 (void) value;
473 #endif // !ROCKSDB_LITE
474 }
475
476 void AssertNumberOfImmutableMemtables(std::vector<int> num_per_cf) {
477 assert(num_per_cf.size() == handles_.size());
478
479 #ifndef ROCKSDB_LITE // GetProperty is not supported in lite
480 for (size_t i = 0; i < num_per_cf.size(); ++i) {
481 ASSERT_EQ(num_per_cf[i], GetProperty(static_cast<int>(i),
482 "rocksdb.num-immutable-mem-table"));
483 }
484 #endif // !ROCKSDB_LITE
485 }
486
487 void CopyFile(const std::string& source, const std::string& destination,
488 uint64_t size = 0) {
489 const EnvOptions soptions;
490 std::unique_ptr<SequentialFile> srcfile;
491 ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
492 std::unique_ptr<WritableFile> destfile;
493 ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
494
495 if (size == 0) {
496 // default argument means copy everything
497 ASSERT_OK(env_->GetFileSize(source, &size));
498 }
499
500 char buffer[4096];
501 Slice slice;
502 while (size > 0) {
503 uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
504 ASSERT_OK(srcfile->Read(one, &slice, buffer));
505 ASSERT_OK(destfile->Append(slice));
506 size -= slice.size();
507 }
508 ASSERT_OK(destfile->Close());
509 }
510
511 int GetSstFileCount(std::string path) {
512 std::vector<std::string> files;
513 DBTestBase::GetSstFiles(env_, path, &files);
514 return static_cast<int>(files.size());
515 }
516
517 void RecalculateWriteStallConditions(ColumnFamilyData* cfd,
518 const MutableCFOptions& mutable_cf_options) {
519 // add lock to avoid race condition between
520 // `RecalculateWriteStallConditions` which writes to CFStats and
521 // background `DBImpl::DumpStats()` threads which read CFStats
522 dbfull()->TEST_LockMutex();
523 cfd->RecalculateWriteStallConditions(mutable_cf_options);
524 dbfull()-> TEST_UnlockMutex();
525 }
526
527 std::vector<ColumnFamilyHandle*> handles_;
528 std::vector<std::string> names_;
529 std::vector<std::set<std::string>> keys_;
530 ColumnFamilyOptions column_family_options_;
531 DBOptions db_options_;
532 std::string dbname_;
533 DB* db_ = nullptr;
534 EnvCounter* env_;
535 Random rnd_;
536 uint32_t format_;
537 };
538
539 class ColumnFamilyTest
540 : public ColumnFamilyTestBase,
541 virtual public ::testing::WithParamInterface<uint32_t> {
542 public:
543 ColumnFamilyTest() : ColumnFamilyTestBase(GetParam()) {}
544 };
545
546 INSTANTIATE_TEST_CASE_P(FormatDef, ColumnFamilyTest,
547 testing::Values(test::kDefaultFormatVersion));
548 INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTest,
549 testing::Values(test::kLatestFormatVersion));
550
551 TEST_P(ColumnFamilyTest, DontReuseColumnFamilyID) {
552 for (int iter = 0; iter < 3; ++iter) {
553 Open();
554 CreateColumnFamilies({"one", "two", "three"});
555 for (size_t i = 0; i < handles_.size(); ++i) {
556 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[i]);
557 ASSERT_EQ(i, cfh->GetID());
558 }
559 if (iter == 1) {
560 Reopen();
561 }
562 DropColumnFamilies({3});
563 Reopen();
564 if (iter == 2) {
565 // this tests if max_column_family is correctly persisted with
566 // WriteSnapshot()
567 Reopen();
568 }
569 CreateColumnFamilies({"three2"});
570 // ID 3 that was used for dropped column family "three" should not be
571 // reused
572 auto cfh3 = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[3]);
573 ASSERT_EQ(4U, cfh3->GetID());
574 Close();
575 Destroy();
576 }
577 }
578
579 #ifndef ROCKSDB_LITE
580 TEST_P(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) {
581 Open();
582
583 rocksdb::SyncPoint::GetInstance()->LoadDependency(
584 {{"DBImpl::WriteOptionsFile:1",
585 "ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"},
586 {"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2",
587 "DBImpl::WriteOptionsFile:2"}});
588 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
589
590 rocksdb::port::Thread thread([&] { CreateColumnFamilies({"one"}); });
591
592 TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1");
593 uint64_t pv;
594 db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv);
595 TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2");
596
597 thread.join();
598
599 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
600 }
601 #endif // !ROCKSDB_LITE
602
603 class FlushEmptyCFTestWithParam
604 : public ColumnFamilyTestBase,
605 virtual public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
606 public:
607 FlushEmptyCFTestWithParam()
608 : ColumnFamilyTestBase(std::get<0>(GetParam())),
609 allow_2pc_(std::get<1>(GetParam())) {}
610
611 // Required if inheriting from testing::WithParamInterface<>
612 static void SetUpTestCase() {}
613 static void TearDownTestCase() {}
614
615 bool allow_2pc_;
616 };
617
618 TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
619 std::unique_ptr<FaultInjectionTestEnv> fault_env(
620 new FaultInjectionTestEnv(env_));
621 db_options_.env = fault_env.get();
622 db_options_.allow_2pc = allow_2pc_;
623 Open();
624 CreateColumnFamilies({"one", "two"});
625 // Generate log file A.
626 ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
627
628 Reopen();
629 // Log file A is not dropped after reopening because default column family's
630 // min log number is 0.
631 // It flushes to SST file X
632 ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
633 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
634 // Current log file is file B now. While flushing, a new log file C is created
635 // and is set to current. Boths' min log number is set to file C in memory, so
636 // after flushing file B is deleted. At the same time, the min log number of
637 // default CF is not written to manifest. Log file A still remains.
638 // Flushed to SST file Y.
639 Flush(1);
640 Flush(0);
641 ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
642 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
643 db_->FlushWAL(false);
644
645 // Preserve file system state up to here to simulate a crash condition.
646 fault_env->SetFilesystemActive(false);
647 std::vector<std::string> names;
648 for (auto name : names_) {
649 if (name != "") {
650 names.push_back(name);
651 }
652 }
653
654 Close();
655 fault_env->ResetState();
656
657 // Before opening, there are four files:
658 // Log file A contains seqID 1
659 // Log file C contains seqID 4, 5
660 // SST file X contains seqID 1
661 // SST file Y contains seqID 2, 3
662 // Min log number:
663 // default CF: 0
664 // CF one, two: C
665 // When opening the DB, all the seqID should be preserved.
666 Open(names, {});
667 ASSERT_EQ("v4", Get(1, "foo"));
668 ASSERT_EQ("v3", Get(1, "bar"));
669 Close();
670
671 db_options_.env = env_;
672 }
673
674 TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
675 std::unique_ptr<FaultInjectionTestEnv> fault_env(
676 new FaultInjectionTestEnv(env_));
677 db_options_.env = fault_env.get();
678 db_options_.allow_2pc = allow_2pc_;
679 Open();
680 CreateColumnFamilies({"one", "two"});
681 // Generate log file A.
682 ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
683
684 Reopen();
685 // Log file A is not dropped after reopening because default column family's
686 // min log number is 0.
687 // It flushes to SST file X
688 ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
689 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
690 // Current log file is file B now. While flushing, a new log file C is created
691 // and is set to current. Both CFs' min log number is set to file C so after
692 // flushing file B is deleted. Log file A still remains.
693 // Flushed to SST file Y.
694 Flush(1);
695 ASSERT_OK(Put(0, "bar", "v2")); // seqID 4
696 ASSERT_OK(Put(2, "bar", "v2")); // seqID 5
697 ASSERT_OK(Put(1, "bar", "v3")); // seqID 6
698 // Flushing all column families. This forces all CFs' min log to current. This
699 // is written to the manifest file. Log file C is cleared.
700 Flush(0);
701 Flush(1);
702 Flush(2);
703 // Write to log file D
704 ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
705 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
706 db_->FlushWAL(false);
707 // Preserve file system state up to here to simulate a crash condition.
708 fault_env->SetFilesystemActive(false);
709 std::vector<std::string> names;
710 for (auto name : names_) {
711 if (name != "") {
712 names.push_back(name);
713 }
714 }
715
716 Close();
717 fault_env->ResetState();
718 // Before opening, there are two logfiles:
719 // Log file A contains seqID 1
720 // Log file D contains seqID 7, 8
721 // Min log number:
722 // default CF: D
723 // CF one, two: D
724 // When opening the DB, log file D should be replayed using the seqID
725 // specified in the file.
726 Open(names, {});
727 ASSERT_EQ("v1", Get(1, "foo"));
728 ASSERT_EQ("v5", Get(1, "bar"));
729 Close();
730
731 db_options_.env = env_;
732 }
733
734 INSTANTIATE_TEST_CASE_P(
735 FormatDef, FlushEmptyCFTestWithParam,
736 testing::Values(std::make_tuple(test::kDefaultFormatVersion, true),
737 std::make_tuple(test::kDefaultFormatVersion, false)));
738 INSTANTIATE_TEST_CASE_P(
739 FormatLatest, FlushEmptyCFTestWithParam,
740 testing::Values(std::make_tuple(test::kLatestFormatVersion, true),
741 std::make_tuple(test::kLatestFormatVersion, false)));
742
743 TEST_P(ColumnFamilyTest, AddDrop) {
744 Open();
745 CreateColumnFamilies({"one", "two", "three"});
746 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
747 ASSERT_EQ("NOT_FOUND", Get(2, "fodor"));
748 DropColumnFamilies({2});
749 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
750 CreateColumnFamilies({"four"});
751 ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
752 ASSERT_OK(Put(1, "fodor", "mirko"));
753 ASSERT_EQ("mirko", Get(1, "fodor"));
754 ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
755 Close();
756 ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument());
757 Open({"default", "one", "three", "four"});
758 DropColumnFamilies({1});
759 Reopen();
760 Close();
761
762 std::vector<std::string> families;
763 ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
764 std::sort(families.begin(), families.end());
765 ASSERT_TRUE(families ==
766 std::vector<std::string>({"default", "four", "three"}));
767 }
768
769 TEST_P(ColumnFamilyTest, BulkAddDrop) {
770 constexpr int kNumCF = 1000;
771 ColumnFamilyOptions cf_options;
772 WriteOptions write_options;
773 Open();
774 std::vector<std::string> cf_names;
775 std::vector<ColumnFamilyHandle*> cf_handles;
776 for (int i = 1; i <= kNumCF; i++) {
777 cf_names.push_back("cf1-" + ToString(i));
778 }
779 ASSERT_OK(db_->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
780 for (int i = 1; i <= kNumCF; i++) {
781 ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
782 }
783 ASSERT_OK(db_->DropColumnFamilies(cf_handles));
784 std::vector<ColumnFamilyDescriptor> cf_descriptors;
785 for (auto* handle : cf_handles) {
786 delete handle;
787 }
788 cf_handles.clear();
789 for (int i = 1; i <= kNumCF; i++) {
790 cf_descriptors.emplace_back("cf2-" + ToString(i), ColumnFamilyOptions());
791 }
792 ASSERT_OK(db_->CreateColumnFamilies(cf_descriptors, &cf_handles));
793 for (int i = 1; i <= kNumCF; i++) {
794 ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
795 }
796 ASSERT_OK(db_->DropColumnFamilies(cf_handles));
797 for (auto* handle : cf_handles) {
798 delete handle;
799 }
800 Close();
801 std::vector<std::string> families;
802 ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
803 std::sort(families.begin(), families.end());
804 ASSERT_TRUE(families == std::vector<std::string>({"default"}));
805 }
806
807 TEST_P(ColumnFamilyTest, DropTest) {
808 // first iteration - dont reopen DB before dropping
809 // second iteration - reopen DB before dropping
810 for (int iter = 0; iter < 2; ++iter) {
811 Open({"default"});
812 CreateColumnFamiliesAndReopen({"pikachu"});
813 for (int i = 0; i < 100; ++i) {
814 ASSERT_OK(Put(1, ToString(i), "bar" + ToString(i)));
815 }
816 ASSERT_OK(Flush(1));
817
818 if (iter == 1) {
819 Reopen();
820 }
821 ASSERT_EQ("bar1", Get(1, "1"));
822
823 AssertCountLiveFiles(1);
824 DropColumnFamilies({1});
825 // make sure that all files are deleted when we drop the column family
826 AssertCountLiveFiles(0);
827 Destroy();
828 }
829 }
830
831 TEST_P(ColumnFamilyTest, WriteBatchFailure) {
832 Open();
833 CreateColumnFamiliesAndReopen({"one", "two"});
834 WriteBatch batch;
835 batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
836 batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
837 ASSERT_OK(db_->Write(WriteOptions(), &batch));
838 DropColumnFamilies({1});
839 WriteOptions woptions_ignore_missing_cf;
840 woptions_ignore_missing_cf.ignore_missing_column_families = true;
841 batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
842 ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
843 ASSERT_EQ("column-family", Get(0, "still here"));
844 Status s = db_->Write(WriteOptions(), &batch);
845 ASSERT_TRUE(s.IsInvalidArgument());
846 Close();
847 }
848
849 TEST_P(ColumnFamilyTest, ReadWrite) {
850 Open();
851 CreateColumnFamiliesAndReopen({"one", "two"});
852 ASSERT_OK(Put(0, "foo", "v1"));
853 ASSERT_OK(Put(0, "bar", "v2"));
854 ASSERT_OK(Put(1, "mirko", "v3"));
855 ASSERT_OK(Put(0, "foo", "v2"));
856 ASSERT_OK(Put(2, "fodor", "v5"));
857
858 for (int iter = 0; iter <= 3; ++iter) {
859 ASSERT_EQ("v2", Get(0, "foo"));
860 ASSERT_EQ("v2", Get(0, "bar"));
861 ASSERT_EQ("v3", Get(1, "mirko"));
862 ASSERT_EQ("v5", Get(2, "fodor"));
863 ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
864 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
865 ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
866 if (iter <= 1) {
867 Reopen();
868 }
869 }
870 Close();
871 }
872
873 TEST_P(ColumnFamilyTest, IgnoreRecoveredLog) {
874 std::string backup_logs = dbname_ + "/backup_logs";
875
876 // delete old files in backup_logs directory
877 ASSERT_OK(env_->CreateDirIfMissing(dbname_));
878 ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
879 std::vector<std::string> old_files;
880 env_->GetChildren(backup_logs, &old_files);
881 for (auto& file : old_files) {
882 if (file != "." && file != "..") {
883 env_->DeleteFile(backup_logs + "/" + file);
884 }
885 }
886
887 column_family_options_.merge_operator =
888 MergeOperators::CreateUInt64AddOperator();
889 db_options_.wal_dir = dbname_ + "/logs";
890 Destroy();
891 Open();
892 CreateColumnFamilies({"cf1", "cf2"});
893
894 // fill up the DB
895 std::string one, two, three;
896 PutFixed64(&one, 1);
897 PutFixed64(&two, 2);
898 PutFixed64(&three, 3);
899 ASSERT_OK(Merge(0, "foo", one));
900 ASSERT_OK(Merge(1, "mirko", one));
901 ASSERT_OK(Merge(0, "foo", one));
902 ASSERT_OK(Merge(2, "bla", one));
903 ASSERT_OK(Merge(2, "fodor", one));
904 ASSERT_OK(Merge(0, "bar", one));
905 ASSERT_OK(Merge(2, "bla", one));
906 ASSERT_OK(Merge(1, "mirko", two));
907 ASSERT_OK(Merge(1, "franjo", one));
908
909 // copy the logs to backup
910 std::vector<std::string> logs;
911 env_->GetChildren(db_options_.wal_dir, &logs);
912 for (auto& log : logs) {
913 if (log != ".." && log != ".") {
914 CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log);
915 }
916 }
917
918 // recover the DB
919 Close();
920
921 // 1. check consistency
922 // 2. copy the logs from backup back to WAL dir. if the recovery happens
923 // again on the same log files, this should lead to incorrect results
924 // due to applying merge operator twice
925 // 3. check consistency
926 for (int iter = 0; iter < 2; ++iter) {
927 // assert consistency
928 Open({"default", "cf1", "cf2"});
929 ASSERT_EQ(two, Get(0, "foo"));
930 ASSERT_EQ(one, Get(0, "bar"));
931 ASSERT_EQ(three, Get(1, "mirko"));
932 ASSERT_EQ(one, Get(1, "franjo"));
933 ASSERT_EQ(one, Get(2, "fodor"));
934 ASSERT_EQ(two, Get(2, "bla"));
935 Close();
936
937 if (iter == 0) {
938 // copy the logs from backup back to wal dir
939 for (auto& log : logs) {
940 if (log != ".." && log != ".") {
941 CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log);
942 }
943 }
944 }
945 }
946 }
947
948 #ifndef ROCKSDB_LITE // TEST functions used are not supported
949 TEST_P(ColumnFamilyTest, FlushTest) {
950 Open();
951 CreateColumnFamiliesAndReopen({"one", "two"});
952 ASSERT_OK(Put(0, "foo", "v1"));
953 ASSERT_OK(Put(0, "bar", "v2"));
954 ASSERT_OK(Put(1, "mirko", "v3"));
955 ASSERT_OK(Put(0, "foo", "v2"));
956 ASSERT_OK(Put(2, "fodor", "v5"));
957
958 for (int j = 0; j < 2; j++) {
959 ReadOptions ro;
960 std::vector<Iterator*> iterators;
961 // Hold super version.
962 if (j == 0) {
963 ASSERT_OK(db_->NewIterators(ro, handles_, &iterators));
964 }
965
966 for (int i = 0; i < 3; ++i) {
967 uint64_t max_total_in_memory_state =
968 MaxTotalInMemoryState();
969 Flush(i);
970 AssertMaxTotalInMemoryState(max_total_in_memory_state);
971 }
972 ASSERT_OK(Put(1, "foofoo", "bar"));
973 ASSERT_OK(Put(0, "foofoo", "bar"));
974
975 for (auto* it : iterators) {
976 delete it;
977 }
978 }
979 Reopen();
980
981 for (int iter = 0; iter <= 2; ++iter) {
982 ASSERT_EQ("v2", Get(0, "foo"));
983 ASSERT_EQ("v2", Get(0, "bar"));
984 ASSERT_EQ("v3", Get(1, "mirko"));
985 ASSERT_EQ("v5", Get(2, "fodor"));
986 ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
987 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
988 ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
989 if (iter <= 1) {
990 Reopen();
991 }
992 }
993 Close();
994 }
995
996 // Makes sure that obsolete log files get deleted
997 TEST_P(ColumnFamilyTest, LogDeletionTest) {
998 db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
999 column_family_options_.arena_block_size = 4 * 1024;
1000 column_family_options_.write_buffer_size = 128000; // 128KB
1001 Open();
1002 CreateColumnFamilies({"one", "two", "three", "four"});
1003 // Each bracket is one log file. if number is in (), it means
1004 // we don't need it anymore (it's been flushed)
1005 // []
1006 AssertCountLiveLogFiles(0);
1007 PutRandomData(0, 1, 128);
1008 // [0]
1009 PutRandomData(1, 1, 128);
1010 // [0, 1]
1011 PutRandomData(1, 1000, 128);
1012 WaitForFlush(1);
1013 // [0, (1)] [1]
1014 AssertCountLiveLogFiles(2);
1015 PutRandomData(0, 1, 128);
1016 // [0, (1)] [0, 1]
1017 AssertCountLiveLogFiles(2);
1018 PutRandomData(2, 1, 128);
1019 // [0, (1)] [0, 1, 2]
1020 PutRandomData(2, 1000, 128);
1021 WaitForFlush(2);
1022 // [0, (1)] [0, 1, (2)] [2]
1023 AssertCountLiveLogFiles(3);
1024 PutRandomData(2, 1000, 128);
1025 WaitForFlush(2);
1026 // [0, (1)] [0, 1, (2)] [(2)] [2]
1027 AssertCountLiveLogFiles(4);
1028 PutRandomData(3, 1, 128);
1029 // [0, (1)] [0, 1, (2)] [(2)] [2, 3]
1030 PutRandomData(1, 1, 128);
1031 // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
1032 AssertCountLiveLogFiles(4);
1033 PutRandomData(1, 1000, 128);
1034 WaitForFlush(1);
1035 // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
1036 AssertCountLiveLogFiles(5);
1037 PutRandomData(0, 1000, 128);
1038 WaitForFlush(0);
1039 // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
1040 // delete obsolete logs -->
1041 // [(1), 2, 3] [1, (0)] [0]
1042 AssertCountLiveLogFiles(3);
1043 PutRandomData(0, 1000, 128);
1044 WaitForFlush(0);
1045 // [(1), 2, 3] [1, (0)], [(0)] [0]
1046 AssertCountLiveLogFiles(4);
1047 PutRandomData(1, 1000, 128);
1048 WaitForFlush(1);
1049 // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
1050 AssertCountLiveLogFiles(5);
1051 PutRandomData(2, 1000, 128);
1052 WaitForFlush(2);
1053 // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
1054 AssertCountLiveLogFiles(6);
1055 PutRandomData(3, 1000, 128);
1056 WaitForFlush(3);
1057 // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
1058 // delete obsolete logs -->
1059 // [0, (1)] [1, (2)], [2, (3)] [3]
1060 AssertCountLiveLogFiles(4);
1061 Close();
1062 }
1063 #endif // !ROCKSDB_LITE
1064
1065 TEST_P(ColumnFamilyTest, CrashAfterFlush) {
1066 std::unique_ptr<FaultInjectionTestEnv> fault_env(
1067 new FaultInjectionTestEnv(env_));
1068 db_options_.env = fault_env.get();
1069 Open();
1070 CreateColumnFamilies({"one"});
1071
1072 WriteBatch batch;
1073 batch.Put(handles_[0], Slice("foo"), Slice("bar"));
1074 batch.Put(handles_[1], Slice("foo"), Slice("bar"));
1075 ASSERT_OK(db_->Write(WriteOptions(), &batch));
1076 Flush(0);
1077 fault_env->SetFilesystemActive(false);
1078
1079 std::vector<std::string> names;
1080 for (auto name : names_) {
1081 if (name != "") {
1082 names.push_back(name);
1083 }
1084 }
1085 Close();
1086 fault_env->DropUnsyncedFileData();
1087 fault_env->ResetState();
1088 Open(names, {});
1089
1090 // Write batch should be atomic.
1091 ASSERT_EQ(Get(0, "foo"), Get(1, "foo"));
1092
1093 Close();
1094 db_options_.env = env_;
1095 }
1096
1097 TEST_P(ColumnFamilyTest, OpenNonexistentColumnFamily) {
1098 ASSERT_OK(TryOpen({"default"}));
1099 Close();
1100 ASSERT_TRUE(TryOpen({"default", "dne"}).IsInvalidArgument());
1101 }
1102
1103 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
1104 // Makes sure that obsolete log files get deleted
1105 TEST_P(ColumnFamilyTest, DifferentWriteBufferSizes) {
1106 // disable flushing stale column families
1107 db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
1108 Open();
1109 CreateColumnFamilies({"one", "two", "three"});
1110 ColumnFamilyOptions default_cf, one, two, three;
1111 // setup options. all column families have max_write_buffer_number setup to 10
1112 // "default" -> 100KB memtable, start flushing immediatelly
1113 // "one" -> 200KB memtable, start flushing with two immutable memtables
1114 // "two" -> 1MB memtable, start flushing with three immutable memtables
1115 // "three" -> 90KB memtable, start flushing with four immutable memtables
1116 default_cf.write_buffer_size = 100000;
1117 default_cf.arena_block_size = 4 * 4096;
1118 default_cf.max_write_buffer_number = 10;
1119 default_cf.min_write_buffer_number_to_merge = 1;
1120 default_cf.max_write_buffer_number_to_maintain = 0;
1121 one.write_buffer_size = 200000;
1122 one.arena_block_size = 4 * 4096;
1123 one.max_write_buffer_number = 10;
1124 one.min_write_buffer_number_to_merge = 2;
1125 one.max_write_buffer_number_to_maintain = 1;
1126 two.write_buffer_size = 1000000;
1127 two.arena_block_size = 4 * 4096;
1128 two.max_write_buffer_number = 10;
1129 two.min_write_buffer_number_to_merge = 3;
1130 two.max_write_buffer_number_to_maintain = 2;
1131 three.write_buffer_size = 4096 * 22;
1132 three.arena_block_size = 4096;
1133 three.max_write_buffer_number = 10;
1134 three.min_write_buffer_number_to_merge = 4;
1135 three.max_write_buffer_number_to_maintain = -1;
1136
1137 Reopen({default_cf, one, two, three});
1138
1139 int micros_wait_for_flush = 10000;
1140 PutRandomData(0, 100, 1000);
1141 WaitForFlush(0);
1142 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1143 AssertCountLiveLogFiles(1);
1144 PutRandomData(1, 200, 1000);
1145 env_->SleepForMicroseconds(micros_wait_for_flush);
1146 AssertNumberOfImmutableMemtables({0, 1, 0, 0});
1147 AssertCountLiveLogFiles(2);
1148 PutRandomData(2, 1000, 1000);
1149 env_->SleepForMicroseconds(micros_wait_for_flush);
1150 AssertNumberOfImmutableMemtables({0, 1, 1, 0});
1151 AssertCountLiveLogFiles(3);
1152 PutRandomData(2, 1000, 1000);
1153 env_->SleepForMicroseconds(micros_wait_for_flush);
1154 AssertNumberOfImmutableMemtables({0, 1, 2, 0});
1155 AssertCountLiveLogFiles(4);
1156 PutRandomData(3, 93, 990);
1157 env_->SleepForMicroseconds(micros_wait_for_flush);
1158 AssertNumberOfImmutableMemtables({0, 1, 2, 1});
1159 AssertCountLiveLogFiles(5);
1160 PutRandomData(3, 88, 990);
1161 env_->SleepForMicroseconds(micros_wait_for_flush);
1162 AssertNumberOfImmutableMemtables({0, 1, 2, 2});
1163 AssertCountLiveLogFiles(6);
1164 PutRandomData(3, 88, 990);
1165 env_->SleepForMicroseconds(micros_wait_for_flush);
1166 AssertNumberOfImmutableMemtables({0, 1, 2, 3});
1167 AssertCountLiveLogFiles(7);
1168 PutRandomData(0, 100, 1000);
1169 WaitForFlush(0);
1170 AssertNumberOfImmutableMemtables({0, 1, 2, 3});
1171 AssertCountLiveLogFiles(8);
1172 PutRandomData(2, 100, 10000);
1173 WaitForFlush(2);
1174 AssertNumberOfImmutableMemtables({0, 1, 0, 3});
1175 AssertCountLiveLogFiles(9);
1176 PutRandomData(3, 88, 990);
1177 WaitForFlush(3);
1178 AssertNumberOfImmutableMemtables({0, 1, 0, 0});
1179 AssertCountLiveLogFiles(10);
1180 PutRandomData(3, 88, 990);
1181 env_->SleepForMicroseconds(micros_wait_for_flush);
1182 AssertNumberOfImmutableMemtables({0, 1, 0, 1});
1183 AssertCountLiveLogFiles(11);
1184 PutRandomData(1, 200, 1000);
1185 WaitForFlush(1);
1186 AssertNumberOfImmutableMemtables({0, 0, 0, 1});
1187 AssertCountLiveLogFiles(5);
1188 PutRandomData(3, 88 * 3, 990);
1189 WaitForFlush(3);
1190 PutRandomData(3, 88 * 4, 990);
1191 WaitForFlush(3);
1192 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1193 AssertCountLiveLogFiles(12);
1194 PutRandomData(0, 100, 1000);
1195 WaitForFlush(0);
1196 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1197 AssertCountLiveLogFiles(12);
1198 PutRandomData(2, 3 * 1000, 1000);
1199 WaitForFlush(2);
1200 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1201 AssertCountLiveLogFiles(12);
1202 PutRandomData(1, 2*200, 1000);
1203 WaitForFlush(1);
1204 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1205 AssertCountLiveLogFiles(7);
1206 Close();
1207 }
1208 #endif // !ROCKSDB_LITE
1209
// The test is commented out because we want to test that a snapshot is
// not created for memtables that do not support it, but there isn't a
// memtable that doesn't support snapshots right now. If we have one later,
// we can re-enable the test.
1214 //
1215 // #ifndef ROCKSDB_LITE // Cuckoo is not supported in lite
1216 // TEST_P(ColumnFamilyTest, MemtableNotSupportSnapshot) {
1217 // db_options_.allow_concurrent_memtable_write = false;
1218 // Open();
1219 // auto* s1 = dbfull()->GetSnapshot();
1220 // ASSERT_TRUE(s1 != nullptr);
1221 // dbfull()->ReleaseSnapshot(s1);
1222
1223 // // Add a column family that doesn't support snapshot
1224 // ColumnFamilyOptions first;
1225 // first.memtable_factory.reset(new DummyMemtableNotSupportingSnapshot());
1226 // CreateColumnFamilies({"first"}, {first});
1227 // auto* s2 = dbfull()->GetSnapshot();
1228 // ASSERT_TRUE(s2 == nullptr);
1229
//   // Add a column family that supports snapshot. Snapshot stays not
//   // supported.
//   ColumnFamilyOptions second;
//   CreateColumnFamilies({"second"}, {second});
//   auto* s3 = dbfull()->GetSnapshot();
//   ASSERT_TRUE(s3 == nullptr);
1233 // Close();
1234 // }
1235 // #endif // !ROCKSDB_LITE
1236
1237 class TestComparator : public Comparator {
1238 int Compare(const rocksdb::Slice& /*a*/,
1239 const rocksdb::Slice& /*b*/) const override {
1240 return 0;
1241 }
1242 const char* Name() const override { return "Test"; }
1243 void FindShortestSeparator(std::string* /*start*/,
1244 const rocksdb::Slice& /*limit*/) const override {}
1245 void FindShortSuccessor(std::string* /*key*/) const override {}
1246 };
1247
1248 static TestComparator third_comparator;
1249 static TestComparator fourth_comparator;
1250
1251 // Test that we can retrieve the comparator from a created CF
1252 TEST_P(ColumnFamilyTest, GetComparator) {
1253 Open();
1254 // Add a column family with no comparator specified
1255 CreateColumnFamilies({"first"});
1256 const Comparator* comp = handles_[0]->GetComparator();
1257 ASSERT_EQ(comp, BytewiseComparator());
1258
1259 // Add three column families - one with no comparator and two
1260 // with comparators specified
1261 ColumnFamilyOptions second, third, fourth;
1262 second.comparator = &third_comparator;
1263 third.comparator = &fourth_comparator;
1264 CreateColumnFamilies({"second", "third", "fourth"}, {second, third, fourth});
1265 ASSERT_EQ(handles_[1]->GetComparator(), BytewiseComparator());
1266 ASSERT_EQ(handles_[2]->GetComparator(), &third_comparator);
1267 ASSERT_EQ(handles_[3]->GetComparator(), &fourth_comparator);
1268 Close();
1269 }
1270
1271 TEST_P(ColumnFamilyTest, DifferentMergeOperators) {
1272 Open();
1273 CreateColumnFamilies({"first", "second"});
1274 ColumnFamilyOptions default_cf, first, second;
1275 first.merge_operator = MergeOperators::CreateUInt64AddOperator();
1276 second.merge_operator = MergeOperators::CreateStringAppendOperator();
1277 Reopen({default_cf, first, second});
1278
1279 std::string one, two, three;
1280 PutFixed64(&one, 1);
1281 PutFixed64(&two, 2);
1282 PutFixed64(&three, 3);
1283
1284 ASSERT_OK(Put(0, "foo", two));
1285 ASSERT_OK(Put(0, "foo", one));
1286 ASSERT_TRUE(Merge(0, "foo", two).IsNotSupported());
1287 ASSERT_EQ(Get(0, "foo"), one);
1288
1289 ASSERT_OK(Put(1, "foo", two));
1290 ASSERT_OK(Put(1, "foo", one));
1291 ASSERT_OK(Merge(1, "foo", two));
1292 ASSERT_EQ(Get(1, "foo"), three);
1293
1294 ASSERT_OK(Put(2, "foo", two));
1295 ASSERT_OK(Put(2, "foo", one));
1296 ASSERT_OK(Merge(2, "foo", two));
1297 ASSERT_EQ(Get(2, "foo"), one + "," + two);
1298 Close();
1299 }
1300
1301 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
1302 TEST_P(ColumnFamilyTest, DifferentCompactionStyles) {
1303 Open();
1304 CreateColumnFamilies({"one", "two"});
1305 ColumnFamilyOptions default_cf, one, two;
1306 db_options_.max_open_files = 20; // only 10 files in file cache
1307
1308 default_cf.compaction_style = kCompactionStyleLevel;
1309 default_cf.num_levels = 3;
1310 default_cf.write_buffer_size = 64 << 10; // 64KB
1311 default_cf.target_file_size_base = 30 << 10;
1312 default_cf.max_compaction_bytes = static_cast<uint64_t>(1) << 60;
1313
1314 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1315 table_options.no_block_cache = true;
1316 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1317
1318 one.compaction_style = kCompactionStyleUniversal;
1319
1320 one.num_levels = 1;
1321 // trigger compaction if there are >= 4 files
1322 one.level0_file_num_compaction_trigger = 4;
1323 one.write_buffer_size = 120000;
1324
1325 two.compaction_style = kCompactionStyleLevel;
1326 two.num_levels = 4;
1327 two.level0_file_num_compaction_trigger = 3;
1328 two.write_buffer_size = 100000;
1329
1330 Reopen({default_cf, one, two});
1331
1332 // SETUP column family "one" -- universal style
1333 for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) {
1334 PutRandomData(1, 10, 12000);
1335 PutRandomData(1, 1, 10);
1336 WaitForFlush(1);
1337 AssertFilesPerLevel(ToString(i + 1), 1);
1338 }
1339
1340 // SETUP column family "two" -- level style with 4 levels
1341 for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) {
1342 PutRandomData(2, 10, 12000);
1343 PutRandomData(2, 1, 10);
1344 WaitForFlush(2);
1345 AssertFilesPerLevel(ToString(i + 1), 2);
1346 }
1347
1348 // TRIGGER compaction "one"
1349 PutRandomData(1, 10, 12000);
1350 PutRandomData(1, 1, 10);
1351
1352 // TRIGGER compaction "two"
1353 PutRandomData(2, 10, 12000);
1354 PutRandomData(2, 1, 10);
1355
1356 // WAIT for compactions
1357 WaitForCompaction();
1358
1359 // VERIFY compaction "one"
1360 AssertFilesPerLevel("1", 1);
1361
1362 // VERIFY compaction "two"
1363 AssertFilesPerLevel("0,1", 2);
1364 CompactAll(2);
1365 AssertFilesPerLevel("0,1", 2);
1366
1367 Close();
1368 }
1369 #endif // !ROCKSDB_LITE
1370
1371 #ifndef ROCKSDB_LITE
1372 // Sync points not supported in RocksDB Lite
1373
1374 TEST_P(ColumnFamilyTest, MultipleManualCompactions) {
1375 Open();
1376 CreateColumnFamilies({"one", "two"});
1377 ColumnFamilyOptions default_cf, one, two;
1378 db_options_.max_open_files = 20; // only 10 files in file cache
1379 db_options_.max_background_compactions = 3;
1380
1381 default_cf.compaction_style = kCompactionStyleLevel;
1382 default_cf.num_levels = 3;
1383 default_cf.write_buffer_size = 64 << 10; // 64KB
1384 default_cf.target_file_size_base = 30 << 10;
1385 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1386 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1387 table_options.no_block_cache = true;
1388 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1389
1390 one.compaction_style = kCompactionStyleUniversal;
1391
1392 one.num_levels = 1;
1393 // trigger compaction if there are >= 4 files
1394 one.level0_file_num_compaction_trigger = 4;
1395 one.write_buffer_size = 120000;
1396
1397 two.compaction_style = kCompactionStyleLevel;
1398 two.num_levels = 4;
1399 two.level0_file_num_compaction_trigger = 3;
1400 two.write_buffer_size = 100000;
1401
1402 Reopen({default_cf, one, two});
1403
1404 // SETUP column family "one" -- universal style
1405 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1406 PutRandomData(1, 10, 12000, true);
1407 PutRandomData(1, 1, 10, true);
1408 WaitForFlush(1);
1409 AssertFilesPerLevel(ToString(i + 1), 1);
1410 }
1411 bool cf_1_1 = true;
1412 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1413 {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"},
1414 {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"},
1415 {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}});
1416 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1417 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1418 if (cf_1_1) {
1419 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4");
1420 cf_1_1 = false;
1421 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3");
1422 }
1423 });
1424
1425 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1426 std::vector<port::Thread> threads;
1427 threads.emplace_back([&] {
1428 CompactRangeOptions compact_options;
1429 compact_options.exclusive_manual_compaction = false;
1430 ASSERT_OK(
1431 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1432 });
1433
1434 // SETUP column family "two" -- level style with 4 levels
1435 for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1436 PutRandomData(2, 10, 12000);
1437 PutRandomData(2, 1, 10);
1438 WaitForFlush(2);
1439 AssertFilesPerLevel(ToString(i + 1), 2);
1440 }
1441 threads.emplace_back([&] {
1442 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1");
1443 CompactRangeOptions compact_options;
1444 compact_options.exclusive_manual_compaction = false;
1445 ASSERT_OK(
1446 db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1447 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2");
1448 });
1449
1450 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5");
1451 for (auto& t : threads) {
1452 t.join();
1453 }
1454
1455 // VERIFY compaction "one"
1456 AssertFilesPerLevel("1", 1);
1457
1458 // VERIFY compaction "two"
1459 AssertFilesPerLevel("0,1", 2);
1460 CompactAll(2);
1461 AssertFilesPerLevel("0,1", 2);
1462 // Compare against saved keys
1463 std::set<std::string>::iterator key_iter = keys_[1].begin();
1464 while (key_iter != keys_[1].end()) {
1465 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1466 key_iter++;
1467 }
1468 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1469 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1470 Close();
1471 }
1472
1473 TEST_P(ColumnFamilyTest, AutomaticAndManualCompactions) {
1474 Open();
1475 CreateColumnFamilies({"one", "two"});
1476 ColumnFamilyOptions default_cf, one, two;
1477 db_options_.max_open_files = 20; // only 10 files in file cache
1478 db_options_.max_background_compactions = 3;
1479
1480 default_cf.compaction_style = kCompactionStyleLevel;
1481 default_cf.num_levels = 3;
1482 default_cf.write_buffer_size = 64 << 10; // 64KB
1483 default_cf.target_file_size_base = 30 << 10;
1484 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1485 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1486 ;
1487 table_options.no_block_cache = true;
1488 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1489
1490 one.compaction_style = kCompactionStyleUniversal;
1491
1492 one.num_levels = 1;
1493 // trigger compaction if there are >= 4 files
1494 one.level0_file_num_compaction_trigger = 4;
1495 one.write_buffer_size = 120000;
1496
1497 two.compaction_style = kCompactionStyleLevel;
1498 two.num_levels = 4;
1499 two.level0_file_num_compaction_trigger = 3;
1500 two.write_buffer_size = 100000;
1501
1502 Reopen({default_cf, one, two});
1503 // make sure all background compaction jobs can be scheduled
1504 auto stop_token =
1505 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1506
1507 bool cf_1_1 = true;
1508 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1509 {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"},
1510 {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"},
1511 {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}});
1512 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1513 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1514 if (cf_1_1) {
1515 cf_1_1 = false;
1516 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
1517 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
1518 }
1519 });
1520 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1521 // SETUP column family "one" -- universal style
1522 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1523 PutRandomData(1, 10, 12000, true);
1524 PutRandomData(1, 1, 10, true);
1525 WaitForFlush(1);
1526 AssertFilesPerLevel(ToString(i + 1), 1);
1527 }
1528
1529 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
1530
1531 // SETUP column family "two" -- level style with 4 levels
1532 for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1533 PutRandomData(2, 10, 12000);
1534 PutRandomData(2, 1, 10);
1535 WaitForFlush(2);
1536 AssertFilesPerLevel(ToString(i + 1), 2);
1537 }
1538 rocksdb::port::Thread threads([&] {
1539 CompactRangeOptions compact_options;
1540 compact_options.exclusive_manual_compaction = false;
1541 ASSERT_OK(
1542 db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1543 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
1544 });
1545
1546 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
1547 threads.join();
1548
1549 // WAIT for compactions
1550 WaitForCompaction();
1551
1552 // VERIFY compaction "one"
1553 AssertFilesPerLevel("1", 1);
1554
1555 // VERIFY compaction "two"
1556 AssertFilesPerLevel("0,1", 2);
1557 CompactAll(2);
1558 AssertFilesPerLevel("0,1", 2);
1559 // Compare against saved keys
1560 std::set<std::string>::iterator key_iter = keys_[1].begin();
1561 while (key_iter != keys_[1].end()) {
1562 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1563 key_iter++;
1564 }
1565 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1566 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1567 }
1568
1569 TEST_P(ColumnFamilyTest, ManualAndAutomaticCompactions) {
1570 Open();
1571 CreateColumnFamilies({"one", "two"});
1572 ColumnFamilyOptions default_cf, one, two;
1573 db_options_.max_open_files = 20; // only 10 files in file cache
1574 db_options_.max_background_compactions = 3;
1575
1576 default_cf.compaction_style = kCompactionStyleLevel;
1577 default_cf.num_levels = 3;
1578 default_cf.write_buffer_size = 64 << 10; // 64KB
1579 default_cf.target_file_size_base = 30 << 10;
1580 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1581 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1582 ;
1583 table_options.no_block_cache = true;
1584 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1585
1586 one.compaction_style = kCompactionStyleUniversal;
1587
1588 one.num_levels = 1;
1589 // trigger compaction if there are >= 4 files
1590 one.level0_file_num_compaction_trigger = 4;
1591 one.write_buffer_size = 120000;
1592
1593 two.compaction_style = kCompactionStyleLevel;
1594 two.num_levels = 4;
1595 two.level0_file_num_compaction_trigger = 3;
1596 two.write_buffer_size = 100000;
1597
1598 Reopen({default_cf, one, two});
1599 // make sure all background compaction jobs can be scheduled
1600 auto stop_token =
1601 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1602
1603 // SETUP column family "one" -- universal style
1604 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1605 PutRandomData(1, 10, 12000, true);
1606 PutRandomData(1, 1, 10, true);
1607 WaitForFlush(1);
1608 AssertFilesPerLevel(ToString(i + 1), 1);
1609 }
1610 bool cf_1_1 = true;
1611 bool cf_1_2 = true;
1612 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1613 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"},
1614 {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"},
1615 {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
1616 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1617 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1618 if (cf_1_1) {
1619 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1620 cf_1_1 = false;
1621 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1622 } else if (cf_1_2) {
1623 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1624 cf_1_2 = false;
1625 }
1626 });
1627
1628 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1629 rocksdb::port::Thread threads([&] {
1630 CompactRangeOptions compact_options;
1631 compact_options.exclusive_manual_compaction = false;
1632 ASSERT_OK(
1633 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1634 });
1635
1636 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1637
1638 // SETUP column family "two" -- level style with 4 levels
1639 for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) {
1640 PutRandomData(2, 10, 12000);
1641 PutRandomData(2, 1, 10);
1642 WaitForFlush(2);
1643 AssertFilesPerLevel(ToString(i + 1), 2);
1644 }
1645 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1646 threads.join();
1647
1648 // WAIT for compactions
1649 WaitForCompaction();
1650
1651 // VERIFY compaction "one"
1652 AssertFilesPerLevel("1", 1);
1653
1654 // VERIFY compaction "two"
1655 AssertFilesPerLevel("0,1", 2);
1656 CompactAll(2);
1657 AssertFilesPerLevel("0,1", 2);
1658 // Compare against saved keys
1659 std::set<std::string>::iterator key_iter = keys_[1].begin();
1660 while (key_iter != keys_[1].end()) {
1661 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1662 key_iter++;
1663 }
1664 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1665 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1666 }
1667
1668 TEST_P(ColumnFamilyTest, SameCFManualManualCompactions) {
1669 Open();
1670 CreateColumnFamilies({"one"});
1671 ColumnFamilyOptions default_cf, one;
1672 db_options_.max_open_files = 20; // only 10 files in file cache
1673 db_options_.max_background_compactions = 3;
1674
1675 default_cf.compaction_style = kCompactionStyleLevel;
1676 default_cf.num_levels = 3;
1677 default_cf.write_buffer_size = 64 << 10; // 64KB
1678 default_cf.target_file_size_base = 30 << 10;
1679 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1680 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1681 ;
1682 table_options.no_block_cache = true;
1683 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1684
1685 one.compaction_style = kCompactionStyleUniversal;
1686
1687 one.num_levels = 1;
1688 // trigger compaction if there are >= 4 files
1689 one.level0_file_num_compaction_trigger = 4;
1690 one.write_buffer_size = 120000;
1691
1692 Reopen({default_cf, one});
1693 // make sure all background compaction jobs can be scheduled
1694 auto stop_token =
1695 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1696
1697 // SETUP column family "one" -- universal style
1698 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1699 PutRandomData(1, 10, 12000, true);
1700 PutRandomData(1, 1, 10, true);
1701 WaitForFlush(1);
1702 AssertFilesPerLevel(ToString(i + 1), 1);
1703 }
1704 bool cf_1_1 = true;
1705 bool cf_1_2 = true;
1706 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1707 {{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"},
1708 {"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"},
1709 {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"},
1710 {"ColumnFamilyTest::ManualManual:1",
1711 "ColumnFamilyTest::ManualManual:3"}});
1712 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1713 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1714 if (cf_1_1) {
1715 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4");
1716 cf_1_1 = false;
1717 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3");
1718 } else if (cf_1_2) {
1719 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2");
1720 cf_1_2 = false;
1721 }
1722 });
1723
1724 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1725 rocksdb::port::Thread threads([&] {
1726 CompactRangeOptions compact_options;
1727 compact_options.exclusive_manual_compaction = true;
1728 ASSERT_OK(
1729 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1730 });
1731
1732 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5");
1733
1734 WaitForFlush(1);
1735
1736 // Add more L0 files and force another manual compaction
1737 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1738 PutRandomData(1, 10, 12000, true);
1739 PutRandomData(1, 1, 10, true);
1740 WaitForFlush(1);
1741 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1742 1);
1743 }
1744
1745 rocksdb::port::Thread threads1([&] {
1746 CompactRangeOptions compact_options;
1747 compact_options.exclusive_manual_compaction = false;
1748 ASSERT_OK(
1749 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1750 });
1751
1752 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1");
1753
1754 threads.join();
1755 threads1.join();
1756 WaitForCompaction();
1757 // VERIFY compaction "one"
1758 ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1759
1760 // Compare against saved keys
1761 std::set<std::string>::iterator key_iter = keys_[1].begin();
1762 while (key_iter != keys_[1].end()) {
1763 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1764 key_iter++;
1765 }
1766 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1767 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1768 }
1769
1770 TEST_P(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
1771 Open();
1772 CreateColumnFamilies({"one"});
1773 ColumnFamilyOptions default_cf, one;
1774 db_options_.max_open_files = 20; // only 10 files in file cache
1775 db_options_.max_background_compactions = 3;
1776
1777 default_cf.compaction_style = kCompactionStyleLevel;
1778 default_cf.num_levels = 3;
1779 default_cf.write_buffer_size = 64 << 10; // 64KB
1780 default_cf.target_file_size_base = 30 << 10;
1781 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1782 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1783 ;
1784 table_options.no_block_cache = true;
1785 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1786
1787 one.compaction_style = kCompactionStyleUniversal;
1788
1789 one.num_levels = 1;
1790 // trigger compaction if there are >= 4 files
1791 one.level0_file_num_compaction_trigger = 4;
1792 one.write_buffer_size = 120000;
1793
1794 Reopen({default_cf, one});
1795 // make sure all background compaction jobs can be scheduled
1796 auto stop_token =
1797 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1798
1799 // SETUP column family "one" -- universal style
1800 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1801 PutRandomData(1, 10, 12000, true);
1802 PutRandomData(1, 1, 10, true);
1803 WaitForFlush(1);
1804 AssertFilesPerLevel(ToString(i + 1), 1);
1805 }
1806 bool cf_1_1 = true;
1807 bool cf_1_2 = true;
1808 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1809 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1810 {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1811 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"},
1812 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1813 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1814 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1815 if (cf_1_1) {
1816 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1817 cf_1_1 = false;
1818 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1819 } else if (cf_1_2) {
1820 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1821 cf_1_2 = false;
1822 }
1823 });
1824
1825 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1826 rocksdb::port::Thread threads([&] {
1827 CompactRangeOptions compact_options;
1828 compact_options.exclusive_manual_compaction = false;
1829 ASSERT_OK(
1830 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1831 });
1832
1833 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1834
1835 WaitForFlush(1);
1836
1837 // Add more L0 files and force automatic compaction
1838 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1839 PutRandomData(1, 10, 12000, true);
1840 PutRandomData(1, 1, 10, true);
1841 WaitForFlush(1);
1842 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1843 1);
1844 }
1845
1846 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1847
1848 threads.join();
1849 WaitForCompaction();
1850 // VERIFY compaction "one"
1851 ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1852
1853 // Compare against saved keys
1854 std::set<std::string>::iterator key_iter = keys_[1].begin();
1855 while (key_iter != keys_[1].end()) {
1856 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1857 key_iter++;
1858 }
1859 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1860 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1861 }
1862
1863 TEST_P(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
1864 Open();
1865 CreateColumnFamilies({"one"});
1866 ColumnFamilyOptions default_cf, one;
1867 db_options_.max_open_files = 20; // only 10 files in file cache
1868 db_options_.max_background_compactions = 3;
1869
1870 default_cf.compaction_style = kCompactionStyleLevel;
1871 default_cf.num_levels = 3;
1872 default_cf.write_buffer_size = 64 << 10; // 64KB
1873 default_cf.target_file_size_base = 30 << 10;
1874 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1875 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1876 ;
1877 table_options.no_block_cache = true;
1878 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1879
1880 one.compaction_style = kCompactionStyleLevel;
1881
1882 one.num_levels = 1;
1883 // trigger compaction if there are >= 4 files
1884 one.level0_file_num_compaction_trigger = 3;
1885 one.write_buffer_size = 120000;
1886
1887 Reopen({default_cf, one});
1888 // make sure all background compaction jobs can be scheduled
1889 auto stop_token =
1890 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1891
1892 // SETUP column family "one" -- level style
1893 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1894 PutRandomData(1, 10, 12000, true);
1895 PutRandomData(1, 1, 10, true);
1896 WaitForFlush(1);
1897 AssertFilesPerLevel(ToString(i + 1), 1);
1898 }
1899 bool cf_1_1 = true;
1900 bool cf_1_2 = true;
1901 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1902 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1903 {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1904 {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"},
1905 {"LevelCompactionPicker::PickCompactionBySize:0",
1906 "ColumnFamilyTest::ManualAuto:3"},
1907 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1908 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1909 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1910 if (cf_1_1) {
1911 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1912 cf_1_1 = false;
1913 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1914 } else if (cf_1_2) {
1915 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1916 cf_1_2 = false;
1917 }
1918 });
1919
1920 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1921 rocksdb::port::Thread threads([&] {
1922 CompactRangeOptions compact_options;
1923 compact_options.exclusive_manual_compaction = false;
1924 ASSERT_OK(
1925 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1926 });
1927
1928 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1929
1930 // Add more L0 files and force automatic compaction
1931 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1932 PutRandomData(1, 10, 12000, true);
1933 PutRandomData(1, 1, 10, true);
1934 WaitForFlush(1);
1935 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1936 1);
1937 }
1938
1939 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1940
1941 threads.join();
1942 WaitForCompaction();
1943 // VERIFY compaction "one"
1944 AssertFilesPerLevel("0,1", 1);
1945
1946 // Compare against saved keys
1947 std::set<std::string>::iterator key_iter = keys_[1].begin();
1948 while (key_iter != keys_[1].end()) {
1949 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1950 key_iter++;
1951 }
1952 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1953 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
1954 }
1955
1956 // In this test, we generate enough files to trigger automatic compactions.
1957 // The automatic compaction waits in NonTrivial:AfterRun
1958 // We generate more files and then trigger an automatic compaction
1959 // This will wait because the automatic compaction has files it needs.
1960 // Once the conflict is hit, the automatic compaction starts and ends
1961 // Then the manual will run and end.
1962 TEST_P(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
1963 Open();
1964 CreateColumnFamilies({"one"});
1965 ColumnFamilyOptions default_cf, one;
1966 db_options_.max_open_files = 20; // only 10 files in file cache
1967 db_options_.max_background_compactions = 3;
1968
1969 default_cf.compaction_style = kCompactionStyleLevel;
1970 default_cf.num_levels = 3;
1971 default_cf.write_buffer_size = 64 << 10; // 64KB
1972 default_cf.target_file_size_base = 30 << 10;
1973 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1974 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1975 ;
1976 table_options.no_block_cache = true;
1977 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1978
1979 one.compaction_style = kCompactionStyleUniversal;
1980
1981 one.num_levels = 1;
1982 // trigger compaction if there are >= 4 files
1983 one.level0_file_num_compaction_trigger = 4;
1984 one.write_buffer_size = 120000;
1985
1986 Reopen({default_cf, one});
1987 // make sure all background compaction jobs can be scheduled
1988 auto stop_token =
1989 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1990
1991 bool cf_1_1 = true;
1992 bool cf_1_2 = true;
1993 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1994 {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"},
1995 {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"},
1996 {"CompactionPicker::CompactRange:Conflict",
1997 "ColumnFamilyTest::AutoManual:3"}});
1998 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1999 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
2000 if (cf_1_1) {
2001 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
2002 cf_1_1 = false;
2003 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
2004 } else if (cf_1_2) {
2005 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
2006 cf_1_2 = false;
2007 }
2008 });
2009
2010 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2011
2012 // SETUP column family "one" -- universal style
2013 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
2014 PutRandomData(1, 10, 12000, true);
2015 PutRandomData(1, 1, 10, true);
2016 WaitForFlush(1);
2017 AssertFilesPerLevel(ToString(i + 1), 1);
2018 }
2019
2020 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
2021
2022 // Add another L0 file and force automatic compaction
2023 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
2024 PutRandomData(1, 10, 12000, true);
2025 PutRandomData(1, 1, 10, true);
2026 WaitForFlush(1);
2027 }
2028
2029 CompactRangeOptions compact_options;
2030 compact_options.exclusive_manual_compaction = false;
2031 ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
2032
2033 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
2034
2035 WaitForCompaction();
2036 // VERIFY compaction "one"
2037 AssertFilesPerLevel("1", 1);
2038 // Compare against saved keys
2039 std::set<std::string>::iterator key_iter = keys_[1].begin();
2040 while (key_iter != keys_[1].end()) {
2041 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
2042 key_iter++;
2043 }
2044 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2045 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
2046 }
2047 #endif // !ROCKSDB_LITE
2048
2049 #ifndef ROCKSDB_LITE // Tailing iterator not supported
2050 namespace {
2051 std::string IterStatus(Iterator* iter) {
2052 std::string result;
2053 if (iter->Valid()) {
2054 result = iter->key().ToString() + "->" + iter->value().ToString();
2055 } else {
2056 result = "(invalid)";
2057 }
2058 return result;
2059 }
2060 } // anonymous namespace
2061
2062 TEST_P(ColumnFamilyTest, NewIteratorsTest) {
2063 // iter == 0 -- no tailing
2064 // iter == 2 -- tailing
2065 for (int iter = 0; iter < 2; ++iter) {
2066 Open();
2067 CreateColumnFamiliesAndReopen({"one", "two"});
2068 ASSERT_OK(Put(0, "a", "b"));
2069 ASSERT_OK(Put(1, "b", "a"));
2070 ASSERT_OK(Put(2, "c", "m"));
2071 ASSERT_OK(Put(2, "v", "t"));
2072 std::vector<Iterator*> iterators;
2073 ReadOptions options;
2074 options.tailing = (iter == 1);
2075 ASSERT_OK(db_->NewIterators(options, handles_, &iterators));
2076
2077 for (auto it : iterators) {
2078 it->SeekToFirst();
2079 }
2080 ASSERT_EQ(IterStatus(iterators[0]), "a->b");
2081 ASSERT_EQ(IterStatus(iterators[1]), "b->a");
2082 ASSERT_EQ(IterStatus(iterators[2]), "c->m");
2083
2084 ASSERT_OK(Put(1, "x", "x"));
2085
2086 for (auto it : iterators) {
2087 it->Next();
2088 }
2089
2090 ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
2091 if (iter == 0) {
2092 // no tailing
2093 ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
2094 } else {
2095 // tailing
2096 ASSERT_EQ(IterStatus(iterators[1]), "x->x");
2097 }
2098 ASSERT_EQ(IterStatus(iterators[2]), "v->t");
2099
2100 for (auto it : iterators) {
2101 delete it;
2102 }
2103 Destroy();
2104 }
2105 }
2106 #endif // !ROCKSDB_LITE
2107
2108 #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
2109 TEST_P(ColumnFamilyTest, ReadOnlyDBTest) {
2110 Open();
2111 CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
2112 ASSERT_OK(Put(0, "a", "b"));
2113 ASSERT_OK(Put(1, "foo", "bla"));
2114 ASSERT_OK(Put(2, "foo", "blabla"));
2115 ASSERT_OK(Put(3, "foo", "blablabla"));
2116 ASSERT_OK(Put(4, "foo", "blablablabla"));
2117
2118 DropColumnFamilies({2});
2119 Close();
2120 // open only a subset of column families
2121 AssertOpenReadOnly({"default", "one", "four"});
2122 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
2123 ASSERT_EQ("bla", Get(1, "foo"));
2124 ASSERT_EQ("blablablabla", Get(2, "foo"));
2125
2126
2127 // test newiterators
2128 {
2129 std::vector<Iterator*> iterators;
2130 ASSERT_OK(db_->NewIterators(ReadOptions(), handles_, &iterators));
2131 for (auto it : iterators) {
2132 it->SeekToFirst();
2133 }
2134 ASSERT_EQ(IterStatus(iterators[0]), "a->b");
2135 ASSERT_EQ(IterStatus(iterators[1]), "foo->bla");
2136 ASSERT_EQ(IterStatus(iterators[2]), "foo->blablablabla");
2137 for (auto it : iterators) {
2138 it->Next();
2139 }
2140 ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
2141 ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
2142 ASSERT_EQ(IterStatus(iterators[2]), "(invalid)");
2143
2144 for (auto it : iterators) {
2145 delete it;
2146 }
2147 }
2148
2149 Close();
2150 // can't open dropped column family
2151 Status s = OpenReadOnly({"default", "one", "two"});
2152 ASSERT_TRUE(!s.ok());
2153
2154 // Can't open without specifying default column family
2155 s = OpenReadOnly({"one", "four"});
2156 ASSERT_TRUE(!s.ok());
2157 }
2158 #endif // !ROCKSDB_LITE
2159
2160 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported in lite
2161 TEST_P(ColumnFamilyTest, DontRollEmptyLogs) {
2162 Open();
2163 CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
2164
2165 for (size_t i = 0; i < handles_.size(); ++i) {
2166 PutRandomData(static_cast<int>(i), 10, 100);
2167 }
2168 int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
2169 // this will trigger the flushes
2170 for (int i = 0; i <= 4; ++i) {
2171 ASSERT_OK(Flush(i));
2172 }
2173
2174 for (int i = 0; i < 4; ++i) {
2175 WaitForFlush(i);
2176 }
2177 int total_new_writable_files =
2178 env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
2179 ASSERT_EQ(static_cast<size_t>(total_new_writable_files), handles_.size() + 1);
2180 Close();
2181 }
2182 #endif // !ROCKSDB_LITE
2183
2184 #ifndef ROCKSDB_LITE // WaitForCompaction() is not supported in lite
2185 TEST_P(ColumnFamilyTest, FlushStaleColumnFamilies) {
2186 Open();
2187 CreateColumnFamilies({"one", "two"});
2188 ColumnFamilyOptions default_cf, one, two;
2189 default_cf.write_buffer_size = 100000; // small write buffer size
2190 default_cf.arena_block_size = 4096;
2191 default_cf.disable_auto_compactions = true;
2192 one.disable_auto_compactions = true;
2193 two.disable_auto_compactions = true;
2194 db_options_.max_total_wal_size = 210000;
2195
2196 Reopen({default_cf, one, two});
2197
2198 PutRandomData(2, 1, 10); // 10 bytes
2199 for (int i = 0; i < 2; ++i) {
2200 PutRandomData(0, 100, 1000); // flush
2201 WaitForFlush(0);
2202
2203 AssertCountLiveFiles(i + 1);
2204 }
2205 // third flush. now, CF [two] should be detected as stale and flushed
2206 // column family 1 should not be flushed since it's empty
2207 PutRandomData(0, 100, 1000); // flush
2208 WaitForFlush(0);
2209 WaitForFlush(2);
2210 // 3 files for default column families, 1 file for column family [two], zero
2211 // files for column family [one], because it's empty
2212 AssertCountLiveFiles(4);
2213
2214 Flush(0);
2215 ASSERT_EQ(0, dbfull()->TEST_total_log_size());
2216 Close();
2217 }
2218 #endif // !ROCKSDB_LITE
2219
2220 TEST_P(ColumnFamilyTest, CreateMissingColumnFamilies) {
2221 Status s = TryOpen({"one", "two"});
2222 ASSERT_TRUE(!s.ok());
2223 db_options_.create_missing_column_families = true;
2224 s = TryOpen({"default", "one", "two"});
2225 ASSERT_TRUE(s.ok());
2226 Close();
2227 }
2228
2229 TEST_P(ColumnFamilyTest, SanitizeOptions) {
2230 DBOptions db_options;
2231 for (int s = kCompactionStyleLevel; s <= kCompactionStyleUniversal; ++s) {
2232 for (int l = 0; l <= 2; l++) {
2233 for (int i = 1; i <= 3; i++) {
2234 for (int j = 1; j <= 3; j++) {
2235 for (int k = 1; k <= 3; k++) {
2236 ColumnFamilyOptions original;
2237 original.compaction_style = static_cast<CompactionStyle>(s);
2238 original.num_levels = l;
2239 original.level0_stop_writes_trigger = i;
2240 original.level0_slowdown_writes_trigger = j;
2241 original.level0_file_num_compaction_trigger = k;
2242 original.write_buffer_size =
2243 l * 4 * 1024 * 1024 + i * 1024 * 1024 + j * 1024 + k;
2244
2245 ColumnFamilyOptions result =
2246 SanitizeOptions(ImmutableDBOptions(db_options), original);
2247 ASSERT_TRUE(result.level0_stop_writes_trigger >=
2248 result.level0_slowdown_writes_trigger);
2249 ASSERT_TRUE(result.level0_slowdown_writes_trigger >=
2250 result.level0_file_num_compaction_trigger);
2251 ASSERT_TRUE(result.level0_file_num_compaction_trigger ==
2252 original.level0_file_num_compaction_trigger);
2253 if (s == kCompactionStyleLevel) {
2254 ASSERT_GE(result.num_levels, 2);
2255 } else {
2256 ASSERT_GE(result.num_levels, 1);
2257 if (original.num_levels >= 1) {
2258 ASSERT_EQ(result.num_levels, original.num_levels);
2259 }
2260 }
2261
2262 // Make sure Sanitize options sets arena_block_size to 1/8 of
2263 // the write_buffer_size, rounded up to a multiple of 4k.
2264 size_t expected_arena_block_size =
2265 l * 4 * 1024 * 1024 / 8 + i * 1024 * 1024 / 8;
2266 if (j + k != 0) {
2267 // not a multiple of 4k, round up 4k
2268 expected_arena_block_size += 4 * 1024;
2269 }
2270 ASSERT_EQ(expected_arena_block_size, result.arena_block_size);
2271 }
2272 }
2273 }
2274 }
2275 }
2276 }
2277
2278 TEST_P(ColumnFamilyTest, ReadDroppedColumnFamily) {
2279 // iter 0 -- drop CF, don't reopen
2280 // iter 1 -- delete CF, reopen
2281 for (int iter = 0; iter < 2; ++iter) {
2282 db_options_.create_missing_column_families = true;
2283 db_options_.max_open_files = 20;
2284 // delete obsolete files always
2285 db_options_.delete_obsolete_files_period_micros = 0;
2286 Open({"default", "one", "two"});
2287 ColumnFamilyOptions options;
2288 options.level0_file_num_compaction_trigger = 100;
2289 options.level0_slowdown_writes_trigger = 200;
2290 options.level0_stop_writes_trigger = 200;
2291 options.write_buffer_size = 100000; // small write buffer size
2292 Reopen({options, options, options});
2293
2294 // 1MB should create ~10 files for each CF
2295 int kKeysNum = 10000;
2296 PutRandomData(0, kKeysNum, 100);
2297 PutRandomData(1, kKeysNum, 100);
2298 PutRandomData(2, kKeysNum, 100);
2299
2300 {
2301 std::unique_ptr<Iterator> iterator(
2302 db_->NewIterator(ReadOptions(), handles_[2]));
2303 iterator->SeekToFirst();
2304
2305 if (iter == 0) {
2306 // Drop CF two
2307 ASSERT_OK(db_->DropColumnFamily(handles_[2]));
2308 } else {
2309 // delete CF two
2310 db_->DestroyColumnFamilyHandle(handles_[2]);
2311 handles_[2] = nullptr;
2312 }
2313 // Make sure iterator created can still be used.
2314 int count = 0;
2315 for (; iterator->Valid(); iterator->Next()) {
2316 ASSERT_OK(iterator->status());
2317 ++count;
2318 }
2319 ASSERT_OK(iterator->status());
2320 ASSERT_EQ(count, kKeysNum);
2321 }
2322
2323 // Add bunch more data to other CFs
2324 PutRandomData(0, kKeysNum, 100);
2325 PutRandomData(1, kKeysNum, 100);
2326
2327 if (iter == 1) {
2328 Reopen();
2329 }
2330
2331 // Since we didn't delete CF handle, RocksDB's contract guarantees that
2332 // we're still able to read dropped CF
2333 for (int i = 0; i < 3; ++i) {
2334 std::unique_ptr<Iterator> iterator(
2335 db_->NewIterator(ReadOptions(), handles_[i]));
2336 int count = 0;
2337 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2338 ASSERT_OK(iterator->status());
2339 ++count;
2340 }
2341 ASSERT_OK(iterator->status());
2342 ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2));
2343 }
2344
2345 Close();
2346 Destroy();
2347 }
2348 }
2349
2350 TEST_P(ColumnFamilyTest, FlushAndDropRaceCondition) {
2351 db_options_.create_missing_column_families = true;
2352 Open({"default", "one"});
2353 ColumnFamilyOptions options;
2354 options.level0_file_num_compaction_trigger = 100;
2355 options.level0_slowdown_writes_trigger = 200;
2356 options.level0_stop_writes_trigger = 200;
2357 options.max_write_buffer_number = 20;
2358 options.write_buffer_size = 100000; // small write buffer size
2359 Reopen({options, options});
2360
2361 rocksdb::SyncPoint::GetInstance()->LoadDependency(
2362 {{"VersionSet::LogAndApply::ColumnFamilyDrop:0",
2363 "FlushJob::WriteLevel0Table"},
2364 {"VersionSet::LogAndApply::ColumnFamilyDrop:1",
2365 "FlushJob::InstallResults"},
2366 {"FlushJob::InstallResults",
2367 "VersionSet::LogAndApply::ColumnFamilyDrop:2"}});
2368
2369 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2370 test::SleepingBackgroundTask sleeping_task;
2371
2372 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2373 Env::Priority::HIGH);
2374
2375 // 1MB should create ~10 files for each CF
2376 int kKeysNum = 10000;
2377 PutRandomData(1, kKeysNum, 100);
2378
2379 std::vector<port::Thread> threads;
2380 threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
2381
2382 sleeping_task.WakeUp();
2383 sleeping_task.WaitUntilDone();
2384 sleeping_task.Reset();
2385 // now we sleep again. this is just so we're certain that flush job finished
2386 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2387 Env::Priority::HIGH);
2388 sleeping_task.WakeUp();
2389 sleeping_task.WaitUntilDone();
2390
2391 {
2392 // Since we didn't delete CF handle, RocksDB's contract guarantees that
2393 // we're still able to read dropped CF
2394 std::unique_ptr<Iterator> iterator(
2395 db_->NewIterator(ReadOptions(), handles_[1]));
2396 int count = 0;
2397 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2398 ASSERT_OK(iterator->status());
2399 ++count;
2400 }
2401 ASSERT_OK(iterator->status());
2402 ASSERT_EQ(count, kKeysNum);
2403 }
2404 for (auto& t : threads) {
2405 t.join();
2406 }
2407
2408 Close();
2409 Destroy();
2410 }
2411
2412 #ifndef ROCKSDB_LITE
2413 // skipped as persisting options is not supported in ROCKSDB_LITE
2414 namespace {
2415 std::atomic<int> test_stage(0);
2416 std::atomic<bool> ordered_by_writethread(false);
2417 const int kMainThreadStartPersistingOptionsFile = 1;
2418 const int kChildThreadFinishDroppingColumnFamily = 2;
2419 void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
2420 std::vector<Comparator*>* comparators) {
2421 while (test_stage < kMainThreadStartPersistingOptionsFile &&
2422 !ordered_by_writethread) {
2423 Env::Default()->SleepForMicroseconds(100);
2424 }
2425 cf_test->DropColumnFamilies({cf_id});
2426 if ((*comparators)[cf_id]) {
2427 delete (*comparators)[cf_id];
2428 (*comparators)[cf_id] = nullptr;
2429 }
2430 test_stage = kChildThreadFinishDroppingColumnFamily;
2431 }
2432 } // namespace
2433
2434 TEST_P(ColumnFamilyTest, CreateAndDropRace) {
2435 const int kCfCount = 5;
2436 std::vector<ColumnFamilyOptions> cf_opts;
2437 std::vector<Comparator*> comparators;
2438 for (int i = 0; i < kCfCount; ++i) {
2439 cf_opts.emplace_back();
2440 comparators.push_back(new test::SimpleSuffixReverseComparator());
2441 cf_opts.back().comparator = comparators.back();
2442 }
2443 db_options_.create_if_missing = true;
2444 db_options_.create_missing_column_families = true;
2445
2446 auto main_thread_id = std::this_thread::get_id();
2447
2448 rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
2449 [&](void* /*arg*/) {
2450 auto current_thread_id = std::this_thread::get_id();
2451 // If it's the main thread hitting this sync-point, then it
2452 // will be blocked until some other thread update the test_stage.
2453 if (main_thread_id == current_thread_id) {
2454 test_stage = kMainThreadStartPersistingOptionsFile;
2455 while (test_stage < kChildThreadFinishDroppingColumnFamily &&
2456 !ordered_by_writethread) {
2457 Env::Default()->SleepForMicroseconds(100);
2458 }
2459 }
2460 });
2461
2462 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2463 "WriteThread::EnterUnbatched:Wait", [&](void* /*arg*/) {
2464 // This means a thread doing DropColumnFamily() is waiting for
2465 // other thread to finish persisting options.
2466 // In such case, we update the test_stage to unblock the main thread.
2467 ordered_by_writethread = true;
2468 });
2469
2470 // Create a database with four column families
2471 Open({"default", "one", "two", "three"},
2472 {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
2473
2474 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2475
2476 // Start a thread that will drop the first column family
2477 // and its comparator
2478 rocksdb::port::Thread drop_cf_thread(DropSingleColumnFamily, this, 1,
2479 &comparators);
2480
2481 DropColumnFamilies({2});
2482
2483 drop_cf_thread.join();
2484 Close();
2485 Destroy();
2486 for (auto* comparator : comparators) {
2487 if (comparator) {
2488 delete comparator;
2489 }
2490 }
2491
2492 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2493 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
2494 }
2495 #endif // !ROCKSDB_LITE
2496
2497 TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) {
2498 const uint64_t kBaseRate = 800000u;
2499 db_options_.delayed_write_rate = kBaseRate;
2500 db_options_.max_background_compactions = 6;
2501
2502 Open({"default"});
2503 ColumnFamilyData* cfd =
2504 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2505
2506 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2507
2508 MutableCFOptions mutable_cf_options(column_family_options_);
2509
2510 mutable_cf_options.level0_slowdown_writes_trigger = 20;
2511 mutable_cf_options.level0_stop_writes_trigger = 10000;
2512 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2513 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2514 mutable_cf_options.disable_auto_compactions = false;
2515
2516 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2517 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2518 ASSERT_TRUE(!IsDbWriteStopped());
2519 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2520
2521 vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2522 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2523 ASSERT_TRUE(!IsDbWriteStopped());
2524 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2525 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2526 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2527
2528 vstorage->TEST_set_estimated_compaction_needed_bytes(400);
2529 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2530 ASSERT_TRUE(!IsDbWriteStopped());
2531 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2532 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2533 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2534
2535 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2536 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2537 ASSERT_TRUE(!IsDbWriteStopped());
2538 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2539 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2540
2541 vstorage->TEST_set_estimated_compaction_needed_bytes(450);
2542 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2543 ASSERT_TRUE(!IsDbWriteStopped());
2544 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2545 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2546
2547 vstorage->TEST_set_estimated_compaction_needed_bytes(205);
2548 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2549 ASSERT_TRUE(!IsDbWriteStopped());
2550 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2551 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2552
2553 vstorage->TEST_set_estimated_compaction_needed_bytes(202);
2554 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2555 ASSERT_TRUE(!IsDbWriteStopped());
2556 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2557 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2558
2559 vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2560 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2561 ASSERT_TRUE(!IsDbWriteStopped());
2562 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2563 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2564
2565 vstorage->TEST_set_estimated_compaction_needed_bytes(198);
2566 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2567 ASSERT_TRUE(!IsDbWriteStopped());
2568 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2569
2570 vstorage->TEST_set_estimated_compaction_needed_bytes(399);
2571 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2572 ASSERT_TRUE(!IsDbWriteStopped());
2573 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2574 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2575
2576 vstorage->TEST_set_estimated_compaction_needed_bytes(599);
2577 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2578 ASSERT_TRUE(!IsDbWriteStopped());
2579 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2580 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2581
2582 vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
2583 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2584 ASSERT_TRUE(IsDbWriteStopped());
2585 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2586 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2587
2588 vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
2589 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2590 ASSERT_TRUE(IsDbWriteStopped());
2591 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2592
2593 vstorage->TEST_set_estimated_compaction_needed_bytes(390);
2594 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2595 ASSERT_TRUE(!IsDbWriteStopped());
2596 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2597 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2598
2599 vstorage->TEST_set_estimated_compaction_needed_bytes(100);
2600 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2601 ASSERT_TRUE(!IsDbWriteStopped());
2602 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2603
2604 vstorage->set_l0_delay_trigger_count(100);
2605 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2606 ASSERT_TRUE(!IsDbWriteStopped());
2607 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2608 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2609 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2610
2611 vstorage->set_l0_delay_trigger_count(101);
2612 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2613 ASSERT_TRUE(!IsDbWriteStopped());
2614 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2615 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2616
2617 vstorage->set_l0_delay_trigger_count(0);
2618 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2619 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2620 ASSERT_TRUE(!IsDbWriteStopped());
2621 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2622 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2623
2624 vstorage->set_l0_delay_trigger_count(101);
2625 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2626 ASSERT_TRUE(!IsDbWriteStopped());
2627 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2628 ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25, GetDbDelayedWriteRate());
2629
2630 vstorage->TEST_set_estimated_compaction_needed_bytes(200);
2631 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2632 ASSERT_TRUE(!IsDbWriteStopped());
2633 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2634 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2635
2636 vstorage->set_l0_delay_trigger_count(0);
2637 vstorage->TEST_set_estimated_compaction_needed_bytes(0);
2638 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2639 ASSERT_TRUE(!IsDbWriteStopped());
2640 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2641
2642 mutable_cf_options.disable_auto_compactions = true;
2643 dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
2644 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2645 ASSERT_TRUE(!IsDbWriteStopped());
2646 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2647
2648 vstorage->set_l0_delay_trigger_count(50);
2649 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2650 ASSERT_TRUE(!IsDbWriteStopped());
2651 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2652 ASSERT_EQ(0, GetDbDelayedWriteRate());
2653 ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2654
2655 vstorage->set_l0_delay_trigger_count(60);
2656 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2657 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2658 ASSERT_TRUE(!IsDbWriteStopped());
2659 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2660 ASSERT_EQ(0, GetDbDelayedWriteRate());
2661 ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2662
2663 mutable_cf_options.disable_auto_compactions = false;
2664 vstorage->set_l0_delay_trigger_count(70);
2665 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2666 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2667 ASSERT_TRUE(!IsDbWriteStopped());
2668 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2669 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2670
2671 vstorage->set_l0_delay_trigger_count(71);
2672 vstorage->TEST_set_estimated_compaction_needed_bytes(501);
2673 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2674 ASSERT_TRUE(!IsDbWriteStopped());
2675 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2676 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2677 }
2678
2679 TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
2680 db_options_.max_background_compactions = 6;
2681 Open({"default"});
2682 ColumnFamilyData* cfd =
2683 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2684
2685 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2686
2687 MutableCFOptions mutable_cf_options(column_family_options_);
2688
2689 // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2690 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2691 mutable_cf_options.level0_slowdown_writes_trigger = 36;
2692 mutable_cf_options.level0_stop_writes_trigger = 50;
2693 // Speedup threshold = 200 / 4 = 50
2694 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2695 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2696
2697 vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2698 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2699 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2700
2701 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2702 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2703 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2704
2705 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2706 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2707 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2708
2709 vstorage->TEST_set_estimated_compaction_needed_bytes(45);
2710 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2711 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2712
2713 vstorage->set_l0_delay_trigger_count(7);
2714 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2715 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2716
2717 vstorage->set_l0_delay_trigger_count(9);
2718 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2719 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2720
2721 vstorage->set_l0_delay_trigger_count(6);
2722 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2723 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2724
2725 // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
2726 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2727 mutable_cf_options.level0_slowdown_writes_trigger = 16;
2728 mutable_cf_options.level0_stop_writes_trigger = 30;
2729
2730 vstorage->set_l0_delay_trigger_count(5);
2731 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2732 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2733
2734 vstorage->set_l0_delay_trigger_count(7);
2735 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2736 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2737
2738 vstorage->set_l0_delay_trigger_count(3);
2739 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2740 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2741 }
2742
2743 TEST_P(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
2744 const uint64_t kBaseRate = 810000u;
2745 db_options_.delayed_write_rate = kBaseRate;
2746 Open();
2747 CreateColumnFamilies({"one"});
2748 ColumnFamilyData* cfd =
2749 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2750 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2751
2752 ColumnFamilyData* cfd1 =
2753 static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2754 VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2755
2756 MutableCFOptions mutable_cf_options(column_family_options_);
2757 mutable_cf_options.level0_slowdown_writes_trigger = 20;
2758 mutable_cf_options.level0_stop_writes_trigger = 10000;
2759 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2760 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2761
2762 MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2763 mutable_cf_options1.soft_pending_compaction_bytes_limit = 500;
2764
2765 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2766 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2767 ASSERT_TRUE(!IsDbWriteStopped());
2768 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2769
2770 vstorage1->TEST_set_estimated_compaction_needed_bytes(201);
2771 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2772 ASSERT_TRUE(!IsDbWriteStopped());
2773 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2774
2775 vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2776 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2777 ASSERT_TRUE(!IsDbWriteStopped());
2778 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2779 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2780
2781 vstorage->TEST_set_estimated_compaction_needed_bytes(70);
2782 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2783 ASSERT_TRUE(!IsDbWriteStopped());
2784 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2785 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2786
2787 vstorage1->TEST_set_estimated_compaction_needed_bytes(800);
2788 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2789 ASSERT_TRUE(!IsDbWriteStopped());
2790 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2791 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2792
2793 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2794 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2795 ASSERT_TRUE(!IsDbWriteStopped());
2796 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2797 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2798
2799 vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
2800 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2801 ASSERT_TRUE(!IsDbWriteStopped());
2802 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2803 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2804
2805 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2806 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2807 ASSERT_TRUE(!IsDbWriteStopped());
2808 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2809 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2810
2811 vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2812 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2813 ASSERT_TRUE(!IsDbWriteStopped());
2814 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2815 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2816 }
2817
2818 TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
2819 db_options_.max_background_compactions = 6;
2820 column_family_options_.soft_pending_compaction_bytes_limit = 200;
2821 column_family_options_.hard_pending_compaction_bytes_limit = 2000;
2822 Open();
2823 CreateColumnFamilies({"one"});
2824 ColumnFamilyData* cfd =
2825 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2826 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2827
2828 ColumnFamilyData* cfd1 =
2829 static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2830 VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2831
2832 MutableCFOptions mutable_cf_options(column_family_options_);
2833 // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2834 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2835 mutable_cf_options.level0_slowdown_writes_trigger = 36;
2836 mutable_cf_options.level0_stop_writes_trigger = 30;
2837 // Speedup threshold = 200 / 4 = 50
2838 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2839 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2840
2841 MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2842 mutable_cf_options1.level0_slowdown_writes_trigger = 16;
2843
2844 vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2845 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2846 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2847
2848 vstorage->TEST_set_estimated_compaction_needed_bytes(60);
2849 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2850 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2851 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2852 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2853
2854 vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
2855 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2856 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2857
2858 vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
2859 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2860 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2861
2862 vstorage->TEST_set_estimated_compaction_needed_bytes(20);
2863 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2864 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2865
2866 vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
2867 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2868 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2869
2870 vstorage->set_l0_delay_trigger_count(9);
2871 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2872 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2873
2874 vstorage1->set_l0_delay_trigger_count(2);
2875 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2876 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2877
2878 vstorage->set_l0_delay_trigger_count(0);
2879 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2880 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2881 }
2882
2883 TEST_P(ColumnFamilyTest, CreateAndDestoryOptions) {
2884 std::unique_ptr<ColumnFamilyOptions> cfo(new ColumnFamilyOptions());
2885 ColumnFamilyHandle* cfh;
2886 Open();
2887 ASSERT_OK(db_->CreateColumnFamily(*(cfo.get()), "yoyo", &cfh));
2888 cfo.reset();
2889 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2890 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2891 ASSERT_OK(db_->DropColumnFamily(cfh));
2892 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2893 }
2894
2895 TEST_P(ColumnFamilyTest, CreateDropAndDestroy) {
2896 ColumnFamilyHandle* cfh;
2897 Open();
2898 ASSERT_OK(db_->CreateColumnFamily(ColumnFamilyOptions(), "yoyo", &cfh));
2899 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2900 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2901 ASSERT_OK(db_->DropColumnFamily(cfh));
2902 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2903 }
2904
2905 #ifndef ROCKSDB_LITE
2906 TEST_P(ColumnFamilyTest, CreateDropAndDestroyWithoutFileDeletion) {
2907 ColumnFamilyHandle* cfh;
2908 Open();
2909 ASSERT_OK(db_->CreateColumnFamily(ColumnFamilyOptions(), "yoyo", &cfh));
2910 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2911 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2912 ASSERT_OK(db_->DisableFileDeletions());
2913 ASSERT_OK(db_->DropColumnFamily(cfh));
2914 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2915 }
2916
2917 TEST_P(ColumnFamilyTest, FlushCloseWALFiles) {
2918 SpecialEnv env(Env::Default());
2919 db_options_.env = &env;
2920 db_options_.max_background_flushes = 1;
2921 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
2922 Open();
2923 CreateColumnFamilies({"one"});
2924 ASSERT_OK(Put(1, "fodor", "mirko"));
2925 ASSERT_OK(Put(0, "fodor", "mirko"));
2926 ASSERT_OK(Put(1, "fodor", "mirko"));
2927
2928 rocksdb::SyncPoint::GetInstance()->LoadDependency({
2929 {"DBImpl::BGWorkFlush:done", "FlushCloseWALFiles:0"},
2930 });
2931 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2932
2933 // Block flush jobs from running
2934 test::SleepingBackgroundTask sleeping_task;
2935 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2936 Env::Priority::HIGH);
2937
2938 WriteOptions wo;
2939 wo.sync = true;
2940 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
2941
2942 ASSERT_EQ(2, env.num_open_wal_file_.load());
2943
2944 sleeping_task.WakeUp();
2945 sleeping_task.WaitUntilDone();
2946 TEST_SYNC_POINT("FlushCloseWALFiles:0");
2947 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2948 ASSERT_EQ(1, env.num_open_wal_file_.load());
2949
2950 Reopen();
2951 ASSERT_EQ("mirko", Get(0, "fodor"));
2952 ASSERT_EQ("mirko", Get(1, "fodor"));
2953 db_options_.env = env_;
2954 Close();
2955 }
2956 #endif // !ROCKSDB_LITE
2957
2958 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
2959 TEST_P(ColumnFamilyTest, IteratorCloseWALFile1) {
2960 SpecialEnv env(Env::Default());
2961 db_options_.env = &env;
2962 db_options_.max_background_flushes = 1;
2963 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
2964 Open();
2965 CreateColumnFamilies({"one"});
2966 ASSERT_OK(Put(1, "fodor", "mirko"));
2967 // Create an iterator holding the current super version.
2968 Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
2969 // A flush will make `it` hold the last reference of its super version.
2970 Flush(1);
2971
2972 ASSERT_OK(Put(1, "fodor", "mirko"));
2973 ASSERT_OK(Put(0, "fodor", "mirko"));
2974 ASSERT_OK(Put(1, "fodor", "mirko"));
2975
2976 // Flush jobs will close previous WAL files after finishing. By
2977 // block flush jobs from running, we trigger a condition where
2978 // the iterator destructor should close the WAL files.
2979 test::SleepingBackgroundTask sleeping_task;
2980 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2981 Env::Priority::HIGH);
2982
2983 WriteOptions wo;
2984 wo.sync = true;
2985 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
2986
2987 ASSERT_EQ(2, env.num_open_wal_file_.load());
2988 // Deleting the iterator will clear its super version, triggering
2989 // closing all files
2990 delete it;
2991 ASSERT_EQ(1, env.num_open_wal_file_.load());
2992
2993 sleeping_task.WakeUp();
2994 sleeping_task.WaitUntilDone();
2995 WaitForFlush(1);
2996
2997 Reopen();
2998 ASSERT_EQ("mirko", Get(0, "fodor"));
2999 ASSERT_EQ("mirko", Get(1, "fodor"));
3000 db_options_.env = env_;
3001 Close();
3002 }
3003
3004 TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) {
3005 SpecialEnv env(Env::Default());
3006 // Allow both of flush and purge job to schedule.
3007 env.SetBackgroundThreads(2, Env::HIGH);
3008 db_options_.env = &env;
3009 db_options_.max_background_flushes = 1;
3010 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
3011 Open();
3012 CreateColumnFamilies({"one"});
3013 ASSERT_OK(Put(1, "fodor", "mirko"));
3014 // Create an iterator holding the current super version.
3015 ReadOptions ro;
3016 ro.background_purge_on_iterator_cleanup = true;
3017 Iterator* it = db_->NewIterator(ro, handles_[1]);
3018 // A flush will make `it` hold the last reference of its super version.
3019 Flush(1);
3020
3021 ASSERT_OK(Put(1, "fodor", "mirko"));
3022 ASSERT_OK(Put(0, "fodor", "mirko"));
3023 ASSERT_OK(Put(1, "fodor", "mirko"));
3024
3025 rocksdb::SyncPoint::GetInstance()->LoadDependency({
3026 {"ColumnFamilyTest::IteratorCloseWALFile2:0",
3027 "DBImpl::BGWorkPurge:start"},
3028 {"ColumnFamilyTest::IteratorCloseWALFile2:2",
3029 "DBImpl::BackgroundCallFlush:start"},
3030 {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
3031 });
3032 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3033
3034 WriteOptions wo;
3035 wo.sync = true;
3036 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3037
3038 ASSERT_EQ(2, env.num_open_wal_file_.load());
3039 // Deleting the iterator will clear its super version, triggering
3040 // closing all files
3041 delete it;
3042 ASSERT_EQ(2, env.num_open_wal_file_.load());
3043
3044 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
3045 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
3046 ASSERT_EQ(1, env.num_open_wal_file_.load());
3047 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
3048 WaitForFlush(1);
3049 ASSERT_EQ(1, env.num_open_wal_file_.load());
3050 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3051
3052 Reopen();
3053 ASSERT_EQ("mirko", Get(0, "fodor"));
3054 ASSERT_EQ("mirko", Get(1, "fodor"));
3055 db_options_.env = env_;
3056 Close();
3057 }
3058 #endif // !ROCKSDB_LITE
3059
3060 #ifndef ROCKSDB_LITE // TEST functions are not supported in lite
3061 TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
3062 SpecialEnv env(Env::Default());
3063 // Allow both of flush and purge job to schedule.
3064 env.SetBackgroundThreads(2, Env::HIGH);
3065 db_options_.env = &env;
3066 db_options_.max_background_flushes = 1;
3067 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
3068 column_family_options_.level0_file_num_compaction_trigger = 2;
3069 Open();
3070 CreateColumnFamilies({"one"});
3071 ASSERT_OK(Put(1, "fodor", "mirko"));
3072 ASSERT_OK(Put(1, "fodar2", "mirko"));
3073 Flush(1);
3074
3075 // Create an iterator holding the current super version, as well as
3076 // the SST file just flushed.
3077 ReadOptions ro;
3078 ro.tailing = true;
3079 ro.background_purge_on_iterator_cleanup = true;
3080 Iterator* it = db_->NewIterator(ro, handles_[1]);
3081 // A flush will make `it` hold the last reference of its super version.
3082
3083 ASSERT_OK(Put(1, "fodor", "mirko"));
3084 ASSERT_OK(Put(1, "fodar2", "mirko"));
3085 Flush(1);
3086
3087 WaitForCompaction();
3088
3089 ASSERT_OK(Put(1, "fodor", "mirko"));
3090 ASSERT_OK(Put(1, "fodor", "mirko"));
3091 ASSERT_OK(Put(0, "fodor", "mirko"));
3092 ASSERT_OK(Put(1, "fodor", "mirko"));
3093
3094 rocksdb::SyncPoint::GetInstance()->LoadDependency({
3095 {"ColumnFamilyTest::IteratorCloseWALFile2:0",
3096 "DBImpl::BGWorkPurge:start"},
3097 {"ColumnFamilyTest::IteratorCloseWALFile2:2",
3098 "DBImpl::BackgroundCallFlush:start"},
3099 {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
3100 });
3101 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3102
3103 WriteOptions wo;
3104 wo.sync = true;
3105 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3106
3107 env.delete_count_.store(0);
3108 ASSERT_EQ(2, env.num_open_wal_file_.load());
3109 // Deleting the iterator will clear its super version, triggering
3110 // closing all files
3111 it->Seek("");
3112 ASSERT_EQ(2, env.num_open_wal_file_.load());
3113 ASSERT_EQ(0, env.delete_count_.load());
3114
3115 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
3116 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
3117 ASSERT_EQ(1, env.num_open_wal_file_.load());
3118 ASSERT_EQ(1, env.delete_count_.load());
3119 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
3120 WaitForFlush(1);
3121 ASSERT_EQ(1, env.num_open_wal_file_.load());
3122 ASSERT_EQ(1, env.delete_count_.load());
3123
3124 delete it;
3125 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3126
3127 Reopen();
3128 ASSERT_EQ("mirko", Get(0, "fodor"));
3129 ASSERT_EQ("mirko", Get(1, "fodor"));
3130 db_options_.env = env_;
3131 Close();
3132 }
3133 #endif // !ROCKSDB_LITE
3134
3135 // Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
3136 // to return true which is not so in unbuffered mode.
3137 #ifndef OS_WIN
3138 TEST_P(ColumnFamilyTest, LogSyncConflictFlush) {
3139 Open();
3140 CreateColumnFamiliesAndReopen({"one", "two"});
3141
3142 Put(0, "", "");
3143 Put(1, "foo", "bar");
3144
3145 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3146 {{"DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
3147 "ColumnFamilyTest::LogSyncConflictFlush:1"},
3148 {"ColumnFamilyTest::LogSyncConflictFlush:2",
3149 "DBImpl::SyncWAL:BeforeMarkLogsSynced:2"}});
3150
3151 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3152
3153 rocksdb::port::Thread thread([&] { db_->SyncWAL(); });
3154
3155 TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1");
3156 Flush(1);
3157 Put(1, "foo", "bar");
3158 Flush(1);
3159
3160 TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:2");
3161
3162 thread.join();
3163
3164 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3165 Close();
3166 }
3167 #endif
3168
3169 // this test is placed here, because the infrastructure for Column Family
3170 // test is being used to ensure a roll of wal files.
3171 // Basic idea is to test that WAL truncation is being detected and not
3172 // ignored
3173 TEST_P(ColumnFamilyTest, DISABLED_LogTruncationTest) {
3174 Open();
3175 CreateColumnFamiliesAndReopen({"one", "two"});
3176
3177 Build(0, 100);
3178
3179 // Flush the 0th column family to force a roll of the wal log
3180 Flush(0);
3181
3182 // Add some more entries
3183 Build(100, 100);
3184
3185 std::vector<std::string> filenames;
3186 ASSERT_OK(env_->GetChildren(dbname_, &filenames));
3187
3188 // collect wal files
3189 std::vector<std::string> logfs;
3190 for (size_t i = 0; i < filenames.size(); i++) {
3191 uint64_t number;
3192 FileType type;
3193 if (!(ParseFileName(filenames[i], &number, &type))) continue;
3194
3195 if (type != kLogFile) continue;
3196
3197 logfs.push_back(filenames[i]);
3198 }
3199
3200 std::sort(logfs.begin(), logfs.end());
3201 ASSERT_GE(logfs.size(), 2);
3202
3203 // Take the last but one file, and truncate it
3204 std::string fpath = dbname_ + "/" + logfs[logfs.size() - 2];
3205 std::vector<std::string> names_save = names_;
3206
3207 uint64_t fsize;
3208 ASSERT_OK(env_->GetFileSize(fpath, &fsize));
3209 ASSERT_GT(fsize, 0);
3210
3211 Close();
3212
3213 std::string backup_logs = dbname_ + "/backup_logs";
3214 std::string t_fpath = backup_logs + "/" + logfs[logfs.size() - 2];
3215
3216 ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
3217 // Not sure how easy it is to make this data driven.
3218 // need to read back the WAL file and truncate last 10
3219 // entries
3220 CopyFile(fpath, t_fpath, fsize - 9180);
3221
3222 ASSERT_OK(env_->DeleteFile(fpath));
3223 ASSERT_OK(env_->RenameFile(t_fpath, fpath));
3224
3225 db_options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
3226
3227 OpenReadOnly(names_save);
3228
3229 CheckMissed();
3230
3231 Close();
3232
3233 Open(names_save);
3234
3235 CheckMissed();
3236
3237 Close();
3238
3239 // cleanup
3240 env_->DeleteDir(backup_logs);
3241 }
3242
3243 TEST_P(ColumnFamilyTest, DefaultCfPathsTest) {
3244 Open();
3245 // Leave cf_paths for one column families to be empty.
3246 // Files should be generated according to db_paths for that
3247 // column family.
3248 ColumnFamilyOptions cf_opt1, cf_opt2;
3249 cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
3250 std::numeric_limits<uint64_t>::max());
3251 CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
3252 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3253
3254 // Fill Column family 1.
3255 PutRandomData(1, 100, 100);
3256 Flush(1);
3257
3258 ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
3259 ASSERT_EQ(0, GetSstFileCount(dbname_));
3260
3261 // Fill column family 2
3262 PutRandomData(2, 100, 100);
3263 Flush(2);
3264
3265 // SST from Column family 2 should be generated in
3266 // db_paths which is dbname_ in this case.
3267 ASSERT_EQ(1, GetSstFileCount(dbname_));
3268 }
3269
3270 TEST_P(ColumnFamilyTest, MultipleCFPathsTest) {
3271 Open();
3272 // Configure Column family specific paths.
3273 ColumnFamilyOptions cf_opt1, cf_opt2;
3274 cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
3275 std::numeric_limits<uint64_t>::max());
3276 cf_opt2.cf_paths.emplace_back(dbname_ + "_two_1",
3277 std::numeric_limits<uint64_t>::max());
3278 CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
3279 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3280
3281 PutRandomData(1, 100, 100, true /* save */);
3282 Flush(1);
3283
3284 // Check that files are generated in appropriate paths.
3285 ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
3286 ASSERT_EQ(0, GetSstFileCount(dbname_));
3287
3288 PutRandomData(2, 100, 100, true /* save */);
3289 Flush(2);
3290
3291 ASSERT_EQ(1, GetSstFileCount(cf_opt2.cf_paths[0].path));
3292 ASSERT_EQ(0, GetSstFileCount(dbname_));
3293
3294 // Re-open and verify the keys.
3295 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3296 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
3297 for (int cf = 1; cf != 3; ++cf) {
3298 ReadOptions read_options;
3299 read_options.readahead_size = 0;
3300 auto it = dbi->NewIterator(read_options, handles_[cf]);
3301 for (it->SeekToFirst(); it->Valid(); it->Next()) {
3302 Slice key(it->key());
3303 ASSERT_NE(keys_[cf].end(), keys_[cf].find(key.ToString()));
3304 }
3305 delete it;
3306
3307 for (const auto& key : keys_[cf]) {
3308 ASSERT_NE("NOT_FOUND", Get(cf, key));
3309 }
3310 }
3311 }
3312
3313 } // namespace rocksdb
3314
3315 int main(int argc, char** argv) {
3316 ::testing::InitGoogleTest(&argc, argv);
3317 return RUN_ALL_TESTS();
3318 }