]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/column_family_test.cc
24ff4e08b8a1f7202387c068f2f24a95a8b15f96
[ceph.git] / ceph / src / rocksdb / db / column_family_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include <algorithm>
11 #include <vector>
12 #include <string>
13 #include <thread>
14
15 #include "db/db_impl/db_impl.h"
16 #include "db/db_test_util.h"
17 #include "memtable/hash_skiplist_rep.h"
18 #include "options/options_parser.h"
19 #include "port/port.h"
20 #include "port/stack_trace.h"
21 #include "rocksdb/db.h"
22 #include "rocksdb/env.h"
23 #include "rocksdb/iterator.h"
24 #include "rocksdb/utilities/object_registry.h"
25 #include "test_util/fault_injection_test_env.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "util/coding.h"
30 #include "util/string_util.h"
31 #include "utilities/merge_operators.h"
32
33 namespace ROCKSDB_NAMESPACE {
34
35 static const int kValueSize = 1000;
36
37 namespace {
38 std::string RandomString(Random* rnd, int len) {
39 std::string r;
40 test::RandomString(rnd, len, &r);
41 return r;
42 }
43 } // anonymous namespace
44
45 // counts how many operations were performed
46 class EnvCounter : public EnvWrapper {
47 public:
48 explicit EnvCounter(Env* base)
49 : EnvWrapper(base), num_new_writable_file_(0) {}
50 int GetNumberOfNewWritableFileCalls() {
51 return num_new_writable_file_;
52 }
53 Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
54 const EnvOptions& soptions) override {
55 ++num_new_writable_file_;
56 return EnvWrapper::NewWritableFile(f, r, soptions);
57 }
58
59 private:
60 std::atomic<int> num_new_writable_file_;
61 };
62
63 class ColumnFamilyTestBase : public testing::Test {
64 public:
65 explicit ColumnFamilyTestBase(uint32_t format) : rnd_(139), format_(format) {
66 Env* base_env = Env::Default();
67 #ifndef ROCKSDB_LITE
68 const char* test_env_uri = getenv("TEST_ENV_URI");
69 if (test_env_uri) {
70 Env* test_env = nullptr;
71 Status s = Env::LoadEnv(test_env_uri, &test_env, &env_guard_);
72 base_env = test_env;
73 EXPECT_OK(s);
74 EXPECT_NE(Env::Default(), base_env);
75 }
76 #endif // !ROCKSDB_LITE
77 EXPECT_NE(nullptr, base_env);
78 env_ = new EnvCounter(base_env);
79 dbname_ = test::PerThreadDBPath("column_family_test");
80 db_options_.create_if_missing = true;
81 db_options_.fail_if_options_file_error = true;
82 db_options_.env = env_;
83 DestroyDB(dbname_, Options(db_options_, column_family_options_));
84 }
85
86 ~ColumnFamilyTestBase() override {
87 std::vector<ColumnFamilyDescriptor> column_families;
88 for (auto h : handles_) {
89 ColumnFamilyDescriptor cfdescriptor;
90 h->GetDescriptor(&cfdescriptor);
91 column_families.push_back(cfdescriptor);
92 }
93 Close();
94 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
95 Destroy(column_families);
96 delete env_;
97 }
98
99 BlockBasedTableOptions GetBlockBasedTableOptions() {
100 BlockBasedTableOptions options;
101 options.format_version = format_;
102 return options;
103 }
104
105 // Return the value to associate with the specified key
106 Slice Value(int k, std::string* storage) {
107 if (k == 0) {
108 // Ugh. Random seed of 0 used to produce no entropy. This code
109 // preserves the implementation that was in place when all of the
110 // magic values in this file were picked.
111 *storage = std::string(kValueSize, ' ');
112 return Slice(*storage);
113 } else {
114 Random r(k);
115 return test::RandomString(&r, kValueSize, storage);
116 }
117 }
118
119 void Build(int base, int n, int flush_every = 0) {
120 std::string key_space, value_space;
121 WriteBatch batch;
122
123 for (int i = 0; i < n; i++) {
124 if (flush_every != 0 && i != 0 && i % flush_every == 0) {
125 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
126 dbi->TEST_FlushMemTable();
127 }
128
129 int keyi = base + i;
130 Slice key(DBTestBase::Key(keyi));
131
132 batch.Clear();
133 batch.Put(handles_[0], key, Value(keyi, &value_space));
134 batch.Put(handles_[1], key, Value(keyi, &value_space));
135 batch.Put(handles_[2], key, Value(keyi, &value_space));
136 ASSERT_OK(db_->Write(WriteOptions(), &batch));
137 }
138 }
139
140 void CheckMissed() {
141 uint64_t next_expected = 0;
142 uint64_t missed = 0;
143 int bad_keys = 0;
144 int bad_values = 0;
145 int correct = 0;
146 std::string value_space;
147 for (int cf = 0; cf < 3; cf++) {
148 next_expected = 0;
149 Iterator* iter = db_->NewIterator(ReadOptions(false, true), handles_[cf]);
150 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
151 uint64_t key;
152 Slice in(iter->key());
153 in.remove_prefix(3);
154 if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
155 key < next_expected) {
156 bad_keys++;
157 continue;
158 }
159 missed += (key - next_expected);
160 next_expected = key + 1;
161 if (iter->value() != Value(static_cast<int>(key), &value_space)) {
162 bad_values++;
163 } else {
164 correct++;
165 }
166 }
167 delete iter;
168 }
169
170 ASSERT_EQ(0, bad_keys);
171 ASSERT_EQ(0, bad_values);
172 ASSERT_EQ(0, missed);
173 (void)correct;
174 }
175
176 void Close() {
177 for (auto h : handles_) {
178 if (h) {
179 db_->DestroyColumnFamilyHandle(h);
180 }
181 }
182 handles_.clear();
183 names_.clear();
184 delete db_;
185 db_ = nullptr;
186 }
187
188 Status TryOpen(std::vector<std::string> cf,
189 std::vector<ColumnFamilyOptions> options = {}) {
190 std::vector<ColumnFamilyDescriptor> column_families;
191 names_.clear();
192 for (size_t i = 0; i < cf.size(); ++i) {
193 column_families.push_back(ColumnFamilyDescriptor(
194 cf[i], options.size() == 0 ? column_family_options_ : options[i]));
195 names_.push_back(cf[i]);
196 }
197 return DB::Open(db_options_, dbname_, column_families, &handles_, &db_);
198 }
199
200 Status OpenReadOnly(std::vector<std::string> cf,
201 std::vector<ColumnFamilyOptions> options = {}) {
202 std::vector<ColumnFamilyDescriptor> column_families;
203 names_.clear();
204 for (size_t i = 0; i < cf.size(); ++i) {
205 column_families.push_back(ColumnFamilyDescriptor(
206 cf[i], options.size() == 0 ? column_family_options_ : options[i]));
207 names_.push_back(cf[i]);
208 }
209 return DB::OpenForReadOnly(db_options_, dbname_, column_families, &handles_,
210 &db_);
211 }
212
213 #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
214 void AssertOpenReadOnly(std::vector<std::string> cf,
215 std::vector<ColumnFamilyOptions> options = {}) {
216 ASSERT_OK(OpenReadOnly(cf, options));
217 }
218 #endif // !ROCKSDB_LITE
219
220
221 void Open(std::vector<std::string> cf,
222 std::vector<ColumnFamilyOptions> options = {}) {
223 ASSERT_OK(TryOpen(cf, options));
224 }
225
226 void Open() {
227 Open({"default"});
228 }
229
230 DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }
231
232 int GetProperty(int cf, std::string property) {
233 std::string value;
234 EXPECT_TRUE(dbfull()->GetProperty(handles_[cf], property, &value));
235 #ifndef CYGWIN
236 return std::stoi(value);
237 #else
238 return std::strtol(value.c_str(), 0 /* off */, 10 /* base */);
239 #endif
240 }
241
242 bool IsDbWriteStopped() {
243 #ifndef ROCKSDB_LITE
244 uint64_t v;
245 EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.is-write-stopped", &v));
246 return (v == 1);
247 #else
248 return dbfull()->TEST_write_controler().IsStopped();
249 #endif // !ROCKSDB_LITE
250 }
251
252 uint64_t GetDbDelayedWriteRate() {
253 #ifndef ROCKSDB_LITE
254 uint64_t v;
255 EXPECT_TRUE(
256 dbfull()->GetIntProperty("rocksdb.actual-delayed-write-rate", &v));
257 return v;
258 #else
259 if (!dbfull()->TEST_write_controler().NeedsDelay()) {
260 return 0;
261 }
262 return dbfull()->TEST_write_controler().delayed_write_rate();
263 #endif // !ROCKSDB_LITE
264 }
265
266 void Destroy(const std::vector<ColumnFamilyDescriptor>& column_families =
267 std::vector<ColumnFamilyDescriptor>()) {
268 Close();
269 ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_),
270 column_families));
271 }
272
273 void CreateColumnFamilies(
274 const std::vector<std::string>& cfs,
275 const std::vector<ColumnFamilyOptions> options = {}) {
276 int cfi = static_cast<int>(handles_.size());
277 handles_.resize(cfi + cfs.size());
278 names_.resize(cfi + cfs.size());
279 for (size_t i = 0; i < cfs.size(); ++i) {
280 const auto& current_cf_opt =
281 options.size() == 0 ? column_family_options_ : options[i];
282 ASSERT_OK(
283 db_->CreateColumnFamily(current_cf_opt, cfs[i], &handles_[cfi]));
284 names_[cfi] = cfs[i];
285
286 #ifndef ROCKSDB_LITE // RocksDBLite does not support GetDescriptor
287 // Verify the CF options of the returned CF handle.
288 ColumnFamilyDescriptor desc;
289 ASSERT_OK(handles_[cfi]->GetDescriptor(&desc));
290 RocksDBOptionsParser::VerifyCFOptions(desc.options, current_cf_opt);
291 #endif // !ROCKSDB_LITE
292 cfi++;
293 }
294 }
295
296 void Reopen(const std::vector<ColumnFamilyOptions> options = {}) {
297 std::vector<std::string> names;
298 for (auto name : names_) {
299 if (name != "") {
300 names.push_back(name);
301 }
302 }
303 Close();
304 assert(options.size() == 0 || names.size() == options.size());
305 Open(names, options);
306 }
307
308 void CreateColumnFamiliesAndReopen(const std::vector<std::string>& cfs) {
309 CreateColumnFamilies(cfs);
310 Reopen();
311 }
312
313 void DropColumnFamilies(const std::vector<int>& cfs) {
314 for (auto cf : cfs) {
315 ASSERT_OK(db_->DropColumnFamily(handles_[cf]));
316 db_->DestroyColumnFamilyHandle(handles_[cf]);
317 handles_[cf] = nullptr;
318 names_[cf] = "";
319 }
320 }
321
322 void PutRandomData(int cf, int num, int key_value_size, bool save = false) {
323 if (cf >= static_cast<int>(keys_.size())) {
324 keys_.resize(cf + 1);
325 }
326 for (int i = 0; i < num; ++i) {
327 // 10 bytes for key, rest is value
328 if (!save) {
329 ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 11),
330 RandomString(&rnd_, key_value_size - 10)));
331 } else {
332 std::string key = test::RandomKey(&rnd_, 11);
333 keys_[cf].insert(key);
334 ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10)));
335 }
336 }
337 db_->FlushWAL(false);
338 }
339
340 #ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite
341 void WaitForFlush(int cf) {
342 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
343 }
344
345 void WaitForCompaction() {
346 ASSERT_OK(dbfull()->TEST_WaitForCompact());
347 }
348
349 uint64_t MaxTotalInMemoryState() {
350 return dbfull()->TEST_MaxTotalInMemoryState();
351 }
352
353 void AssertMaxTotalInMemoryState(uint64_t value) {
354 ASSERT_EQ(value, MaxTotalInMemoryState());
355 }
356 #endif // !ROCKSDB_LITE
357
358 Status Put(int cf, const std::string& key, const std::string& value) {
359 return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
360 }
361 Status Merge(int cf, const std::string& key, const std::string& value) {
362 return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value));
363 }
364 Status Flush(int cf) {
365 return db_->Flush(FlushOptions(), handles_[cf]);
366 }
367
368 std::string Get(int cf, const std::string& key) {
369 ReadOptions options;
370 options.verify_checksums = true;
371 std::string result;
372 Status s = db_->Get(options, handles_[cf], Slice(key), &result);
373 if (s.IsNotFound()) {
374 result = "NOT_FOUND";
375 } else if (!s.ok()) {
376 result = s.ToString();
377 }
378 return result;
379 }
380
381 void CompactAll(int cf) {
382 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
383 nullptr));
384 }
385
386 void Compact(int cf, const Slice& start, const Slice& limit) {
387 ASSERT_OK(
388 db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
389 }
390
391 int NumTableFilesAtLevel(int level, int cf) {
392 return GetProperty(cf,
393 "rocksdb.num-files-at-level" + ToString(level));
394 }
395
396 #ifndef ROCKSDB_LITE
397 // Return spread of files per level
398 std::string FilesPerLevel(int cf) {
399 std::string result;
400 int last_non_zero_offset = 0;
401 for (int level = 0; level < dbfull()->NumberLevels(handles_[cf]); level++) {
402 int f = NumTableFilesAtLevel(level, cf);
403 char buf[100];
404 snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
405 result += buf;
406 if (f > 0) {
407 last_non_zero_offset = static_cast<int>(result.size());
408 }
409 }
410 result.resize(last_non_zero_offset);
411 return result;
412 }
413 #endif
414
415 void AssertFilesPerLevel(const std::string& value, int cf) {
416 #ifndef ROCKSDB_LITE
417 ASSERT_EQ(value, FilesPerLevel(cf));
418 #else
419 (void) value;
420 (void) cf;
421 #endif
422 }
423
424 #ifndef ROCKSDB_LITE // GetLiveFilesMetaData is not supported
425 int CountLiveFiles() {
426 std::vector<LiveFileMetaData> metadata;
427 db_->GetLiveFilesMetaData(&metadata);
428 return static_cast<int>(metadata.size());
429 }
430 #endif // !ROCKSDB_LITE
431
432 void AssertCountLiveFiles(int expected_value) {
433 #ifndef ROCKSDB_LITE
434 ASSERT_EQ(expected_value, CountLiveFiles());
435 #else
436 (void) expected_value;
437 #endif
438 }
439
440 // Do n memtable flushes, each of which produces an sstable
441 // covering the range [small,large].
442 void MakeTables(int cf, int n, const std::string& small,
443 const std::string& large) {
444 for (int i = 0; i < n; i++) {
445 ASSERT_OK(Put(cf, small, "begin"));
446 ASSERT_OK(Put(cf, large, "end"));
447 ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
448 }
449 }
450
451 #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
452 int CountLiveLogFiles() {
453 int micros_wait_for_log_deletion = 20000;
454 env_->SleepForMicroseconds(micros_wait_for_log_deletion);
455 int ret = 0;
456 VectorLogPtr wal_files;
457 Status s;
458 // GetSortedWalFiles is a flakey function -- it gets all the wal_dir
459 // children files and then later checks for their existence. if some of the
460 // log files doesn't exist anymore, it reports an error. it does all of this
461 // without DB mutex held, so if a background process deletes the log file
462 // while the function is being executed, it returns an error. We retry the
463 // function 10 times to avoid the error failing the test
464 for (int retries = 0; retries < 10; ++retries) {
465 wal_files.clear();
466 s = db_->GetSortedWalFiles(wal_files);
467 if (s.ok()) {
468 break;
469 }
470 }
471 EXPECT_OK(s);
472 for (const auto& wal : wal_files) {
473 if (wal->Type() == kAliveLogFile) {
474 ++ret;
475 }
476 }
477 return ret;
478 return 0;
479 }
480 #endif // !ROCKSDB_LITE
481
482 void AssertCountLiveLogFiles(int value) {
483 #ifndef ROCKSDB_LITE // GetSortedWalFiles is not supported
484 ASSERT_EQ(value, CountLiveLogFiles());
485 #else
486 (void) value;
487 #endif // !ROCKSDB_LITE
488 }
489
490 void AssertNumberOfImmutableMemtables(std::vector<int> num_per_cf) {
491 assert(num_per_cf.size() == handles_.size());
492
493 #ifndef ROCKSDB_LITE // GetProperty is not supported in lite
494 for (size_t i = 0; i < num_per_cf.size(); ++i) {
495 ASSERT_EQ(num_per_cf[i], GetProperty(static_cast<int>(i),
496 "rocksdb.num-immutable-mem-table"));
497 }
498 #endif // !ROCKSDB_LITE
499 }
500
501 void CopyFile(const std::string& source, const std::string& destination,
502 uint64_t size = 0) {
503 const EnvOptions soptions;
504 std::unique_ptr<SequentialFile> srcfile;
505 ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
506 std::unique_ptr<WritableFile> destfile;
507 ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
508
509 if (size == 0) {
510 // default argument means copy everything
511 ASSERT_OK(env_->GetFileSize(source, &size));
512 }
513
514 char buffer[4096];
515 Slice slice;
516 while (size > 0) {
517 uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
518 ASSERT_OK(srcfile->Read(one, &slice, buffer));
519 ASSERT_OK(destfile->Append(slice));
520 size -= slice.size();
521 }
522 ASSERT_OK(destfile->Close());
523 }
524
525 int GetSstFileCount(std::string path) {
526 std::vector<std::string> files;
527 DBTestBase::GetSstFiles(env_, path, &files);
528 return static_cast<int>(files.size());
529 }
530
531 void RecalculateWriteStallConditions(ColumnFamilyData* cfd,
532 const MutableCFOptions& mutable_cf_options) {
533 // add lock to avoid race condition between
534 // `RecalculateWriteStallConditions` which writes to CFStats and
535 // background `DBImpl::DumpStats()` threads which read CFStats
536 dbfull()->TEST_LockMutex();
537 cfd->RecalculateWriteStallConditions(mutable_cf_options);
538 dbfull()-> TEST_UnlockMutex();
539 }
540
541 std::vector<ColumnFamilyHandle*> handles_;
542 std::vector<std::string> names_;
543 std::vector<std::set<std::string>> keys_;
544 ColumnFamilyOptions column_family_options_;
545 DBOptions db_options_;
546 std::string dbname_;
547 DB* db_ = nullptr;
548 EnvCounter* env_;
549 std::shared_ptr<Env> env_guard_;
550 Random rnd_;
551 uint32_t format_;
552 };
553
554 class ColumnFamilyTest
555 : public ColumnFamilyTestBase,
556 virtual public ::testing::WithParamInterface<uint32_t> {
557 public:
558 ColumnFamilyTest() : ColumnFamilyTestBase(GetParam()) {}
559 };
560
561 INSTANTIATE_TEST_CASE_P(FormatDef, ColumnFamilyTest,
562 testing::Values(test::kDefaultFormatVersion));
563 INSTANTIATE_TEST_CASE_P(FormatLatest, ColumnFamilyTest,
564 testing::Values(test::kLatestFormatVersion));
565
566 TEST_P(ColumnFamilyTest, DontReuseColumnFamilyID) {
567 for (int iter = 0; iter < 3; ++iter) {
568 Open();
569 CreateColumnFamilies({"one", "two", "three"});
570 for (size_t i = 0; i < handles_.size(); ++i) {
571 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[i]);
572 ASSERT_EQ(i, cfh->GetID());
573 }
574 if (iter == 1) {
575 Reopen();
576 }
577 DropColumnFamilies({3});
578 Reopen();
579 if (iter == 2) {
580 // this tests if max_column_family is correctly persisted with
581 // WriteSnapshot()
582 Reopen();
583 }
584 CreateColumnFamilies({"three2"});
585 // ID 3 that was used for dropped column family "three" should not be
586 // reused
587 auto cfh3 = reinterpret_cast<ColumnFamilyHandleImpl*>(handles_[3]);
588 ASSERT_EQ(4U, cfh3->GetID());
589 Close();
590 Destroy();
591 }
592 }
593
594 #ifndef ROCKSDB_LITE
595 TEST_P(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) {
596 Open();
597
598 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
599 {{"DBImpl::WriteOptionsFile:1",
600 "ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"},
601 {"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2",
602 "DBImpl::WriteOptionsFile:2"}});
603 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
604
605 ROCKSDB_NAMESPACE::port::Thread thread(
606 [&] { CreateColumnFamilies({"one"}); });
607
608 TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1");
609 uint64_t pv;
610 db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv);
611 TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2");
612
613 thread.join();
614
615 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
616 }
617 #endif // !ROCKSDB_LITE
618
619 class FlushEmptyCFTestWithParam
620 : public ColumnFamilyTestBase,
621 virtual public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
622 public:
623 FlushEmptyCFTestWithParam()
624 : ColumnFamilyTestBase(std::get<0>(GetParam())),
625 allow_2pc_(std::get<1>(GetParam())) {}
626
627 // Required if inheriting from testing::WithParamInterface<>
628 static void SetUpTestCase() {}
629 static void TearDownTestCase() {}
630
631 bool allow_2pc_;
632 };
633
634 TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
635 std::unique_ptr<FaultInjectionTestEnv> fault_env(
636 new FaultInjectionTestEnv(env_));
637 db_options_.env = fault_env.get();
638 db_options_.allow_2pc = allow_2pc_;
639 Open();
640 CreateColumnFamilies({"one", "two"});
641 // Generate log file A.
642 ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
643
644 Reopen();
645 // Log file A is not dropped after reopening because default column family's
646 // min log number is 0.
647 // It flushes to SST file X
648 ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
649 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
650 // Current log file is file B now. While flushing, a new log file C is created
651 // and is set to current. Boths' min log number is set to file C in memory, so
652 // after flushing file B is deleted. At the same time, the min log number of
653 // default CF is not written to manifest. Log file A still remains.
654 // Flushed to SST file Y.
655 Flush(1);
656 Flush(0);
657 ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
658 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
659 db_->FlushWAL(false);
660
661 // Preserve file system state up to here to simulate a crash condition.
662 fault_env->SetFilesystemActive(false);
663 std::vector<std::string> names;
664 for (auto name : names_) {
665 if (name != "") {
666 names.push_back(name);
667 }
668 }
669
670 Close();
671 fault_env->ResetState();
672
673 // Before opening, there are four files:
674 // Log file A contains seqID 1
675 // Log file C contains seqID 4, 5
676 // SST file X contains seqID 1
677 // SST file Y contains seqID 2, 3
678 // Min log number:
679 // default CF: 0
680 // CF one, two: C
681 // When opening the DB, all the seqID should be preserved.
682 Open(names, {});
683 ASSERT_EQ("v4", Get(1, "foo"));
684 ASSERT_EQ("v3", Get(1, "bar"));
685 Close();
686
687 db_options_.env = env_;
688 }
689
690 TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
691 std::unique_ptr<FaultInjectionTestEnv> fault_env(
692 new FaultInjectionTestEnv(env_));
693 db_options_.env = fault_env.get();
694 db_options_.allow_2pc = allow_2pc_;
695 Open();
696 CreateColumnFamilies({"one", "two"});
697 // Generate log file A.
698 ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
699
700 Reopen();
701 // Log file A is not dropped after reopening because default column family's
702 // min log number is 0.
703 // It flushes to SST file X
704 ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
705 ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
706 // Current log file is file B now. While flushing, a new log file C is created
707 // and is set to current. Both CFs' min log number is set to file C so after
708 // flushing file B is deleted. Log file A still remains.
709 // Flushed to SST file Y.
710 Flush(1);
711 ASSERT_OK(Put(0, "bar", "v2")); // seqID 4
712 ASSERT_OK(Put(2, "bar", "v2")); // seqID 5
713 ASSERT_OK(Put(1, "bar", "v3")); // seqID 6
714 // Flushing all column families. This forces all CFs' min log to current. This
715 // is written to the manifest file. Log file C is cleared.
716 Flush(0);
717 Flush(1);
718 Flush(2);
719 // Write to log file D
720 ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
721 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
722 db_->FlushWAL(false);
723 // Preserve file system state up to here to simulate a crash condition.
724 fault_env->SetFilesystemActive(false);
725 std::vector<std::string> names;
726 for (auto name : names_) {
727 if (name != "") {
728 names.push_back(name);
729 }
730 }
731
732 Close();
733 fault_env->ResetState();
734 // Before opening, there are two logfiles:
735 // Log file A contains seqID 1
736 // Log file D contains seqID 7, 8
737 // Min log number:
738 // default CF: D
739 // CF one, two: D
740 // When opening the DB, log file D should be replayed using the seqID
741 // specified in the file.
742 Open(names, {});
743 ASSERT_EQ("v1", Get(1, "foo"));
744 ASSERT_EQ("v5", Get(1, "bar"));
745 Close();
746
747 db_options_.env = env_;
748 }
749
750 INSTANTIATE_TEST_CASE_P(
751 FormatDef, FlushEmptyCFTestWithParam,
752 testing::Values(std::make_tuple(test::kDefaultFormatVersion, true),
753 std::make_tuple(test::kDefaultFormatVersion, false)));
754 INSTANTIATE_TEST_CASE_P(
755 FormatLatest, FlushEmptyCFTestWithParam,
756 testing::Values(std::make_tuple(test::kLatestFormatVersion, true),
757 std::make_tuple(test::kLatestFormatVersion, false)));
758
759 TEST_P(ColumnFamilyTest, AddDrop) {
760 Open();
761 CreateColumnFamilies({"one", "two", "three"});
762 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
763 ASSERT_EQ("NOT_FOUND", Get(2, "fodor"));
764 DropColumnFamilies({2});
765 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
766 CreateColumnFamilies({"four"});
767 ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
768 ASSERT_OK(Put(1, "fodor", "mirko"));
769 ASSERT_EQ("mirko", Get(1, "fodor"));
770 ASSERT_EQ("NOT_FOUND", Get(3, "fodor"));
771 Close();
772 ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument());
773 Open({"default", "one", "three", "four"});
774 DropColumnFamilies({1});
775 Reopen();
776 Close();
777
778 std::vector<std::string> families;
779 ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
780 std::sort(families.begin(), families.end());
781 ASSERT_TRUE(families ==
782 std::vector<std::string>({"default", "four", "three"}));
783 }
784
785 TEST_P(ColumnFamilyTest, BulkAddDrop) {
786 constexpr int kNumCF = 1000;
787 ColumnFamilyOptions cf_options;
788 WriteOptions write_options;
789 Open();
790 std::vector<std::string> cf_names;
791 std::vector<ColumnFamilyHandle*> cf_handles;
792 for (int i = 1; i <= kNumCF; i++) {
793 cf_names.push_back("cf1-" + ToString(i));
794 }
795 ASSERT_OK(db_->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
796 for (int i = 1; i <= kNumCF; i++) {
797 ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
798 }
799 ASSERT_OK(db_->DropColumnFamilies(cf_handles));
800 std::vector<ColumnFamilyDescriptor> cf_descriptors;
801 for (auto* handle : cf_handles) {
802 delete handle;
803 }
804 cf_handles.clear();
805 for (int i = 1; i <= kNumCF; i++) {
806 cf_descriptors.emplace_back("cf2-" + ToString(i), ColumnFamilyOptions());
807 }
808 ASSERT_OK(db_->CreateColumnFamilies(cf_descriptors, &cf_handles));
809 for (int i = 1; i <= kNumCF; i++) {
810 ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
811 }
812 ASSERT_OK(db_->DropColumnFamilies(cf_handles));
813 for (auto* handle : cf_handles) {
814 delete handle;
815 }
816 Close();
817 std::vector<std::string> families;
818 ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
819 std::sort(families.begin(), families.end());
820 ASSERT_TRUE(families == std::vector<std::string>({"default"}));
821 }
822
823 TEST_P(ColumnFamilyTest, DropTest) {
824 // first iteration - dont reopen DB before dropping
825 // second iteration - reopen DB before dropping
826 for (int iter = 0; iter < 2; ++iter) {
827 Open({"default"});
828 CreateColumnFamiliesAndReopen({"pikachu"});
829 for (int i = 0; i < 100; ++i) {
830 ASSERT_OK(Put(1, ToString(i), "bar" + ToString(i)));
831 }
832 ASSERT_OK(Flush(1));
833
834 if (iter == 1) {
835 Reopen();
836 }
837 ASSERT_EQ("bar1", Get(1, "1"));
838
839 AssertCountLiveFiles(1);
840 DropColumnFamilies({1});
841 // make sure that all files are deleted when we drop the column family
842 AssertCountLiveFiles(0);
843 Destroy();
844 }
845 }
846
847 TEST_P(ColumnFamilyTest, WriteBatchFailure) {
848 Open();
849 CreateColumnFamiliesAndReopen({"one", "two"});
850 WriteBatch batch;
851 batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
852 batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
853 ASSERT_OK(db_->Write(WriteOptions(), &batch));
854 DropColumnFamilies({1});
855 WriteOptions woptions_ignore_missing_cf;
856 woptions_ignore_missing_cf.ignore_missing_column_families = true;
857 batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
858 ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
859 ASSERT_EQ("column-family", Get(0, "still here"));
860 Status s = db_->Write(WriteOptions(), &batch);
861 ASSERT_TRUE(s.IsInvalidArgument());
862 Close();
863 }
864
865 TEST_P(ColumnFamilyTest, ReadWrite) {
866 Open();
867 CreateColumnFamiliesAndReopen({"one", "two"});
868 ASSERT_OK(Put(0, "foo", "v1"));
869 ASSERT_OK(Put(0, "bar", "v2"));
870 ASSERT_OK(Put(1, "mirko", "v3"));
871 ASSERT_OK(Put(0, "foo", "v2"));
872 ASSERT_OK(Put(2, "fodor", "v5"));
873
874 for (int iter = 0; iter <= 3; ++iter) {
875 ASSERT_EQ("v2", Get(0, "foo"));
876 ASSERT_EQ("v2", Get(0, "bar"));
877 ASSERT_EQ("v3", Get(1, "mirko"));
878 ASSERT_EQ("v5", Get(2, "fodor"));
879 ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
880 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
881 ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
882 if (iter <= 1) {
883 Reopen();
884 }
885 }
886 Close();
887 }
888
889 TEST_P(ColumnFamilyTest, IgnoreRecoveredLog) {
890 std::string backup_logs = dbname_ + "/backup_logs";
891
892 // delete old files in backup_logs directory
893 ASSERT_OK(env_->CreateDirIfMissing(dbname_));
894 ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
895 std::vector<std::string> old_files;
896 env_->GetChildren(backup_logs, &old_files);
897 for (auto& file : old_files) {
898 if (file != "." && file != "..") {
899 env_->DeleteFile(backup_logs + "/" + file);
900 }
901 }
902
903 column_family_options_.merge_operator =
904 MergeOperators::CreateUInt64AddOperator();
905 db_options_.wal_dir = dbname_ + "/logs";
906 Destroy();
907 Open();
908 CreateColumnFamilies({"cf1", "cf2"});
909
910 // fill up the DB
911 std::string one, two, three;
912 PutFixed64(&one, 1);
913 PutFixed64(&two, 2);
914 PutFixed64(&three, 3);
915 ASSERT_OK(Merge(0, "foo", one));
916 ASSERT_OK(Merge(1, "mirko", one));
917 ASSERT_OK(Merge(0, "foo", one));
918 ASSERT_OK(Merge(2, "bla", one));
919 ASSERT_OK(Merge(2, "fodor", one));
920 ASSERT_OK(Merge(0, "bar", one));
921 ASSERT_OK(Merge(2, "bla", one));
922 ASSERT_OK(Merge(1, "mirko", two));
923 ASSERT_OK(Merge(1, "franjo", one));
924
925 // copy the logs to backup
926 std::vector<std::string> logs;
927 env_->GetChildren(db_options_.wal_dir, &logs);
928 for (auto& log : logs) {
929 if (log != ".." && log != ".") {
930 CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log);
931 }
932 }
933
934 // recover the DB
935 Close();
936
937 // 1. check consistency
938 // 2. copy the logs from backup back to WAL dir. if the recovery happens
939 // again on the same log files, this should lead to incorrect results
940 // due to applying merge operator twice
941 // 3. check consistency
942 for (int iter = 0; iter < 2; ++iter) {
943 // assert consistency
944 Open({"default", "cf1", "cf2"});
945 ASSERT_EQ(two, Get(0, "foo"));
946 ASSERT_EQ(one, Get(0, "bar"));
947 ASSERT_EQ(three, Get(1, "mirko"));
948 ASSERT_EQ(one, Get(1, "franjo"));
949 ASSERT_EQ(one, Get(2, "fodor"));
950 ASSERT_EQ(two, Get(2, "bla"));
951 Close();
952
953 if (iter == 0) {
954 // copy the logs from backup back to wal dir
955 for (auto& log : logs) {
956 if (log != ".." && log != ".") {
957 CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log);
958 }
959 }
960 }
961 }
962 }
963
964 #ifndef ROCKSDB_LITE // TEST functions used are not supported
965 TEST_P(ColumnFamilyTest, FlushTest) {
966 Open();
967 CreateColumnFamiliesAndReopen({"one", "two"});
968 ASSERT_OK(Put(0, "foo", "v1"));
969 ASSERT_OK(Put(0, "bar", "v2"));
970 ASSERT_OK(Put(1, "mirko", "v3"));
971 ASSERT_OK(Put(0, "foo", "v2"));
972 ASSERT_OK(Put(2, "fodor", "v5"));
973
974 for (int j = 0; j < 2; j++) {
975 ReadOptions ro;
976 std::vector<Iterator*> iterators;
977 // Hold super version.
978 if (j == 0) {
979 ASSERT_OK(db_->NewIterators(ro, handles_, &iterators));
980 }
981
982 for (int i = 0; i < 3; ++i) {
983 uint64_t max_total_in_memory_state =
984 MaxTotalInMemoryState();
985 Flush(i);
986 AssertMaxTotalInMemoryState(max_total_in_memory_state);
987 }
988 ASSERT_OK(Put(1, "foofoo", "bar"));
989 ASSERT_OK(Put(0, "foofoo", "bar"));
990
991 for (auto* it : iterators) {
992 delete it;
993 }
994 }
995 Reopen();
996
997 for (int iter = 0; iter <= 2; ++iter) {
998 ASSERT_EQ("v2", Get(0, "foo"));
999 ASSERT_EQ("v2", Get(0, "bar"));
1000 ASSERT_EQ("v3", Get(1, "mirko"));
1001 ASSERT_EQ("v5", Get(2, "fodor"));
1002 ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
1003 ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
1004 ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
1005 if (iter <= 1) {
1006 Reopen();
1007 }
1008 }
1009 Close();
1010 }
1011
1012 // Makes sure that obsolete log files get deleted
1013 TEST_P(ColumnFamilyTest, LogDeletionTest) {
1014 db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
1015 column_family_options_.arena_block_size = 4 * 1024;
1016 column_family_options_.write_buffer_size = 128000; // 128KB
1017 Open();
1018 CreateColumnFamilies({"one", "two", "three", "four"});
1019 // Each bracket is one log file. if number is in (), it means
1020 // we don't need it anymore (it's been flushed)
1021 // []
1022 AssertCountLiveLogFiles(0);
1023 PutRandomData(0, 1, 128);
1024 // [0]
1025 PutRandomData(1, 1, 128);
1026 // [0, 1]
1027 PutRandomData(1, 1000, 128);
1028 WaitForFlush(1);
1029 // [0, (1)] [1]
1030 AssertCountLiveLogFiles(2);
1031 PutRandomData(0, 1, 128);
1032 // [0, (1)] [0, 1]
1033 AssertCountLiveLogFiles(2);
1034 PutRandomData(2, 1, 128);
1035 // [0, (1)] [0, 1, 2]
1036 PutRandomData(2, 1000, 128);
1037 WaitForFlush(2);
1038 // [0, (1)] [0, 1, (2)] [2]
1039 AssertCountLiveLogFiles(3);
1040 PutRandomData(2, 1000, 128);
1041 WaitForFlush(2);
1042 // [0, (1)] [0, 1, (2)] [(2)] [2]
1043 AssertCountLiveLogFiles(4);
1044 PutRandomData(3, 1, 128);
1045 // [0, (1)] [0, 1, (2)] [(2)] [2, 3]
1046 PutRandomData(1, 1, 128);
1047 // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
1048 AssertCountLiveLogFiles(4);
1049 PutRandomData(1, 1000, 128);
1050 WaitForFlush(1);
1051 // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
1052 AssertCountLiveLogFiles(5);
1053 PutRandomData(0, 1000, 128);
1054 WaitForFlush(0);
1055 // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
1056 // delete obsolete logs -->
1057 // [(1), 2, 3] [1, (0)] [0]
1058 AssertCountLiveLogFiles(3);
1059 PutRandomData(0, 1000, 128);
1060 WaitForFlush(0);
1061 // [(1), 2, 3] [1, (0)], [(0)] [0]
1062 AssertCountLiveLogFiles(4);
1063 PutRandomData(1, 1000, 128);
1064 WaitForFlush(1);
1065 // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
1066 AssertCountLiveLogFiles(5);
1067 PutRandomData(2, 1000, 128);
1068 WaitForFlush(2);
1069 // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
1070 AssertCountLiveLogFiles(6);
1071 PutRandomData(3, 1000, 128);
1072 WaitForFlush(3);
1073 // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
1074 // delete obsolete logs -->
1075 // [0, (1)] [1, (2)], [2, (3)] [3]
1076 AssertCountLiveLogFiles(4);
1077 Close();
1078 }
1079 #endif // !ROCKSDB_LITE
1080
1081 TEST_P(ColumnFamilyTest, CrashAfterFlush) {
1082 std::unique_ptr<FaultInjectionTestEnv> fault_env(
1083 new FaultInjectionTestEnv(env_));
1084 db_options_.env = fault_env.get();
1085 Open();
1086 CreateColumnFamilies({"one"});
1087
1088 WriteBatch batch;
1089 batch.Put(handles_[0], Slice("foo"), Slice("bar"));
1090 batch.Put(handles_[1], Slice("foo"), Slice("bar"));
1091 ASSERT_OK(db_->Write(WriteOptions(), &batch));
1092 Flush(0);
1093 fault_env->SetFilesystemActive(false);
1094
1095 std::vector<std::string> names;
1096 for (auto name : names_) {
1097 if (name != "") {
1098 names.push_back(name);
1099 }
1100 }
1101 Close();
1102 fault_env->DropUnsyncedFileData();
1103 fault_env->ResetState();
1104 Open(names, {});
1105
1106 // Write batch should be atomic.
1107 ASSERT_EQ(Get(0, "foo"), Get(1, "foo"));
1108
1109 Close();
1110 db_options_.env = env_;
1111 }
1112
1113 TEST_P(ColumnFamilyTest, OpenNonexistentColumnFamily) {
1114 ASSERT_OK(TryOpen({"default"}));
1115 Close();
1116 ASSERT_TRUE(TryOpen({"default", "dne"}).IsInvalidArgument());
1117 }
1118
1119 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
1120 // Makes sure that obsolete log files get deleted
1121 TEST_P(ColumnFamilyTest, DifferentWriteBufferSizes) {
1122 // disable flushing stale column families
1123 db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
1124 Open();
1125 CreateColumnFamilies({"one", "two", "three"});
1126 ColumnFamilyOptions default_cf, one, two, three;
1127 // setup options. all column families have max_write_buffer_number setup to 10
1128 // "default" -> 100KB memtable, start flushing immediatelly
1129 // "one" -> 200KB memtable, start flushing with two immutable memtables
1130 // "two" -> 1MB memtable, start flushing with three immutable memtables
1131 // "three" -> 90KB memtable, start flushing with four immutable memtables
1132 default_cf.write_buffer_size = 100000;
1133 default_cf.arena_block_size = 4 * 4096;
1134 default_cf.max_write_buffer_number = 10;
1135 default_cf.min_write_buffer_number_to_merge = 1;
1136 default_cf.max_write_buffer_size_to_maintain = 0;
1137 one.write_buffer_size = 200000;
1138 one.arena_block_size = 4 * 4096;
1139 one.max_write_buffer_number = 10;
1140 one.min_write_buffer_number_to_merge = 2;
1141 one.max_write_buffer_size_to_maintain =
1142 static_cast<int>(one.write_buffer_size);
1143 two.write_buffer_size = 1000000;
1144 two.arena_block_size = 4 * 4096;
1145 two.max_write_buffer_number = 10;
1146 two.min_write_buffer_number_to_merge = 3;
1147 two.max_write_buffer_size_to_maintain =
1148 static_cast<int>(two.write_buffer_size);
1149 three.write_buffer_size = 4096 * 22;
1150 three.arena_block_size = 4096;
1151 three.max_write_buffer_number = 10;
1152 three.min_write_buffer_number_to_merge = 4;
1153 three.max_write_buffer_size_to_maintain =
1154 static_cast<int>(three.write_buffer_size);
1155
1156 Reopen({default_cf, one, two, three});
1157
1158 int micros_wait_for_flush = 10000;
1159 PutRandomData(0, 100, 1000);
1160 WaitForFlush(0);
1161 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1162 AssertCountLiveLogFiles(1);
1163 PutRandomData(1, 200, 1000);
1164 env_->SleepForMicroseconds(micros_wait_for_flush);
1165 AssertNumberOfImmutableMemtables({0, 1, 0, 0});
1166 AssertCountLiveLogFiles(2);
1167 PutRandomData(2, 1000, 1000);
1168 env_->SleepForMicroseconds(micros_wait_for_flush);
1169 AssertNumberOfImmutableMemtables({0, 1, 1, 0});
1170 AssertCountLiveLogFiles(3);
1171 PutRandomData(2, 1000, 1000);
1172 env_->SleepForMicroseconds(micros_wait_for_flush);
1173 AssertNumberOfImmutableMemtables({0, 1, 2, 0});
1174 AssertCountLiveLogFiles(4);
1175 PutRandomData(3, 93, 990);
1176 env_->SleepForMicroseconds(micros_wait_for_flush);
1177 AssertNumberOfImmutableMemtables({0, 1, 2, 1});
1178 AssertCountLiveLogFiles(5);
1179 PutRandomData(3, 88, 990);
1180 env_->SleepForMicroseconds(micros_wait_for_flush);
1181 AssertNumberOfImmutableMemtables({0, 1, 2, 2});
1182 AssertCountLiveLogFiles(6);
1183 PutRandomData(3, 88, 990);
1184 env_->SleepForMicroseconds(micros_wait_for_flush);
1185 AssertNumberOfImmutableMemtables({0, 1, 2, 3});
1186 AssertCountLiveLogFiles(7);
1187 PutRandomData(0, 100, 1000);
1188 WaitForFlush(0);
1189 AssertNumberOfImmutableMemtables({0, 1, 2, 3});
1190 AssertCountLiveLogFiles(8);
1191 PutRandomData(2, 100, 10000);
1192 WaitForFlush(2);
1193 AssertNumberOfImmutableMemtables({0, 1, 0, 3});
1194 AssertCountLiveLogFiles(9);
1195 PutRandomData(3, 88, 990);
1196 WaitForFlush(3);
1197 AssertNumberOfImmutableMemtables({0, 1, 0, 0});
1198 AssertCountLiveLogFiles(10);
1199 PutRandomData(3, 88, 990);
1200 env_->SleepForMicroseconds(micros_wait_for_flush);
1201 AssertNumberOfImmutableMemtables({0, 1, 0, 1});
1202 AssertCountLiveLogFiles(11);
1203 PutRandomData(1, 200, 1000);
1204 WaitForFlush(1);
1205 AssertNumberOfImmutableMemtables({0, 0, 0, 1});
1206 AssertCountLiveLogFiles(5);
1207 PutRandomData(3, 88 * 3, 990);
1208 WaitForFlush(3);
1209 PutRandomData(3, 88 * 4, 990);
1210 WaitForFlush(3);
1211 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1212 AssertCountLiveLogFiles(12);
1213 PutRandomData(0, 100, 1000);
1214 WaitForFlush(0);
1215 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1216 AssertCountLiveLogFiles(12);
1217 PutRandomData(2, 3 * 1000, 1000);
1218 WaitForFlush(2);
1219 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1220 AssertCountLiveLogFiles(12);
1221 PutRandomData(1, 2*200, 1000);
1222 WaitForFlush(1);
1223 AssertNumberOfImmutableMemtables({0, 0, 0, 0});
1224 AssertCountLiveLogFiles(7);
1225 Close();
1226 }
1227 #endif // !ROCKSDB_LITE
1228
// The test is commented out because it is meant to verify that a snapshot is
// not created for memtables that do not support it, but there isn't a memtable
// that doesn't support snapshots right now. If we have one later, we can
// re-enable the test.
1233 //
1234 // #ifndef ROCKSDB_LITE // Cuckoo is not supported in lite
1235 // TEST_P(ColumnFamilyTest, MemtableNotSupportSnapshot) {
1236 // db_options_.allow_concurrent_memtable_write = false;
1237 // Open();
1238 // auto* s1 = dbfull()->GetSnapshot();
1239 // ASSERT_TRUE(s1 != nullptr);
1240 // dbfull()->ReleaseSnapshot(s1);
1241
1242 // // Add a column family that doesn't support snapshot
1243 // ColumnFamilyOptions first;
1244 // first.memtable_factory.reset(new DummyMemtableNotSupportingSnapshot());
1245 // CreateColumnFamilies({"first"}, {first});
1246 // auto* s2 = dbfull()->GetSnapshot();
1247 // ASSERT_TRUE(s2 == nullptr);
1248
// // Add a column family that supports snapshot. Snapshot stays not
// // supported.
// ColumnFamilyOptions second;
// CreateColumnFamilies({"second"}, {second});
// auto* s3 = dbfull()->GetSnapshot();
// ASSERT_TRUE(s3 == nullptr);
1252 // Close();
1253 // }
1254 // #endif // !ROCKSDB_LITE
1255
1256 class TestComparator : public Comparator {
1257 int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
1258 const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
1259 return 0;
1260 }
1261 const char* Name() const override { return "Test"; }
1262 void FindShortestSeparator(
1263 std::string* /*start*/,
1264 const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
1265 void FindShortSuccessor(std::string* /*key*/) const override {}
1266 };
1267
1268 static TestComparator third_comparator;
1269 static TestComparator fourth_comparator;
1270
1271 // Test that we can retrieve the comparator from a created CF
1272 TEST_P(ColumnFamilyTest, GetComparator) {
1273 Open();
1274 // Add a column family with no comparator specified
1275 CreateColumnFamilies({"first"});
1276 const Comparator* comp = handles_[0]->GetComparator();
1277 ASSERT_EQ(comp, BytewiseComparator());
1278
1279 // Add three column families - one with no comparator and two
1280 // with comparators specified
1281 ColumnFamilyOptions second, third, fourth;
1282 second.comparator = &third_comparator;
1283 third.comparator = &fourth_comparator;
1284 CreateColumnFamilies({"second", "third", "fourth"}, {second, third, fourth});
1285 ASSERT_EQ(handles_[1]->GetComparator(), BytewiseComparator());
1286 ASSERT_EQ(handles_[2]->GetComparator(), &third_comparator);
1287 ASSERT_EQ(handles_[3]->GetComparator(), &fourth_comparator);
1288 Close();
1289 }
1290
1291 TEST_P(ColumnFamilyTest, DifferentMergeOperators) {
1292 Open();
1293 CreateColumnFamilies({"first", "second"});
1294 ColumnFamilyOptions default_cf, first, second;
1295 first.merge_operator = MergeOperators::CreateUInt64AddOperator();
1296 second.merge_operator = MergeOperators::CreateStringAppendOperator();
1297 Reopen({default_cf, first, second});
1298
1299 std::string one, two, three;
1300 PutFixed64(&one, 1);
1301 PutFixed64(&two, 2);
1302 PutFixed64(&three, 3);
1303
1304 ASSERT_OK(Put(0, "foo", two));
1305 ASSERT_OK(Put(0, "foo", one));
1306 ASSERT_TRUE(Merge(0, "foo", two).IsNotSupported());
1307 ASSERT_EQ(Get(0, "foo"), one);
1308
1309 ASSERT_OK(Put(1, "foo", two));
1310 ASSERT_OK(Put(1, "foo", one));
1311 ASSERT_OK(Merge(1, "foo", two));
1312 ASSERT_EQ(Get(1, "foo"), three);
1313
1314 ASSERT_OK(Put(2, "foo", two));
1315 ASSERT_OK(Put(2, "foo", one));
1316 ASSERT_OK(Merge(2, "foo", two));
1317 ASSERT_EQ(Get(2, "foo"), one + "," + two);
1318 Close();
1319 }
1320
1321 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
1322 TEST_P(ColumnFamilyTest, DifferentCompactionStyles) {
1323 Open();
1324 CreateColumnFamilies({"one", "two"});
1325 ColumnFamilyOptions default_cf, one, two;
1326 db_options_.max_open_files = 20; // only 10 files in file cache
1327
1328 default_cf.compaction_style = kCompactionStyleLevel;
1329 default_cf.num_levels = 3;
1330 default_cf.write_buffer_size = 64 << 10; // 64KB
1331 default_cf.target_file_size_base = 30 << 10;
1332 default_cf.max_compaction_bytes = static_cast<uint64_t>(1) << 60;
1333
1334 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1335 table_options.no_block_cache = true;
1336 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1337
1338 one.compaction_style = kCompactionStyleUniversal;
1339
1340 one.num_levels = 1;
1341 // trigger compaction if there are >= 4 files
1342 one.level0_file_num_compaction_trigger = 4;
1343 one.write_buffer_size = 120000;
1344
1345 two.compaction_style = kCompactionStyleLevel;
1346 two.num_levels = 4;
1347 two.level0_file_num_compaction_trigger = 3;
1348 two.write_buffer_size = 100000;
1349
1350 Reopen({default_cf, one, two});
1351
1352 // SETUP column family "one" -- universal style
1353 for (int i = 0; i < one.level0_file_num_compaction_trigger - 1; ++i) {
1354 PutRandomData(1, 10, 12000);
1355 PutRandomData(1, 1, 10);
1356 WaitForFlush(1);
1357 AssertFilesPerLevel(ToString(i + 1), 1);
1358 }
1359
1360 // SETUP column family "two" -- level style with 4 levels
1361 for (int i = 0; i < two.level0_file_num_compaction_trigger - 1; ++i) {
1362 PutRandomData(2, 10, 12000);
1363 PutRandomData(2, 1, 10);
1364 WaitForFlush(2);
1365 AssertFilesPerLevel(ToString(i + 1), 2);
1366 }
1367
1368 // TRIGGER compaction "one"
1369 PutRandomData(1, 10, 12000);
1370 PutRandomData(1, 1, 10);
1371
1372 // TRIGGER compaction "two"
1373 PutRandomData(2, 10, 12000);
1374 PutRandomData(2, 1, 10);
1375
1376 // WAIT for compactions
1377 WaitForCompaction();
1378
1379 // VERIFY compaction "one"
1380 AssertFilesPerLevel("1", 1);
1381
1382 // VERIFY compaction "two"
1383 AssertFilesPerLevel("0,1", 2);
1384 CompactAll(2);
1385 AssertFilesPerLevel("0,1", 2);
1386
1387 Close();
1388 }
1389 #endif // !ROCKSDB_LITE
1390
1391 #ifndef ROCKSDB_LITE
1392 // Sync points not supported in RocksDB Lite
1393
1394 TEST_P(ColumnFamilyTest, MultipleManualCompactions) {
1395 Open();
1396 CreateColumnFamilies({"one", "two"});
1397 ColumnFamilyOptions default_cf, one, two;
1398 db_options_.max_open_files = 20; // only 10 files in file cache
1399 db_options_.max_background_compactions = 3;
1400
1401 default_cf.compaction_style = kCompactionStyleLevel;
1402 default_cf.num_levels = 3;
1403 default_cf.write_buffer_size = 64 << 10; // 64KB
1404 default_cf.target_file_size_base = 30 << 10;
1405 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1406 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1407 table_options.no_block_cache = true;
1408 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1409
1410 one.compaction_style = kCompactionStyleUniversal;
1411
1412 one.num_levels = 1;
1413 // trigger compaction if there are >= 4 files
1414 one.level0_file_num_compaction_trigger = 4;
1415 one.write_buffer_size = 120000;
1416
1417 two.compaction_style = kCompactionStyleLevel;
1418 two.num_levels = 4;
1419 two.level0_file_num_compaction_trigger = 3;
1420 two.write_buffer_size = 100000;
1421
1422 Reopen({default_cf, one, two});
1423
1424 // SETUP column family "one" -- universal style
1425 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1426 PutRandomData(1, 10, 12000, true);
1427 PutRandomData(1, 1, 10, true);
1428 WaitForFlush(1);
1429 AssertFilesPerLevel(ToString(i + 1), 1);
1430 }
1431 bool cf_1_1 = true;
1432 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1433 {{"ColumnFamilyTest::MultiManual:4", "ColumnFamilyTest::MultiManual:1"},
1434 {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:5"},
1435 {"ColumnFamilyTest::MultiManual:2", "ColumnFamilyTest::MultiManual:3"}});
1436 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1437 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1438 if (cf_1_1) {
1439 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:4");
1440 cf_1_1 = false;
1441 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:3");
1442 }
1443 });
1444
1445 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1446 std::vector<port::Thread> threads;
1447 threads.emplace_back([&] {
1448 CompactRangeOptions compact_options;
1449 compact_options.exclusive_manual_compaction = false;
1450 ASSERT_OK(
1451 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1452 });
1453
1454 // SETUP column family "two" -- level style with 4 levels
1455 for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1456 PutRandomData(2, 10, 12000);
1457 PutRandomData(2, 1, 10);
1458 WaitForFlush(2);
1459 AssertFilesPerLevel(ToString(i + 1), 2);
1460 }
1461 threads.emplace_back([&] {
1462 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:1");
1463 CompactRangeOptions compact_options;
1464 compact_options.exclusive_manual_compaction = false;
1465 ASSERT_OK(
1466 db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1467 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:2");
1468 });
1469
1470 TEST_SYNC_POINT("ColumnFamilyTest::MultiManual:5");
1471 for (auto& t : threads) {
1472 t.join();
1473 }
1474
1475 // VERIFY compaction "one"
1476 AssertFilesPerLevel("1", 1);
1477
1478 // VERIFY compaction "two"
1479 AssertFilesPerLevel("0,1", 2);
1480 CompactAll(2);
1481 AssertFilesPerLevel("0,1", 2);
1482 // Compare against saved keys
1483 std::set<std::string>::iterator key_iter = keys_[1].begin();
1484 while (key_iter != keys_[1].end()) {
1485 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1486 key_iter++;
1487 }
1488 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1489 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1490 Close();
1491 }
1492
1493 TEST_P(ColumnFamilyTest, AutomaticAndManualCompactions) {
1494 Open();
1495 CreateColumnFamilies({"one", "two"});
1496 ColumnFamilyOptions default_cf, one, two;
1497 db_options_.max_open_files = 20; // only 10 files in file cache
1498 db_options_.max_background_compactions = 3;
1499
1500 default_cf.compaction_style = kCompactionStyleLevel;
1501 default_cf.num_levels = 3;
1502 default_cf.write_buffer_size = 64 << 10; // 64KB
1503 default_cf.target_file_size_base = 30 << 10;
1504 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1505 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1506 ;
1507 table_options.no_block_cache = true;
1508 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1509
1510 one.compaction_style = kCompactionStyleUniversal;
1511
1512 one.num_levels = 1;
1513 // trigger compaction if there are >= 4 files
1514 one.level0_file_num_compaction_trigger = 4;
1515 one.write_buffer_size = 120000;
1516
1517 two.compaction_style = kCompactionStyleLevel;
1518 two.num_levels = 4;
1519 two.level0_file_num_compaction_trigger = 3;
1520 two.write_buffer_size = 100000;
1521
1522 Reopen({default_cf, one, two});
1523 // make sure all background compaction jobs can be scheduled
1524 auto stop_token =
1525 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1526
1527 bool cf_1_1 = true;
1528 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1529 {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:1"},
1530 {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:5"},
1531 {"ColumnFamilyTest::AutoManual:2", "ColumnFamilyTest::AutoManual:3"}});
1532 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1533 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1534 if (cf_1_1) {
1535 cf_1_1 = false;
1536 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
1537 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
1538 }
1539 });
1540 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1541 // SETUP column family "one" -- universal style
1542 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1543 PutRandomData(1, 10, 12000, true);
1544 PutRandomData(1, 1, 10, true);
1545 WaitForFlush(1);
1546 AssertFilesPerLevel(ToString(i + 1), 1);
1547 }
1548
1549 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
1550
1551 // SETUP column family "two" -- level style with 4 levels
1552 for (int i = 0; i < two.level0_file_num_compaction_trigger - 2; ++i) {
1553 PutRandomData(2, 10, 12000);
1554 PutRandomData(2, 1, 10);
1555 WaitForFlush(2);
1556 AssertFilesPerLevel(ToString(i + 1), 2);
1557 }
1558 ROCKSDB_NAMESPACE::port::Thread threads([&] {
1559 CompactRangeOptions compact_options;
1560 compact_options.exclusive_manual_compaction = false;
1561 ASSERT_OK(
1562 db_->CompactRange(compact_options, handles_[2], nullptr, nullptr));
1563 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
1564 });
1565
1566 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
1567 threads.join();
1568
1569 // WAIT for compactions
1570 WaitForCompaction();
1571
1572 // VERIFY compaction "one"
1573 AssertFilesPerLevel("1", 1);
1574
1575 // VERIFY compaction "two"
1576 AssertFilesPerLevel("0,1", 2);
1577 CompactAll(2);
1578 AssertFilesPerLevel("0,1", 2);
1579 // Compare against saved keys
1580 std::set<std::string>::iterator key_iter = keys_[1].begin();
1581 while (key_iter != keys_[1].end()) {
1582 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1583 key_iter++;
1584 }
1585 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1586 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1587 }
1588
1589 TEST_P(ColumnFamilyTest, ManualAndAutomaticCompactions) {
1590 Open();
1591 CreateColumnFamilies({"one", "two"});
1592 ColumnFamilyOptions default_cf, one, two;
1593 db_options_.max_open_files = 20; // only 10 files in file cache
1594 db_options_.max_background_compactions = 3;
1595
1596 default_cf.compaction_style = kCompactionStyleLevel;
1597 default_cf.num_levels = 3;
1598 default_cf.write_buffer_size = 64 << 10; // 64KB
1599 default_cf.target_file_size_base = 30 << 10;
1600 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1601 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1602 ;
1603 table_options.no_block_cache = true;
1604 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1605
1606 one.compaction_style = kCompactionStyleUniversal;
1607
1608 one.num_levels = 1;
1609 // trigger compaction if there are >= 4 files
1610 one.level0_file_num_compaction_trigger = 4;
1611 one.write_buffer_size = 120000;
1612
1613 two.compaction_style = kCompactionStyleLevel;
1614 two.num_levels = 4;
1615 two.level0_file_num_compaction_trigger = 3;
1616 two.write_buffer_size = 100000;
1617
1618 Reopen({default_cf, one, two});
1619 // make sure all background compaction jobs can be scheduled
1620 auto stop_token =
1621 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1622
1623 // SETUP column family "one" -- universal style
1624 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1625 PutRandomData(1, 10, 12000, true);
1626 PutRandomData(1, 1, 10, true);
1627 WaitForFlush(1);
1628 AssertFilesPerLevel(ToString(i + 1), 1);
1629 }
1630 bool cf_1_1 = true;
1631 bool cf_1_2 = true;
1632 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1633 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:1"},
1634 {"ColumnFamilyTest::ManualAuto:5", "ColumnFamilyTest::ManualAuto:2"},
1635 {"ColumnFamilyTest::ManualAuto:2", "ColumnFamilyTest::ManualAuto:3"}});
1636 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1637 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1638 if (cf_1_1) {
1639 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1640 cf_1_1 = false;
1641 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1642 } else if (cf_1_2) {
1643 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1644 cf_1_2 = false;
1645 }
1646 });
1647
1648 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1649 ROCKSDB_NAMESPACE::port::Thread threads([&] {
1650 CompactRangeOptions compact_options;
1651 compact_options.exclusive_manual_compaction = false;
1652 ASSERT_OK(
1653 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1654 });
1655
1656 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1657
1658 // SETUP column family "two" -- level style with 4 levels
1659 for (int i = 0; i < two.level0_file_num_compaction_trigger; ++i) {
1660 PutRandomData(2, 10, 12000);
1661 PutRandomData(2, 1, 10);
1662 WaitForFlush(2);
1663 AssertFilesPerLevel(ToString(i + 1), 2);
1664 }
1665 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1666 threads.join();
1667
1668 // WAIT for compactions
1669 WaitForCompaction();
1670
1671 // VERIFY compaction "one"
1672 AssertFilesPerLevel("1", 1);
1673
1674 // VERIFY compaction "two"
1675 AssertFilesPerLevel("0,1", 2);
1676 CompactAll(2);
1677 AssertFilesPerLevel("0,1", 2);
1678 // Compare against saved keys
1679 std::set<std::string>::iterator key_iter = keys_[1].begin();
1680 while (key_iter != keys_[1].end()) {
1681 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1682 key_iter++;
1683 }
1684 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1685 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1686 }
1687
1688 TEST_P(ColumnFamilyTest, SameCFManualManualCompactions) {
1689 Open();
1690 CreateColumnFamilies({"one"});
1691 ColumnFamilyOptions default_cf, one;
1692 db_options_.max_open_files = 20; // only 10 files in file cache
1693 db_options_.max_background_compactions = 3;
1694
1695 default_cf.compaction_style = kCompactionStyleLevel;
1696 default_cf.num_levels = 3;
1697 default_cf.write_buffer_size = 64 << 10; // 64KB
1698 default_cf.target_file_size_base = 30 << 10;
1699 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1700 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1701 ;
1702 table_options.no_block_cache = true;
1703 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1704
1705 one.compaction_style = kCompactionStyleUniversal;
1706
1707 one.num_levels = 1;
1708 // trigger compaction if there are >= 4 files
1709 one.level0_file_num_compaction_trigger = 4;
1710 one.write_buffer_size = 120000;
1711
1712 Reopen({default_cf, one});
1713 // make sure all background compaction jobs can be scheduled
1714 auto stop_token =
1715 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1716
1717 // SETUP column family "one" -- universal style
1718 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1719 PutRandomData(1, 10, 12000, true);
1720 PutRandomData(1, 1, 10, true);
1721 WaitForFlush(1);
1722 AssertFilesPerLevel(ToString(i + 1), 1);
1723 }
1724 bool cf_1_1 = true;
1725 bool cf_1_2 = true;
1726 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1727 {{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"},
1728 {"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"},
1729 {"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"},
1730 {"ColumnFamilyTest::ManualManual:1",
1731 "ColumnFamilyTest::ManualManual:3"}});
1732 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1733 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1734 if (cf_1_1) {
1735 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4");
1736 cf_1_1 = false;
1737 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3");
1738 } else if (cf_1_2) {
1739 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2");
1740 cf_1_2 = false;
1741 }
1742 });
1743
1744 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1745 ROCKSDB_NAMESPACE::port::Thread threads([&] {
1746 CompactRangeOptions compact_options;
1747 compact_options.exclusive_manual_compaction = true;
1748 ASSERT_OK(
1749 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1750 });
1751
1752 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5");
1753
1754 WaitForFlush(1);
1755
1756 // Add more L0 files and force another manual compaction
1757 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1758 PutRandomData(1, 10, 12000, true);
1759 PutRandomData(1, 1, 10, true);
1760 WaitForFlush(1);
1761 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1762 1);
1763 }
1764
1765 ROCKSDB_NAMESPACE::port::Thread threads1([&] {
1766 CompactRangeOptions compact_options;
1767 compact_options.exclusive_manual_compaction = false;
1768 ASSERT_OK(
1769 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1770 });
1771
1772 TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1");
1773
1774 threads.join();
1775 threads1.join();
1776 WaitForCompaction();
1777 // VERIFY compaction "one"
1778 ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1779
1780 // Compare against saved keys
1781 std::set<std::string>::iterator key_iter = keys_[1].begin();
1782 while (key_iter != keys_[1].end()) {
1783 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1784 key_iter++;
1785 }
1786 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1787 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1788 }
1789
1790 TEST_P(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
1791 Open();
1792 CreateColumnFamilies({"one"});
1793 ColumnFamilyOptions default_cf, one;
1794 db_options_.max_open_files = 20; // only 10 files in file cache
1795 db_options_.max_background_compactions = 3;
1796
1797 default_cf.compaction_style = kCompactionStyleLevel;
1798 default_cf.num_levels = 3;
1799 default_cf.write_buffer_size = 64 << 10; // 64KB
1800 default_cf.target_file_size_base = 30 << 10;
1801 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1802 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1803 ;
1804 table_options.no_block_cache = true;
1805 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1806
1807 one.compaction_style = kCompactionStyleUniversal;
1808
1809 one.num_levels = 1;
1810 // trigger compaction if there are >= 4 files
1811 one.level0_file_num_compaction_trigger = 4;
1812 one.write_buffer_size = 120000;
1813
1814 Reopen({default_cf, one});
1815 // make sure all background compaction jobs can be scheduled
1816 auto stop_token =
1817 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1818
1819 // SETUP column family "one" -- universal style
1820 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1821 PutRandomData(1, 10, 12000, true);
1822 PutRandomData(1, 1, 10, true);
1823 WaitForFlush(1);
1824 AssertFilesPerLevel(ToString(i + 1), 1);
1825 }
1826 bool cf_1_1 = true;
1827 bool cf_1_2 = true;
1828 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1829 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1830 {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1831 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:2"},
1832 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1833 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1834 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1835 if (cf_1_1) {
1836 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1837 cf_1_1 = false;
1838 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1839 } else if (cf_1_2) {
1840 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1841 cf_1_2 = false;
1842 }
1843 });
1844
1845 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1846 ROCKSDB_NAMESPACE::port::Thread threads([&] {
1847 CompactRangeOptions compact_options;
1848 compact_options.exclusive_manual_compaction = false;
1849 ASSERT_OK(
1850 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1851 });
1852
1853 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1854
1855 WaitForFlush(1);
1856
1857 // Add more L0 files and force automatic compaction
1858 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1859 PutRandomData(1, 10, 12000, true);
1860 PutRandomData(1, 1, 10, true);
1861 WaitForFlush(1);
1862 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1863 1);
1864 }
1865
1866 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1867
1868 threads.join();
1869 WaitForCompaction();
1870 // VERIFY compaction "one"
1871 ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
1872
1873 // Compare against saved keys
1874 std::set<std::string>::iterator key_iter = keys_[1].begin();
1875 while (key_iter != keys_[1].end()) {
1876 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1877 key_iter++;
1878 }
1879 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1880 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1881 }
1882
1883 TEST_P(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
1884 Open();
1885 CreateColumnFamilies({"one"});
1886 ColumnFamilyOptions default_cf, one;
1887 db_options_.max_open_files = 20; // only 10 files in file cache
1888 db_options_.max_background_compactions = 3;
1889
1890 default_cf.compaction_style = kCompactionStyleLevel;
1891 default_cf.num_levels = 3;
1892 default_cf.write_buffer_size = 64 << 10; // 64KB
1893 default_cf.target_file_size_base = 30 << 10;
1894 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1895 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1896 ;
1897 table_options.no_block_cache = true;
1898 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1899
1900 one.compaction_style = kCompactionStyleLevel;
1901
1902 one.num_levels = 1;
1903 // trigger compaction if there are >= 4 files
1904 one.level0_file_num_compaction_trigger = 3;
1905 one.write_buffer_size = 120000;
1906
1907 Reopen({default_cf, one});
1908 // make sure all background compaction jobs can be scheduled
1909 auto stop_token =
1910 dbfull()->TEST_write_controler().GetCompactionPressureToken();
1911
1912 // SETUP column family "one" -- level style
1913 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
1914 PutRandomData(1, 10, 12000, true);
1915 PutRandomData(1, 1, 10, true);
1916 WaitForFlush(1);
1917 AssertFilesPerLevel(ToString(i + 1), 1);
1918 }
1919 bool cf_1_1 = true;
1920 bool cf_1_2 = true;
1921 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1922 {{"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:2"},
1923 {"ColumnFamilyTest::ManualAuto:4", "ColumnFamilyTest::ManualAuto:5"},
1924 {"ColumnFamilyTest::ManualAuto:3", "ColumnFamilyTest::ManualAuto:2"},
1925 {"LevelCompactionPicker::PickCompactionBySize:0",
1926 "ColumnFamilyTest::ManualAuto:3"},
1927 {"ColumnFamilyTest::ManualAuto:1", "ColumnFamilyTest::ManualAuto:3"}});
1928 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1929 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
1930 if (cf_1_1) {
1931 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:4");
1932 cf_1_1 = false;
1933 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:3");
1934 } else if (cf_1_2) {
1935 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:2");
1936 cf_1_2 = false;
1937 }
1938 });
1939
1940 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1941 ROCKSDB_NAMESPACE::port::Thread threads([&] {
1942 CompactRangeOptions compact_options;
1943 compact_options.exclusive_manual_compaction = false;
1944 ASSERT_OK(
1945 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
1946 });
1947
1948 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:5");
1949
1950 // Add more L0 files and force automatic compaction
1951 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
1952 PutRandomData(1, 10, 12000, true);
1953 PutRandomData(1, 1, 10, true);
1954 WaitForFlush(1);
1955 AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1956 1);
1957 }
1958
1959 TEST_SYNC_POINT("ColumnFamilyTest::ManualAuto:1");
1960
1961 threads.join();
1962 WaitForCompaction();
1963 // VERIFY compaction "one"
1964 AssertFilesPerLevel("0,1", 1);
1965
1966 // Compare against saved keys
1967 std::set<std::string>::iterator key_iter = keys_[1].begin();
1968 while (key_iter != keys_[1].end()) {
1969 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
1970 key_iter++;
1971 }
1972 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1973 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1974 }
1975
1976 // In this test, we generate enough files to trigger automatic compactions.
1977 // The automatic compaction waits in NonTrivial:AfterRun
1978 // We generate more files and then trigger an automatic compaction
1979 // This will wait because the automatic compaction has files it needs.
1980 // Once the conflict is hit, the automatic compaction starts and ends
1981 // Then the manual will run and end.
1982 TEST_P(ColumnFamilyTest, SameCFAutomaticManualCompactions) {
1983 Open();
1984 CreateColumnFamilies({"one"});
1985 ColumnFamilyOptions default_cf, one;
1986 db_options_.max_open_files = 20; // only 10 files in file cache
1987 db_options_.max_background_compactions = 3;
1988
1989 default_cf.compaction_style = kCompactionStyleLevel;
1990 default_cf.num_levels = 3;
1991 default_cf.write_buffer_size = 64 << 10; // 64KB
1992 default_cf.target_file_size_base = 30 << 10;
1993 default_cf.max_compaction_bytes = default_cf.target_file_size_base * 1100;
1994 BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
1995 ;
1996 table_options.no_block_cache = true;
1997 default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
1998
1999 one.compaction_style = kCompactionStyleUniversal;
2000
2001 one.num_levels = 1;
2002 // trigger compaction if there are >= 4 files
2003 one.level0_file_num_compaction_trigger = 4;
2004 one.write_buffer_size = 120000;
2005
2006 Reopen({default_cf, one});
2007 // make sure all background compaction jobs can be scheduled
2008 auto stop_token =
2009 dbfull()->TEST_write_controler().GetCompactionPressureToken();
2010
2011 bool cf_1_1 = true;
2012 bool cf_1_2 = true;
2013 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2014 {{"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:2"},
2015 {"ColumnFamilyTest::AutoManual:4", "ColumnFamilyTest::AutoManual:5"},
2016 {"CompactionPicker::CompactRange:Conflict",
2017 "ColumnFamilyTest::AutoManual:3"}});
2018 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2019 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
2020 if (cf_1_1) {
2021 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:4");
2022 cf_1_1 = false;
2023 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:3");
2024 } else if (cf_1_2) {
2025 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:2");
2026 cf_1_2 = false;
2027 }
2028 });
2029
2030 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2031
2032 // SETUP column family "one" -- universal style
2033 for (int i = 0; i < one.level0_file_num_compaction_trigger; ++i) {
2034 PutRandomData(1, 10, 12000, true);
2035 PutRandomData(1, 1, 10, true);
2036 WaitForFlush(1);
2037 AssertFilesPerLevel(ToString(i + 1), 1);
2038 }
2039
2040 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:5");
2041
2042 // Add another L0 file and force automatic compaction
2043 for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
2044 PutRandomData(1, 10, 12000, true);
2045 PutRandomData(1, 1, 10, true);
2046 WaitForFlush(1);
2047 }
2048
2049 CompactRangeOptions compact_options;
2050 compact_options.exclusive_manual_compaction = false;
2051 ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
2052
2053 TEST_SYNC_POINT("ColumnFamilyTest::AutoManual:1");
2054
2055 WaitForCompaction();
2056 // VERIFY compaction "one"
2057 AssertFilesPerLevel("1", 1);
2058 // Compare against saved keys
2059 std::set<std::string>::iterator key_iter = keys_[1].begin();
2060 while (key_iter != keys_[1].end()) {
2061 ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
2062 key_iter++;
2063 }
2064 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2065 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
2066 }
2067 #endif // !ROCKSDB_LITE
2068
2069 #ifndef ROCKSDB_LITE // Tailing iterator not supported
2070 namespace {
2071 std::string IterStatus(Iterator* iter) {
2072 std::string result;
2073 if (iter->Valid()) {
2074 result = iter->key().ToString() + "->" + iter->value().ToString();
2075 } else {
2076 result = "(invalid)";
2077 }
2078 return result;
2079 }
2080 } // anonymous namespace
2081
2082 TEST_P(ColumnFamilyTest, NewIteratorsTest) {
2083 // iter == 0 -- no tailing
2084 // iter == 2 -- tailing
2085 for (int iter = 0; iter < 2; ++iter) {
2086 Open();
2087 CreateColumnFamiliesAndReopen({"one", "two"});
2088 ASSERT_OK(Put(0, "a", "b"));
2089 ASSERT_OK(Put(1, "b", "a"));
2090 ASSERT_OK(Put(2, "c", "m"));
2091 ASSERT_OK(Put(2, "v", "t"));
2092 std::vector<Iterator*> iterators;
2093 ReadOptions options;
2094 options.tailing = (iter == 1);
2095 ASSERT_OK(db_->NewIterators(options, handles_, &iterators));
2096
2097 for (auto it : iterators) {
2098 it->SeekToFirst();
2099 }
2100 ASSERT_EQ(IterStatus(iterators[0]), "a->b");
2101 ASSERT_EQ(IterStatus(iterators[1]), "b->a");
2102 ASSERT_EQ(IterStatus(iterators[2]), "c->m");
2103
2104 ASSERT_OK(Put(1, "x", "x"));
2105
2106 for (auto it : iterators) {
2107 it->Next();
2108 }
2109
2110 ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
2111 if (iter == 0) {
2112 // no tailing
2113 ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
2114 } else {
2115 // tailing
2116 ASSERT_EQ(IterStatus(iterators[1]), "x->x");
2117 }
2118 ASSERT_EQ(IterStatus(iterators[2]), "v->t");
2119
2120 for (auto it : iterators) {
2121 delete it;
2122 }
2123 Destroy();
2124 }
2125 }
2126 #endif // !ROCKSDB_LITE
2127
2128 #ifndef ROCKSDB_LITE // ReadOnlyDB is not supported
2129 TEST_P(ColumnFamilyTest, ReadOnlyDBTest) {
2130 Open();
2131 CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
2132 ASSERT_OK(Put(0, "a", "b"));
2133 ASSERT_OK(Put(1, "foo", "bla"));
2134 ASSERT_OK(Put(2, "foo", "blabla"));
2135 ASSERT_OK(Put(3, "foo", "blablabla"));
2136 ASSERT_OK(Put(4, "foo", "blablablabla"));
2137
2138 DropColumnFamilies({2});
2139 Close();
2140 // open only a subset of column families
2141 AssertOpenReadOnly({"default", "one", "four"});
2142 ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
2143 ASSERT_EQ("bla", Get(1, "foo"));
2144 ASSERT_EQ("blablablabla", Get(2, "foo"));
2145
2146
2147 // test newiterators
2148 {
2149 std::vector<Iterator*> iterators;
2150 ASSERT_OK(db_->NewIterators(ReadOptions(), handles_, &iterators));
2151 for (auto it : iterators) {
2152 it->SeekToFirst();
2153 }
2154 ASSERT_EQ(IterStatus(iterators[0]), "a->b");
2155 ASSERT_EQ(IterStatus(iterators[1]), "foo->bla");
2156 ASSERT_EQ(IterStatus(iterators[2]), "foo->blablablabla");
2157 for (auto it : iterators) {
2158 it->Next();
2159 }
2160 ASSERT_EQ(IterStatus(iterators[0]), "(invalid)");
2161 ASSERT_EQ(IterStatus(iterators[1]), "(invalid)");
2162 ASSERT_EQ(IterStatus(iterators[2]), "(invalid)");
2163
2164 for (auto it : iterators) {
2165 delete it;
2166 }
2167 }
2168
2169 Close();
2170 // can't open dropped column family
2171 Status s = OpenReadOnly({"default", "one", "two"});
2172 ASSERT_TRUE(!s.ok());
2173
2174 // Can't open without specifying default column family
2175 s = OpenReadOnly({"one", "four"});
2176 ASSERT_TRUE(!s.ok());
2177 }
2178 #endif // !ROCKSDB_LITE
2179
2180 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported in lite
2181 TEST_P(ColumnFamilyTest, DontRollEmptyLogs) {
2182 Open();
2183 CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
2184
2185 for (size_t i = 0; i < handles_.size(); ++i) {
2186 PutRandomData(static_cast<int>(i), 10, 100);
2187 }
2188 int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
2189 // this will trigger the flushes
2190 for (int i = 0; i <= 4; ++i) {
2191 ASSERT_OK(Flush(i));
2192 }
2193
2194 for (int i = 0; i < 4; ++i) {
2195 WaitForFlush(i);
2196 }
2197 int total_new_writable_files =
2198 env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
2199 ASSERT_EQ(static_cast<size_t>(total_new_writable_files), handles_.size() + 1);
2200 Close();
2201 }
2202 #endif // !ROCKSDB_LITE
2203
2204 #ifndef ROCKSDB_LITE // WaitForCompaction() is not supported in lite
2205 TEST_P(ColumnFamilyTest, FlushStaleColumnFamilies) {
2206 Open();
2207 CreateColumnFamilies({"one", "two"});
2208 ColumnFamilyOptions default_cf, one, two;
2209 default_cf.write_buffer_size = 100000; // small write buffer size
2210 default_cf.arena_block_size = 4096;
2211 default_cf.disable_auto_compactions = true;
2212 one.disable_auto_compactions = true;
2213 two.disable_auto_compactions = true;
2214 db_options_.max_total_wal_size = 210000;
2215
2216 Reopen({default_cf, one, two});
2217
2218 PutRandomData(2, 1, 10); // 10 bytes
2219 for (int i = 0; i < 2; ++i) {
2220 PutRandomData(0, 100, 1000); // flush
2221 WaitForFlush(0);
2222
2223 AssertCountLiveFiles(i + 1);
2224 }
2225 // third flush. now, CF [two] should be detected as stale and flushed
2226 // column family 1 should not be flushed since it's empty
2227 PutRandomData(0, 100, 1000); // flush
2228 WaitForFlush(0);
2229 WaitForFlush(2);
2230 // 3 files for default column families, 1 file for column family [two], zero
2231 // files for column family [one], because it's empty
2232 AssertCountLiveFiles(4);
2233
2234 Flush(0);
2235 ASSERT_EQ(0, dbfull()->TEST_total_log_size());
2236 Close();
2237 }
2238 #endif // !ROCKSDB_LITE
2239
2240 TEST_P(ColumnFamilyTest, CreateMissingColumnFamilies) {
2241 Status s = TryOpen({"one", "two"});
2242 ASSERT_TRUE(!s.ok());
2243 db_options_.create_missing_column_families = true;
2244 s = TryOpen({"default", "one", "two"});
2245 ASSERT_TRUE(s.ok());
2246 Close();
2247 }
2248
2249 TEST_P(ColumnFamilyTest, SanitizeOptions) {
2250 DBOptions db_options;
2251 for (int s = kCompactionStyleLevel; s <= kCompactionStyleUniversal; ++s) {
2252 for (int l = 0; l <= 2; l++) {
2253 for (int i = 1; i <= 3; i++) {
2254 for (int j = 1; j <= 3; j++) {
2255 for (int k = 1; k <= 3; k++) {
2256 ColumnFamilyOptions original;
2257 original.compaction_style = static_cast<CompactionStyle>(s);
2258 original.num_levels = l;
2259 original.level0_stop_writes_trigger = i;
2260 original.level0_slowdown_writes_trigger = j;
2261 original.level0_file_num_compaction_trigger = k;
2262 original.write_buffer_size =
2263 l * 4 * 1024 * 1024 + i * 1024 * 1024 + j * 1024 + k;
2264
2265 ColumnFamilyOptions result =
2266 SanitizeOptions(ImmutableDBOptions(db_options), original);
2267 ASSERT_TRUE(result.level0_stop_writes_trigger >=
2268 result.level0_slowdown_writes_trigger);
2269 ASSERT_TRUE(result.level0_slowdown_writes_trigger >=
2270 result.level0_file_num_compaction_trigger);
2271 ASSERT_TRUE(result.level0_file_num_compaction_trigger ==
2272 original.level0_file_num_compaction_trigger);
2273 if (s == kCompactionStyleLevel) {
2274 ASSERT_GE(result.num_levels, 2);
2275 } else {
2276 ASSERT_GE(result.num_levels, 1);
2277 if (original.num_levels >= 1) {
2278 ASSERT_EQ(result.num_levels, original.num_levels);
2279 }
2280 }
2281
2282 // Make sure Sanitize options sets arena_block_size to 1/8 of
2283 // the write_buffer_size, rounded up to a multiple of 4k.
2284 size_t expected_arena_block_size =
2285 l * 4 * 1024 * 1024 / 8 + i * 1024 * 1024 / 8;
2286 if (j + k != 0) {
2287 // not a multiple of 4k, round up 4k
2288 expected_arena_block_size += 4 * 1024;
2289 }
2290 ASSERT_EQ(expected_arena_block_size, result.arena_block_size);
2291 }
2292 }
2293 }
2294 }
2295 }
2296 }
2297
2298 TEST_P(ColumnFamilyTest, ReadDroppedColumnFamily) {
2299 // iter 0 -- drop CF, don't reopen
2300 // iter 1 -- delete CF, reopen
2301 for (int iter = 0; iter < 2; ++iter) {
2302 db_options_.create_missing_column_families = true;
2303 db_options_.max_open_files = 20;
2304 // delete obsolete files always
2305 db_options_.delete_obsolete_files_period_micros = 0;
2306 Open({"default", "one", "two"});
2307 ColumnFamilyOptions options;
2308 options.level0_file_num_compaction_trigger = 100;
2309 options.level0_slowdown_writes_trigger = 200;
2310 options.level0_stop_writes_trigger = 200;
2311 options.write_buffer_size = 100000; // small write buffer size
2312 Reopen({options, options, options});
2313
2314 // 1MB should create ~10 files for each CF
2315 int kKeysNum = 10000;
2316 PutRandomData(0, kKeysNum, 100);
2317 PutRandomData(1, kKeysNum, 100);
2318 PutRandomData(2, kKeysNum, 100);
2319
2320 {
2321 std::unique_ptr<Iterator> iterator(
2322 db_->NewIterator(ReadOptions(), handles_[2]));
2323 iterator->SeekToFirst();
2324
2325 if (iter == 0) {
2326 // Drop CF two
2327 ASSERT_OK(db_->DropColumnFamily(handles_[2]));
2328 } else {
2329 // delete CF two
2330 db_->DestroyColumnFamilyHandle(handles_[2]);
2331 handles_[2] = nullptr;
2332 }
2333 // Make sure iterator created can still be used.
2334 int count = 0;
2335 for (; iterator->Valid(); iterator->Next()) {
2336 ASSERT_OK(iterator->status());
2337 ++count;
2338 }
2339 ASSERT_OK(iterator->status());
2340 ASSERT_EQ(count, kKeysNum);
2341 }
2342
2343 // Add bunch more data to other CFs
2344 PutRandomData(0, kKeysNum, 100);
2345 PutRandomData(1, kKeysNum, 100);
2346
2347 if (iter == 1) {
2348 Reopen();
2349 }
2350
2351 // Since we didn't delete CF handle, RocksDB's contract guarantees that
2352 // we're still able to read dropped CF
2353 for (int i = 0; i < 3; ++i) {
2354 std::unique_ptr<Iterator> iterator(
2355 db_->NewIterator(ReadOptions(), handles_[i]));
2356 int count = 0;
2357 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2358 ASSERT_OK(iterator->status());
2359 ++count;
2360 }
2361 ASSERT_OK(iterator->status());
2362 ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2));
2363 }
2364
2365 Close();
2366 Destroy();
2367 }
2368 }
2369
2370 TEST_P(ColumnFamilyTest, LiveIteratorWithDroppedColumnFamily) {
2371 db_options_.create_missing_column_families = true;
2372 db_options_.max_open_files = 20;
2373 // delete obsolete files always
2374 db_options_.delete_obsolete_files_period_micros = 0;
2375 Open({"default", "one", "two"});
2376 ColumnFamilyOptions options;
2377 options.level0_file_num_compaction_trigger = 100;
2378 options.level0_slowdown_writes_trigger = 200;
2379 options.level0_stop_writes_trigger = 200;
2380 options.write_buffer_size = 100000; // small write buffer size
2381 Reopen({options, options, options});
2382
2383 // 1MB should create ~10 files for each CF
2384 int kKeysNum = 10000;
2385 PutRandomData(1, kKeysNum, 100);
2386
2387 {
2388 std::unique_ptr<Iterator> iterator(
2389 db_->NewIterator(ReadOptions(), handles_[1]));
2390 iterator->SeekToFirst();
2391
2392 DropColumnFamilies({1});
2393
2394 // Make sure iterator created can still be used.
2395 int count = 0;
2396 for (; iterator->Valid(); iterator->Next()) {
2397 ASSERT_OK(iterator->status());
2398 ++count;
2399 }
2400 ASSERT_OK(iterator->status());
2401 ASSERT_EQ(count, kKeysNum);
2402 }
2403
2404 Reopen();
2405 Close();
2406 Destroy();
2407 }
2408
2409 TEST_P(ColumnFamilyTest, FlushAndDropRaceCondition) {
2410 db_options_.create_missing_column_families = true;
2411 Open({"default", "one"});
2412 ColumnFamilyOptions options;
2413 options.level0_file_num_compaction_trigger = 100;
2414 options.level0_slowdown_writes_trigger = 200;
2415 options.level0_stop_writes_trigger = 200;
2416 options.max_write_buffer_number = 20;
2417 options.write_buffer_size = 100000; // small write buffer size
2418 Reopen({options, options});
2419
2420 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2421 {{"VersionSet::LogAndApply::ColumnFamilyDrop:0",
2422 "FlushJob::WriteLevel0Table"},
2423 {"VersionSet::LogAndApply::ColumnFamilyDrop:1",
2424 "FlushJob::InstallResults"},
2425 {"FlushJob::InstallResults",
2426 "VersionSet::LogAndApply::ColumnFamilyDrop:2"}});
2427
2428 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2429 test::SleepingBackgroundTask sleeping_task;
2430
2431 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2432 Env::Priority::HIGH);
2433
2434 // 1MB should create ~10 files for each CF
2435 int kKeysNum = 10000;
2436 PutRandomData(1, kKeysNum, 100);
2437
2438 std::vector<port::Thread> threads;
2439 threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
2440
2441 sleeping_task.WakeUp();
2442 sleeping_task.WaitUntilDone();
2443 sleeping_task.Reset();
2444 // now we sleep again. this is just so we're certain that flush job finished
2445 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2446 Env::Priority::HIGH);
2447 sleeping_task.WakeUp();
2448 sleeping_task.WaitUntilDone();
2449
2450 {
2451 // Since we didn't delete CF handle, RocksDB's contract guarantees that
2452 // we're still able to read dropped CF
2453 std::unique_ptr<Iterator> iterator(
2454 db_->NewIterator(ReadOptions(), handles_[1]));
2455 int count = 0;
2456 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
2457 ASSERT_OK(iterator->status());
2458 ++count;
2459 }
2460 ASSERT_OK(iterator->status());
2461 ASSERT_EQ(count, kKeysNum);
2462 }
2463 for (auto& t : threads) {
2464 t.join();
2465 }
2466
2467 Close();
2468 Destroy();
2469 }
2470
2471 #ifndef ROCKSDB_LITE
2472 // skipped as persisting options is not supported in ROCKSDB_LITE
2473 namespace {
2474 std::atomic<int> test_stage(0);
2475 std::atomic<bool> ordered_by_writethread(false);
2476 const int kMainThreadStartPersistingOptionsFile = 1;
2477 const int kChildThreadFinishDroppingColumnFamily = 2;
2478 void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
2479 std::vector<Comparator*>* comparators) {
2480 while (test_stage < kMainThreadStartPersistingOptionsFile &&
2481 !ordered_by_writethread) {
2482 Env::Default()->SleepForMicroseconds(100);
2483 }
2484 cf_test->DropColumnFamilies({cf_id});
2485 if ((*comparators)[cf_id]) {
2486 delete (*comparators)[cf_id];
2487 (*comparators)[cf_id] = nullptr;
2488 }
2489 test_stage = kChildThreadFinishDroppingColumnFamily;
2490 }
2491 } // namespace
2492
2493 TEST_P(ColumnFamilyTest, CreateAndDropRace) {
2494 const int kCfCount = 5;
2495 std::vector<ColumnFamilyOptions> cf_opts;
2496 std::vector<Comparator*> comparators;
2497 for (int i = 0; i < kCfCount; ++i) {
2498 cf_opts.emplace_back();
2499 comparators.push_back(new test::SimpleSuffixReverseComparator());
2500 cf_opts.back().comparator = comparators.back();
2501 }
2502 db_options_.create_if_missing = true;
2503 db_options_.create_missing_column_families = true;
2504
2505 auto main_thread_id = std::this_thread::get_id();
2506
2507 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2508 "PersistRocksDBOptions:start", [&](void* /*arg*/) {
2509 auto current_thread_id = std::this_thread::get_id();
2510 // If it's the main thread hitting this sync-point, then it
2511 // will be blocked until some other thread update the test_stage.
2512 if (main_thread_id == current_thread_id) {
2513 test_stage = kMainThreadStartPersistingOptionsFile;
2514 while (test_stage < kChildThreadFinishDroppingColumnFamily &&
2515 !ordered_by_writethread) {
2516 Env::Default()->SleepForMicroseconds(100);
2517 }
2518 }
2519 });
2520
2521 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2522 "WriteThread::EnterUnbatched:Wait", [&](void* /*arg*/) {
2523 // This means a thread doing DropColumnFamily() is waiting for
2524 // other thread to finish persisting options.
2525 // In such case, we update the test_stage to unblock the main thread.
2526 ordered_by_writethread = true;
2527 });
2528
2529 // Create a database with four column families
2530 Open({"default", "one", "two", "three"},
2531 {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
2532
2533 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2534
2535 // Start a thread that will drop the first column family
2536 // and its comparator
2537 ROCKSDB_NAMESPACE::port::Thread drop_cf_thread(DropSingleColumnFamily, this,
2538 1, &comparators);
2539
2540 DropColumnFamilies({2});
2541
2542 drop_cf_thread.join();
2543 Close();
2544 Destroy();
2545 for (auto* comparator : comparators) {
2546 if (comparator) {
2547 delete comparator;
2548 }
2549 }
2550
2551 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2552 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
2553 }
2554 #endif // !ROCKSDB_LITE
2555
2556 TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) {
2557 const uint64_t kBaseRate = 800000u;
2558 db_options_.delayed_write_rate = kBaseRate;
2559 db_options_.max_background_compactions = 6;
2560
2561 Open({"default"});
2562 ColumnFamilyData* cfd =
2563 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2564
2565 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2566
2567 MutableCFOptions mutable_cf_options(column_family_options_);
2568
2569 mutable_cf_options.level0_slowdown_writes_trigger = 20;
2570 mutable_cf_options.level0_stop_writes_trigger = 10000;
2571 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2572 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2573 mutable_cf_options.disable_auto_compactions = false;
2574
2575 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2576 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2577 ASSERT_TRUE(!IsDbWriteStopped());
2578 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2579
2580 vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2581 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2582 ASSERT_TRUE(!IsDbWriteStopped());
2583 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2584 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2585 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2586
2587 vstorage->TEST_set_estimated_compaction_needed_bytes(400);
2588 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2589 ASSERT_TRUE(!IsDbWriteStopped());
2590 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2591 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2592 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2593
2594 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2595 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2596 ASSERT_TRUE(!IsDbWriteStopped());
2597 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2598 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2599
2600 vstorage->TEST_set_estimated_compaction_needed_bytes(450);
2601 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2602 ASSERT_TRUE(!IsDbWriteStopped());
2603 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2604 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2605
2606 vstorage->TEST_set_estimated_compaction_needed_bytes(205);
2607 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2608 ASSERT_TRUE(!IsDbWriteStopped());
2609 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2610 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2611
2612 vstorage->TEST_set_estimated_compaction_needed_bytes(202);
2613 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2614 ASSERT_TRUE(!IsDbWriteStopped());
2615 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2616 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2617
2618 vstorage->TEST_set_estimated_compaction_needed_bytes(201);
2619 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2620 ASSERT_TRUE(!IsDbWriteStopped());
2621 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2622 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2623
2624 vstorage->TEST_set_estimated_compaction_needed_bytes(198);
2625 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2626 ASSERT_TRUE(!IsDbWriteStopped());
2627 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2628
2629 vstorage->TEST_set_estimated_compaction_needed_bytes(399);
2630 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2631 ASSERT_TRUE(!IsDbWriteStopped());
2632 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2633 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2634
2635 vstorage->TEST_set_estimated_compaction_needed_bytes(599);
2636 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2637 ASSERT_TRUE(!IsDbWriteStopped());
2638 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2639 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2640
2641 vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
2642 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2643 ASSERT_TRUE(IsDbWriteStopped());
2644 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2645 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2646
2647 vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
2648 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2649 ASSERT_TRUE(IsDbWriteStopped());
2650 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2651
2652 vstorage->TEST_set_estimated_compaction_needed_bytes(390);
2653 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2654 ASSERT_TRUE(!IsDbWriteStopped());
2655 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2656 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2657
2658 vstorage->TEST_set_estimated_compaction_needed_bytes(100);
2659 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2660 ASSERT_TRUE(!IsDbWriteStopped());
2661 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2662
2663 vstorage->set_l0_delay_trigger_count(100);
2664 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2665 ASSERT_TRUE(!IsDbWriteStopped());
2666 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2667 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2668 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2669
2670 vstorage->set_l0_delay_trigger_count(101);
2671 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2672 ASSERT_TRUE(!IsDbWriteStopped());
2673 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2674 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2675
2676 vstorage->set_l0_delay_trigger_count(0);
2677 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2678 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2679 ASSERT_TRUE(!IsDbWriteStopped());
2680 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2681 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2682
2683 vstorage->set_l0_delay_trigger_count(101);
2684 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2685 ASSERT_TRUE(!IsDbWriteStopped());
2686 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2687 ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25, GetDbDelayedWriteRate());
2688
2689 vstorage->TEST_set_estimated_compaction_needed_bytes(200);
2690 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2691 ASSERT_TRUE(!IsDbWriteStopped());
2692 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2693 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2694
2695 vstorage->set_l0_delay_trigger_count(0);
2696 vstorage->TEST_set_estimated_compaction_needed_bytes(0);
2697 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2698 ASSERT_TRUE(!IsDbWriteStopped());
2699 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2700
2701 mutable_cf_options.disable_auto_compactions = true;
2702 dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
2703 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2704 ASSERT_TRUE(!IsDbWriteStopped());
2705 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2706
2707 vstorage->set_l0_delay_trigger_count(50);
2708 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2709 ASSERT_TRUE(!IsDbWriteStopped());
2710 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2711 ASSERT_EQ(0, GetDbDelayedWriteRate());
2712 ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2713
2714 vstorage->set_l0_delay_trigger_count(60);
2715 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2716 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2717 ASSERT_TRUE(!IsDbWriteStopped());
2718 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2719 ASSERT_EQ(0, GetDbDelayedWriteRate());
2720 ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
2721
2722 mutable_cf_options.disable_auto_compactions = false;
2723 vstorage->set_l0_delay_trigger_count(70);
2724 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2725 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2726 ASSERT_TRUE(!IsDbWriteStopped());
2727 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2728 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2729
2730 vstorage->set_l0_delay_trigger_count(71);
2731 vstorage->TEST_set_estimated_compaction_needed_bytes(501);
2732 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2733 ASSERT_TRUE(!IsDbWriteStopped());
2734 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2735 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2736 }
2737
2738 TEST_P(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
2739 db_options_.max_background_compactions = 6;
2740 Open({"default"});
2741 ColumnFamilyData* cfd =
2742 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2743
2744 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2745
2746 MutableCFOptions mutable_cf_options(column_family_options_);
2747
2748 // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2749 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2750 mutable_cf_options.level0_slowdown_writes_trigger = 36;
2751 mutable_cf_options.level0_stop_writes_trigger = 50;
2752 // Speedup threshold = 200 / 4 = 50
2753 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2754 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2755
2756 vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2757 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2758 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2759
2760 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2761 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2762 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2763
2764 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2765 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2766 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2767
2768 vstorage->TEST_set_estimated_compaction_needed_bytes(45);
2769 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2770 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2771
2772 vstorage->set_l0_delay_trigger_count(7);
2773 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2774 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2775
2776 vstorage->set_l0_delay_trigger_count(9);
2777 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2778 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2779
2780 vstorage->set_l0_delay_trigger_count(6);
2781 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2782 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2783
2784 // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
2785 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2786 mutable_cf_options.level0_slowdown_writes_trigger = 16;
2787 mutable_cf_options.level0_stop_writes_trigger = 30;
2788
2789 vstorage->set_l0_delay_trigger_count(5);
2790 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2791 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2792
2793 vstorage->set_l0_delay_trigger_count(7);
2794 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2795 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2796
2797 vstorage->set_l0_delay_trigger_count(3);
2798 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2799 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2800 }
2801
2802 TEST_P(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
2803 const uint64_t kBaseRate = 810000u;
2804 db_options_.delayed_write_rate = kBaseRate;
2805 Open();
2806 CreateColumnFamilies({"one"});
2807 ColumnFamilyData* cfd =
2808 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2809 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2810
2811 ColumnFamilyData* cfd1 =
2812 static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2813 VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2814
2815 MutableCFOptions mutable_cf_options(column_family_options_);
2816 mutable_cf_options.level0_slowdown_writes_trigger = 20;
2817 mutable_cf_options.level0_stop_writes_trigger = 10000;
2818 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2819 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2820
2821 MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2822 mutable_cf_options1.soft_pending_compaction_bytes_limit = 500;
2823
2824 vstorage->TEST_set_estimated_compaction_needed_bytes(50);
2825 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2826 ASSERT_TRUE(!IsDbWriteStopped());
2827 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2828
2829 vstorage1->TEST_set_estimated_compaction_needed_bytes(201);
2830 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2831 ASSERT_TRUE(!IsDbWriteStopped());
2832 ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
2833
2834 vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2835 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2836 ASSERT_TRUE(!IsDbWriteStopped());
2837 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2838 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2839
2840 vstorage->TEST_set_estimated_compaction_needed_bytes(70);
2841 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2842 ASSERT_TRUE(!IsDbWriteStopped());
2843 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2844 ASSERT_EQ(kBaseRate, GetDbDelayedWriteRate());
2845
2846 vstorage1->TEST_set_estimated_compaction_needed_bytes(800);
2847 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2848 ASSERT_TRUE(!IsDbWriteStopped());
2849 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2850 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2851
2852 vstorage->TEST_set_estimated_compaction_needed_bytes(300);
2853 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2854 ASSERT_TRUE(!IsDbWriteStopped());
2855 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2856 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2857
2858 vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
2859 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2860 ASSERT_TRUE(!IsDbWriteStopped());
2861 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2862 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2863
2864 vstorage->TEST_set_estimated_compaction_needed_bytes(500);
2865 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2866 ASSERT_TRUE(!IsDbWriteStopped());
2867 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2868 ASSERT_EQ(kBaseRate / 1.25 / 1.25, GetDbDelayedWriteRate());
2869
2870 vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
2871 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2872 ASSERT_TRUE(!IsDbWriteStopped());
2873 ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
2874 ASSERT_EQ(kBaseRate / 1.25, GetDbDelayedWriteRate());
2875 }
2876
2877 TEST_P(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
2878 db_options_.max_background_compactions = 6;
2879 column_family_options_.soft_pending_compaction_bytes_limit = 200;
2880 column_family_options_.hard_pending_compaction_bytes_limit = 2000;
2881 Open();
2882 CreateColumnFamilies({"one"});
2883 ColumnFamilyData* cfd =
2884 static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
2885 VersionStorageInfo* vstorage = cfd->current()->storage_info();
2886
2887 ColumnFamilyData* cfd1 =
2888 static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
2889 VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
2890
2891 MutableCFOptions mutable_cf_options(column_family_options_);
2892 // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
2893 mutable_cf_options.level0_file_num_compaction_trigger = 4;
2894 mutable_cf_options.level0_slowdown_writes_trigger = 36;
2895 mutable_cf_options.level0_stop_writes_trigger = 30;
2896 // Speedup threshold = 200 / 4 = 50
2897 mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
2898 mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
2899
2900 MutableCFOptions mutable_cf_options1 = mutable_cf_options;
2901 mutable_cf_options1.level0_slowdown_writes_trigger = 16;
2902
2903 vstorage->TEST_set_estimated_compaction_needed_bytes(40);
2904 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2905 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2906
2907 vstorage->TEST_set_estimated_compaction_needed_bytes(60);
2908 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2909 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2910 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2911 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2912
2913 vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
2914 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2915 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2916
2917 vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
2918 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2919 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2920
2921 vstorage->TEST_set_estimated_compaction_needed_bytes(20);
2922 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2923 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2924
2925 vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
2926 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2927 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2928
2929 vstorage->set_l0_delay_trigger_count(9);
2930 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2931 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2932
2933 vstorage1->set_l0_delay_trigger_count(2);
2934 RecalculateWriteStallConditions(cfd1, mutable_cf_options);
2935 ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
2936
2937 vstorage->set_l0_delay_trigger_count(0);
2938 RecalculateWriteStallConditions(cfd, mutable_cf_options);
2939 ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
2940 }
2941
2942 TEST_P(ColumnFamilyTest, CreateAndDestoryOptions) {
2943 std::unique_ptr<ColumnFamilyOptions> cfo(new ColumnFamilyOptions());
2944 ColumnFamilyHandle* cfh;
2945 Open();
2946 ASSERT_OK(db_->CreateColumnFamily(*(cfo.get()), "yoyo", &cfh));
2947 cfo.reset();
2948 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2949 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2950 ASSERT_OK(db_->DropColumnFamily(cfh));
2951 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2952 }
2953
2954 TEST_P(ColumnFamilyTest, CreateDropAndDestroy) {
2955 ColumnFamilyHandle* cfh;
2956 Open();
2957 ASSERT_OK(db_->CreateColumnFamily(ColumnFamilyOptions(), "yoyo", &cfh));
2958 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2959 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2960 ASSERT_OK(db_->DropColumnFamily(cfh));
2961 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2962 }
2963
2964 #ifndef ROCKSDB_LITE
2965 TEST_P(ColumnFamilyTest, CreateDropAndDestroyWithoutFileDeletion) {
2966 ColumnFamilyHandle* cfh;
2967 Open();
2968 ASSERT_OK(db_->CreateColumnFamily(ColumnFamilyOptions(), "yoyo", &cfh));
2969 ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "bar"));
2970 ASSERT_OK(db_->Flush(FlushOptions(), cfh));
2971 ASSERT_OK(db_->DisableFileDeletions());
2972 ASSERT_OK(db_->DropColumnFamily(cfh));
2973 ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
2974 }
2975
2976 TEST_P(ColumnFamilyTest, FlushCloseWALFiles) {
2977 SpecialEnv env(Env::Default());
2978 db_options_.env = &env;
2979 db_options_.max_background_flushes = 1;
2980 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
2981 Open();
2982 CreateColumnFamilies({"one"});
2983 ASSERT_OK(Put(1, "fodor", "mirko"));
2984 ASSERT_OK(Put(0, "fodor", "mirko"));
2985 ASSERT_OK(Put(1, "fodor", "mirko"));
2986
2987 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
2988 {"DBImpl::BGWorkFlush:done", "FlushCloseWALFiles:0"},
2989 });
2990 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2991
2992 // Block flush jobs from running
2993 test::SleepingBackgroundTask sleeping_task;
2994 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2995 Env::Priority::HIGH);
2996
2997 WriteOptions wo;
2998 wo.sync = true;
2999 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3000
3001 ASSERT_EQ(2, env.num_open_wal_file_.load());
3002
3003 sleeping_task.WakeUp();
3004 sleeping_task.WaitUntilDone();
3005 TEST_SYNC_POINT("FlushCloseWALFiles:0");
3006 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3007 ASSERT_EQ(1, env.num_open_wal_file_.load());
3008
3009 Reopen();
3010 ASSERT_EQ("mirko", Get(0, "fodor"));
3011 ASSERT_EQ("mirko", Get(1, "fodor"));
3012 db_options_.env = env_;
3013 Close();
3014 }
3015 #endif // !ROCKSDB_LITE
3016
3017 #ifndef ROCKSDB_LITE // WaitForFlush() is not supported
3018 TEST_P(ColumnFamilyTest, IteratorCloseWALFile1) {
3019 SpecialEnv env(Env::Default());
3020 db_options_.env = &env;
3021 db_options_.max_background_flushes = 1;
3022 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
3023 Open();
3024 CreateColumnFamilies({"one"});
3025 ASSERT_OK(Put(1, "fodor", "mirko"));
3026 // Create an iterator holding the current super version.
3027 Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
3028 // A flush will make `it` hold the last reference of its super version.
3029 Flush(1);
3030
3031 ASSERT_OK(Put(1, "fodor", "mirko"));
3032 ASSERT_OK(Put(0, "fodor", "mirko"));
3033 ASSERT_OK(Put(1, "fodor", "mirko"));
3034
3035 // Flush jobs will close previous WAL files after finishing. By
3036 // block flush jobs from running, we trigger a condition where
3037 // the iterator destructor should close the WAL files.
3038 test::SleepingBackgroundTask sleeping_task;
3039 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
3040 Env::Priority::HIGH);
3041
3042 WriteOptions wo;
3043 wo.sync = true;
3044 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3045
3046 ASSERT_EQ(2, env.num_open_wal_file_.load());
3047 // Deleting the iterator will clear its super version, triggering
3048 // closing all files
3049 delete it;
3050 ASSERT_EQ(1, env.num_open_wal_file_.load());
3051
3052 sleeping_task.WakeUp();
3053 sleeping_task.WaitUntilDone();
3054 WaitForFlush(1);
3055
3056 Reopen();
3057 ASSERT_EQ("mirko", Get(0, "fodor"));
3058 ASSERT_EQ("mirko", Get(1, "fodor"));
3059 db_options_.env = env_;
3060 Close();
3061 }
3062
3063 TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) {
3064 SpecialEnv env(Env::Default());
3065 // Allow both of flush and purge job to schedule.
3066 env.SetBackgroundThreads(2, Env::HIGH);
3067 db_options_.env = &env;
3068 db_options_.max_background_flushes = 1;
3069 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
3070 Open();
3071 CreateColumnFamilies({"one"});
3072 ASSERT_OK(Put(1, "fodor", "mirko"));
3073 // Create an iterator holding the current super version.
3074 ReadOptions ro;
3075 ro.background_purge_on_iterator_cleanup = true;
3076 Iterator* it = db_->NewIterator(ro, handles_[1]);
3077 // A flush will make `it` hold the last reference of its super version.
3078 Flush(1);
3079
3080 ASSERT_OK(Put(1, "fodor", "mirko"));
3081 ASSERT_OK(Put(0, "fodor", "mirko"));
3082 ASSERT_OK(Put(1, "fodor", "mirko"));
3083
3084 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3085 {"ColumnFamilyTest::IteratorCloseWALFile2:0",
3086 "DBImpl::BGWorkPurge:start"},
3087 {"ColumnFamilyTest::IteratorCloseWALFile2:2",
3088 "DBImpl::BackgroundCallFlush:start"},
3089 {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
3090 });
3091 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3092
3093 WriteOptions wo;
3094 wo.sync = true;
3095 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3096
3097 ASSERT_EQ(2, env.num_open_wal_file_.load());
3098 // Deleting the iterator will clear its super version, triggering
3099 // closing all files
3100 delete it;
3101 ASSERT_EQ(2, env.num_open_wal_file_.load());
3102
3103 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
3104 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
3105 ASSERT_EQ(1, env.num_open_wal_file_.load());
3106 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
3107 WaitForFlush(1);
3108 ASSERT_EQ(1, env.num_open_wal_file_.load());
3109 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3110
3111 Reopen();
3112 ASSERT_EQ("mirko", Get(0, "fodor"));
3113 ASSERT_EQ("mirko", Get(1, "fodor"));
3114 db_options_.env = env_;
3115 Close();
3116 }
3117 #endif // !ROCKSDB_LITE
3118
3119 #ifndef ROCKSDB_LITE // TEST functions are not supported in lite
3120 TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
3121 SpecialEnv env(Env::Default());
3122 // Allow both of flush and purge job to schedule.
3123 env.SetBackgroundThreads(2, Env::HIGH);
3124 db_options_.env = &env;
3125 db_options_.max_background_flushes = 1;
3126 column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
3127 column_family_options_.level0_file_num_compaction_trigger = 2;
3128 Open();
3129 CreateColumnFamilies({"one"});
3130 ASSERT_OK(Put(1, "fodor", "mirko"));
3131 ASSERT_OK(Put(1, "fodar2", "mirko"));
3132 Flush(1);
3133
3134 // Create an iterator holding the current super version, as well as
3135 // the SST file just flushed.
3136 ReadOptions ro;
3137 ro.tailing = true;
3138 ro.background_purge_on_iterator_cleanup = true;
3139 Iterator* it = db_->NewIterator(ro, handles_[1]);
3140 // A flush will make `it` hold the last reference of its super version.
3141
3142 ASSERT_OK(Put(1, "fodor", "mirko"));
3143 ASSERT_OK(Put(1, "fodar2", "mirko"));
3144 Flush(1);
3145
3146 WaitForCompaction();
3147
3148 ASSERT_OK(Put(1, "fodor", "mirko"));
3149 ASSERT_OK(Put(1, "fodor", "mirko"));
3150 ASSERT_OK(Put(0, "fodor", "mirko"));
3151 ASSERT_OK(Put(1, "fodor", "mirko"));
3152
3153 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
3154 {"ColumnFamilyTest::IteratorCloseWALFile2:0",
3155 "DBImpl::BGWorkPurge:start"},
3156 {"ColumnFamilyTest::IteratorCloseWALFile2:2",
3157 "DBImpl::BackgroundCallFlush:start"},
3158 {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
3159 });
3160 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3161
3162 WriteOptions wo;
3163 wo.sync = true;
3164 ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
3165
3166 env.delete_count_.store(0);
3167 ASSERT_EQ(2, env.num_open_wal_file_.load());
3168 // Deleting the iterator will clear its super version, triggering
3169 // closing all files
3170 it->Seek("");
3171 ASSERT_EQ(2, env.num_open_wal_file_.load());
3172 ASSERT_EQ(0, env.delete_count_.load());
3173
3174 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
3175 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
3176 ASSERT_EQ(1, env.num_open_wal_file_.load());
3177 ASSERT_EQ(1, env.delete_count_.load());
3178 TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
3179 WaitForFlush(1);
3180 ASSERT_EQ(1, env.num_open_wal_file_.load());
3181 ASSERT_EQ(1, env.delete_count_.load());
3182
3183 delete it;
3184 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3185
3186 Reopen();
3187 ASSERT_EQ("mirko", Get(0, "fodor"));
3188 ASSERT_EQ("mirko", Get(1, "fodor"));
3189 db_options_.env = env_;
3190 Close();
3191 }
3192 #endif // !ROCKSDB_LITE
3193
3194 // Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
3195 // to return true which is not so in unbuffered mode.
3196 #ifndef OS_WIN
3197 TEST_P(ColumnFamilyTest, LogSyncConflictFlush) {
3198 Open();
3199 CreateColumnFamiliesAndReopen({"one", "two"});
3200
3201 Put(0, "", "");
3202 Put(1, "foo", "bar");
3203
3204 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3205 {{"DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
3206 "ColumnFamilyTest::LogSyncConflictFlush:1"},
3207 {"ColumnFamilyTest::LogSyncConflictFlush:2",
3208 "DBImpl::SyncWAL:BeforeMarkLogsSynced:2"}});
3209
3210 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3211
3212 ROCKSDB_NAMESPACE::port::Thread thread([&] { db_->SyncWAL(); });
3213
3214 TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1");
3215 Flush(1);
3216 Put(1, "foo", "bar");
3217 Flush(1);
3218
3219 TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:2");
3220
3221 thread.join();
3222
3223 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3224 Close();
3225 }
3226 #endif
3227
3228 // this test is placed here, because the infrastructure for Column Family
3229 // test is being used to ensure a roll of wal files.
3230 // Basic idea is to test that WAL truncation is being detected and not
3231 // ignored
3232 TEST_P(ColumnFamilyTest, DISABLED_LogTruncationTest) {
3233 Open();
3234 CreateColumnFamiliesAndReopen({"one", "two"});
3235
3236 Build(0, 100);
3237
3238 // Flush the 0th column family to force a roll of the wal log
3239 Flush(0);
3240
3241 // Add some more entries
3242 Build(100, 100);
3243
3244 std::vector<std::string> filenames;
3245 ASSERT_OK(env_->GetChildren(dbname_, &filenames));
3246
3247 // collect wal files
3248 std::vector<std::string> logfs;
3249 for (size_t i = 0; i < filenames.size(); i++) {
3250 uint64_t number;
3251 FileType type;
3252 if (!(ParseFileName(filenames[i], &number, &type))) continue;
3253
3254 if (type != kLogFile) continue;
3255
3256 logfs.push_back(filenames[i]);
3257 }
3258
3259 std::sort(logfs.begin(), logfs.end());
3260 ASSERT_GE(logfs.size(), 2);
3261
3262 // Take the last but one file, and truncate it
3263 std::string fpath = dbname_ + "/" + logfs[logfs.size() - 2];
3264 std::vector<std::string> names_save = names_;
3265
3266 uint64_t fsize;
3267 ASSERT_OK(env_->GetFileSize(fpath, &fsize));
3268 ASSERT_GT(fsize, 0);
3269
3270 Close();
3271
3272 std::string backup_logs = dbname_ + "/backup_logs";
3273 std::string t_fpath = backup_logs + "/" + logfs[logfs.size() - 2];
3274
3275 ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
3276 // Not sure how easy it is to make this data driven.
3277 // need to read back the WAL file and truncate last 10
3278 // entries
3279 CopyFile(fpath, t_fpath, fsize - 9180);
3280
3281 ASSERT_OK(env_->DeleteFile(fpath));
3282 ASSERT_OK(env_->RenameFile(t_fpath, fpath));
3283
3284 db_options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
3285
3286 OpenReadOnly(names_save);
3287
3288 CheckMissed();
3289
3290 Close();
3291
3292 Open(names_save);
3293
3294 CheckMissed();
3295
3296 Close();
3297
3298 // cleanup
3299 env_->DeleteDir(backup_logs);
3300 }
3301
3302 TEST_P(ColumnFamilyTest, DefaultCfPathsTest) {
3303 Open();
3304 // Leave cf_paths for one column families to be empty.
3305 // Files should be generated according to db_paths for that
3306 // column family.
3307 ColumnFamilyOptions cf_opt1, cf_opt2;
3308 cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
3309 std::numeric_limits<uint64_t>::max());
3310 CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
3311 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3312
3313 // Fill Column family 1.
3314 PutRandomData(1, 100, 100);
3315 Flush(1);
3316
3317 ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
3318 ASSERT_EQ(0, GetSstFileCount(dbname_));
3319
3320 // Fill column family 2
3321 PutRandomData(2, 100, 100);
3322 Flush(2);
3323
3324 // SST from Column family 2 should be generated in
3325 // db_paths which is dbname_ in this case.
3326 ASSERT_EQ(1, GetSstFileCount(dbname_));
3327 }
3328
3329 TEST_P(ColumnFamilyTest, MultipleCFPathsTest) {
3330 Open();
3331 // Configure Column family specific paths.
3332 ColumnFamilyOptions cf_opt1, cf_opt2;
3333 cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
3334 std::numeric_limits<uint64_t>::max());
3335 cf_opt2.cf_paths.emplace_back(dbname_ + "_two_1",
3336 std::numeric_limits<uint64_t>::max());
3337 CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
3338 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3339
3340 PutRandomData(1, 100, 100, true /* save */);
3341 Flush(1);
3342
3343 // Check that files are generated in appropriate paths.
3344 ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
3345 ASSERT_EQ(0, GetSstFileCount(dbname_));
3346
3347 PutRandomData(2, 100, 100, true /* save */);
3348 Flush(2);
3349
3350 ASSERT_EQ(1, GetSstFileCount(cf_opt2.cf_paths[0].path));
3351 ASSERT_EQ(0, GetSstFileCount(dbname_));
3352
3353 // Re-open and verify the keys.
3354 Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
3355 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
3356 for (int cf = 1; cf != 3; ++cf) {
3357 ReadOptions read_options;
3358 read_options.readahead_size = 0;
3359 auto it = dbi->NewIterator(read_options, handles_[cf]);
3360 for (it->SeekToFirst(); it->Valid(); it->Next()) {
3361 Slice key(it->key());
3362 ASSERT_NE(keys_[cf].end(), keys_[cf].find(key.ToString()));
3363 }
3364 delete it;
3365
3366 for (const auto& key : keys_[cf]) {
3367 ASSERT_NE("NOT_FOUND", Get(cf, key));
3368 }
3369 }
3370 }
3371
3372 } // namespace ROCKSDB_NAMESPACE
3373
// Hook called from main() before the tests run. Without
// ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS a no-op definition is
// provided here; with it, the function is only declared (with C linkage) and
// its definition has to come from elsewhere.
#ifndef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#else
extern "C" void RegisterCustomObjects(int argc, char** argv);
#endif  // ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
3381
3382 int main(int argc, char** argv) {
3383 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
3384 ::testing::InitGoogleTest(&argc, argv);
3385 RegisterCustomObjects(argc, argv);
3386 return RUN_ALL_TESTS();
3387 }