1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
10 #include "db/column_family.h"
11 #include "db/flush_job.h"
12 #include "db/version_set.h"
13 #include "rocksdb/cache.h"
14 #include "rocksdb/write_buffer_manager.h"
15 #include "table/mock_table.h"
16 #include "util/file_reader_writer.h"
17 #include "util/string_util.h"
18 #include "util/testharness.h"
19 #include "util/testutil.h"
23 // TODO(icanadi) Mock out everything else:
// Test fixture used by the FlushJobTest cases in this file.
// The constructor's initializer list sets up the environment, the DB path,
// the DB options, an LRU cache used as the table cache, a write buffer
// manager sized from db_options_.db_write_buffer_size, a VersionSet and a
// mock table factory. Its body then creates the DB directory, registers it as
// a db path, attaches a statistics object and recovers the VersionSet with the
// default column family, whose table factory is the mock factory.
// NOTE(review): the embedded original line numbers skip in several places
// (e.g. 26 -> 29, 36 -> 38, 50 -> 55, 74 -> 79), so parts of this class are
// not visible in this view -- confirm against the full file before editing.
26 class FlushJobTest
: public testing::Test
{
29 : env_(Env::Default()),
30 dbname_(test::PerThreadDBPath("flush_job_test")),
32 db_options_(options_
),
33 table_cache_(NewLRUCache(50000, 16)),
34 write_buffer_manager_(db_options_
.db_write_buffer_size
),
35 versions_(new VersionSet(dbname_
, &db_options_
, env_options_
,
36 table_cache_
.get(), &write_buffer_manager_
,
38 shutting_down_(false),
39 mock_table_factory_(new mock::MockTableFactory()) {
// The directory must exist before anything is written under dbname_.
40 EXPECT_OK(env_
->CreateDirIfMissing(dbname_
));
// Register dbname_ as a db path with the largest possible uint64_t as its
// second argument.
41 db_options_
.db_paths
.emplace_back(dbname_
,
42 std::numeric_limits
<uint64_t>::max());
43 db_options_
.statistics
= rocksdb::CreateDBStatistics();
44 // TODO(icanadi) Remove this once we mock out VersionSet
// Describe a single column family (the default one) whose tables are built
// by the mock table factory, then recover the VersionSet with it.
46 std::vector
<ColumnFamilyDescriptor
> column_families
;
47 cf_options_
.table_factory
= mock_table_factory_
;
48 column_families
.emplace_back(kDefaultColumnFamilyName
, cf_options_
);
50 EXPECT_OK(versions_
->Recover(column_families
, false));
// The statements below populate new_db (log number 0, next file 2, last
// sequence 0), encode it into `record` and append that record through a
// log::Writer to the descriptor file numbered 1 under dbname_.
// NOTE(review): the enclosing function header and the declarations of
// new_db and record are not visible in this view.
55 new_db
.SetLogNumber(0);
56 new_db
.SetNextFile(2);
57 new_db
.SetLastSequence(0);
59 const std::string manifest
= DescriptorFileName(dbname_
, 1);
60 unique_ptr
<WritableFile
> file
;
61 Status s
= env_
->NewWritableFile(
62 manifest
, &file
, env_
->OptimizeForManifestWrite(env_options_
));
// Hand ownership of the raw file to a WritableFileWriter, which is in turn
// moved into the log writer.
64 unique_ptr
<WritableFileWriter
> file_writer(
65 new WritableFileWriter(std::move(file
), manifest
, EnvOptions()));
67 log::Writer
log(std::move(file_writer
), 0, false);
69 new_db
.EncodeTo(&record
);
70 s
= log
.AddRecord(record
);
73 // Make "CURRENT" file that points to the new manifest file.
// NOTE(review): no check of `s` is visible after this assignment in this
// view -- confirm whether the result of SetCurrentFile is verified.
74 s
= SetCurrentFile(env_
, dbname_
, 1, nullptr);
// Fixture state referenced by the constructor above and by the test bodies.
79 EnvOptions env_options_
;
81 ImmutableDBOptions db_options_
;
82 std::shared_ptr
<Cache
> table_cache_
;
83 WriteController write_controller_
;
84 WriteBufferManager write_buffer_manager_
;
85 ColumnFamilyOptions cf_options_
;
86 std::unique_ptr
<VersionSet
> versions_
;
87 InstrumentedMutex mutex_
;
88 std::atomic
<bool> shutting_down_
;
89 std::shared_ptr
<mock::MockTableFactory
> mock_table_factory_
;
// Runs a flush job against the default column family and asserts that Run()
// returns OK. In the visible code nothing is added to the column family's
// immutable memtable list before the job is picked and run.
// NOTE(review): the line that declares flush_job (original line 97) is not
// visible in this view; only its constructor arguments are.
92 TEST_F(FlushJobTest
, Empty
) {
93 JobContext
job_context(0);
94 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
95 EventLogger
event_logger(db_options_
.info_log
.get());
96 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
98 dbname_
, versions_
->GetColumnFamilySet()->GetDefault(), db_options_
,
99 *cfd
->GetLatestMutableCFOptions(), env_options_
, versions_
.get(), &mutex_
,
100 &shutting_down_
, {}, kMaxSequenceNumber
, snapshot_checker
, &job_context
,
101 nullptr, nullptr, nullptr, kNoCompression
, nullptr, &event_logger
, false);
// mutex_ is held (via the scoped lock) while the memtable is picked and the
// job runs.
103 InstrumentedMutexLock
l(&mutex_
);
104 flush_job
.PickMemTable();
105 ASSERT_OK(flush_job
.Run());
// Fills a new memtable with 9999 point values plus one range deletion, adds
// it to the default column family's immutable list, flushes it and checks the
// FLUSH_TIME histogram, the resulting file metadata and the contents handed
// to the mock table factory.
// NOTE(review): several original lines are missing from this view (e.g. the
// rest of the ConstructNewMemtable call, the bodies after lines 127 and 136,
// the declaration of hist and the head of the assertion ending on line 161).
110 TEST_F(FlushJobTest
, NonEmpty
) {
111 JobContext
job_context(0);
112 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
113 auto new_mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
// inserted_keys accumulates the entries the flushed file is expected to hold;
// it is compared against the mock table factory at the end of the test.
116 auto inserted_keys
= mock::MakeMockFile();
118 // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
119 // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
120 // range-delete "9995" -> "9999" at seqno 10000
121 for (int i
= 1; i
< 10000; ++i
) {
122 std::string
key(ToString((i
+ 1000) % 10000));
123 std::string
value("value" + key
);
124 new_mem
->Add(SequenceNumber(i
), kTypeValue
, key
, value
);
// Only keys whose numeric value is below 9995 are recorded as expected
// output; the range deletion added after the loop begins at "9995".
125 if ((i
+ 1000) % 10000 < 9995) {
126 InternalKey
internal_key(key
, SequenceNumber(i
), kTypeValue
);
127 inserted_keys
.insert({internal_key
.Encode().ToString(), value
});
// The range deletion is written with end key "9999a" and is itself expected
// in the output, keyed by its start key "9995" at seqno 10000.
130 new_mem
->Add(SequenceNumber(10000), kTypeRangeDeletion
, "9995", "9999a");
131 InternalKey
internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion
);
132 inserted_keys
.insert({internal_key
.Encode().ToString(), "9999a"});
// Move the memtable to the immutable list; to_delete receives any memtables
// handed back by Add().
134 autovector
<MemTable
*> to_delete
;
135 cfd
->imm()->Add(new_mem
, &to_delete
);
136 for (auto& m
: to_delete
) {
140 EventLogger
event_logger(db_options_
.info_log
.get());
141 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
// The job is given an empty snapshot list ({}) and the statistics object
// from db_options_.
142 FlushJob
flush_job(dbname_
, versions_
->GetColumnFamilySet()->GetDefault(),
143 db_options_
, *cfd
->GetLatestMutableCFOptions(),
144 env_options_
, versions_
.get(), &mutex_
, &shutting_down_
,
145 {}, kMaxSequenceNumber
, snapshot_checker
, &job_context
,
146 nullptr, nullptr, nullptr, kNoCompression
,
147 db_options_
.statistics
.get(), &event_logger
, true);
// file_meta is filled in by Run() and inspected by the assertions below.
150 FileMetaData file_meta
;
152 flush_job
.PickMemTable();
153 ASSERT_OK(flush_job
.Run(nullptr, &file_meta
));
// The FLUSH_TIME histogram average must be positive after the flush.
155 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
156 ASSERT_GT(hist
.average
, 0.0);
158 ASSERT_EQ(ToString(0), file_meta
.smallest
.user_key().ToString());
161 file_meta
.largest
.user_key().ToString()); // range tombstone end key
162 ASSERT_EQ(1, file_meta
.fd
.smallest_seqno
);
163 ASSERT_EQ(10000, file_meta
.fd
.largest_seqno
); // range tombstone seqnum 10000
164 mock_table_factory_
->AssertSingleFile(inserted_keys
);
// Flushes a memtable holding several versions per key while a list of
// snapshot sequence numbers is passed to the flush job, then checks that the
// mock table factory received exactly the entries collected in inserted_keys
// and that the FLUSH_TIME histogram average is positive.
// NOTE(review): the declarations of rnd, keys and hist, the rest of the
// ConstructNewMemtable call and the statement on original line 201 (between
// the computation of `visible` and its use) are not visible in this view.
168 TEST_F(FlushJobTest
, Snapshots
) {
169 JobContext
job_context(0);
170 auto cfd
= versions_
->GetColumnFamilySet()->GetDefault();
171 auto new_mem
= cfd
->ConstructNewMemtable(*cfd
->GetLatestMutableCFOptions(),
// Snapshots are kept twice: in a vector (sorted below and passed to the
// flush job) and in a set used for membership checks.
174 std::vector
<SequenceNumber
> snapshots
;
175 std::set
<SequenceNumber
> snapshots_set
;
177 int max_inserts_per_keys
= 8;
// Draw keys / 2 snapshot sequence numbers from
// rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1.
180 for (int i
= 0; i
< keys
/ 2; ++i
) {
181 snapshots
.push_back(rnd
.Uniform(keys
* (max_inserts_per_keys
/ 2)) + 1);
182 snapshots_set
.insert(snapshots
.back());
184 std::sort(snapshots
.begin(), snapshots
.end());
// current_seqno is pre-incremented for every insertion, so the first
// sequence number written is 1.
187 SequenceNumber current_seqno
= 0;
188 auto inserted_keys
= mock::MakeMockFile();
189 for (int i
= 1; i
< keys
; ++i
) {
190 std::string
key(ToString(i
));
// Each key receives rnd.Uniform(max_inserts_per_keys) versions.
191 int insertions
= rnd
.Uniform(max_inserts_per_keys
);
192 for (int j
= 0; j
< insertions
; ++j
) {
193 std::string
value(test::RandomHumanReadableString(&rnd
, 10));
194 auto seqno
= ++current_seqno
;
195 new_mem
->Add(SequenceNumber(seqno
), kTypeValue
, key
, value
);
196 // a key is visible only if:
197 // 1. it's the last one written (j == insertions - 1)
198 // 2. there's a snapshot pointing at it
199 bool visible
= (j
== insertions
- 1) ||
200 (snapshots_set
.find(seqno
) != snapshots_set
.end());
// NOTE(review): presumably the insertion below is guarded by `visible`;
// the guarding line is missing from this view -- confirm.
202 InternalKey
internal_key(key
, seqno
, kTypeValue
);
203 inserted_keys
.insert({internal_key
.Encode().ToString(), value
});
// Move the memtable to the immutable list; to_delete receives any memtables
// handed back by Add().
208 autovector
<MemTable
*> to_delete
;
209 cfd
->imm()->Add(new_mem
, &to_delete
);
210 for (auto& m
: to_delete
) {
214 EventLogger
event_logger(db_options_
.info_log
.get());
215 SnapshotChecker
* snapshot_checker
= nullptr; // not relevant
// The sorted snapshot list is passed to the flush job here.
216 FlushJob
flush_job(dbname_
, versions_
->GetColumnFamilySet()->GetDefault(),
217 db_options_
, *cfd
->GetLatestMutableCFOptions(),
218 env_options_
, versions_
.get(), &mutex_
, &shutting_down_
,
219 snapshots
, kMaxSequenceNumber
, snapshot_checker
,
220 &job_context
, nullptr, nullptr, nullptr, kNoCompression
,
221 db_options_
.statistics
.get(), &event_logger
, true);
223 flush_job
.PickMemTable();
224 ASSERT_OK(flush_job
.Run());
// The single file built by the mock factory must match inserted_keys.
226 mock_table_factory_
->AssertSingleFile(inserted_keys
);
// The FLUSH_TIME histogram average must be positive after the flush.
228 db_options_
.statistics
->histogramData(FLUSH_TIME
, &hist
);
229 ASSERT_GT(hist
.average
, 0.0);
233 } // namespace rocksdb
// Test binary entry point: passes the command-line arguments to
// ::testing::InitGoogleTest and returns the result of RUN_ALL_TESTS().
235 int main(int argc
, char** argv
) {
236 ::testing::InitGoogleTest(&argc
, argv
);
237 return RUN_ALL_TESTS();