]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/flush_job_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / flush_job_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <algorithm>
7 #include <map>
8 #include <string>
9
10 #include "db/column_family.h"
11 #include "db/flush_job.h"
12 #include "db/version_set.h"
13 #include "rocksdb/cache.h"
14 #include "rocksdb/write_buffer_manager.h"
15 #include "table/mock_table.h"
16 #include "util/file_reader_writer.h"
17 #include "util/string_util.h"
18 #include "util/testharness.h"
19 #include "util/testutil.h"
20
21 namespace rocksdb {
22
23 // TODO(icanadi) Mock out everything else:
24 // 1. VersionSet
25 // 2. Memtable
26 class FlushJobTest : public testing::Test {
27 public:
28 FlushJobTest()
29 : env_(Env::Default()),
30 dbname_(test::PerThreadDBPath("flush_job_test")),
31 options_(),
32 db_options_(options_),
33 table_cache_(NewLRUCache(50000, 16)),
34 write_buffer_manager_(db_options_.db_write_buffer_size),
35 versions_(new VersionSet(dbname_, &db_options_, env_options_,
36 table_cache_.get(), &write_buffer_manager_,
37 &write_controller_)),
38 shutting_down_(false),
39 mock_table_factory_(new mock::MockTableFactory()) {
40 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
41 db_options_.db_paths.emplace_back(dbname_,
42 std::numeric_limits<uint64_t>::max());
43 db_options_.statistics = rocksdb::CreateDBStatistics();
44 // TODO(icanadi) Remove this once we mock out VersionSet
45 NewDB();
46 std::vector<ColumnFamilyDescriptor> column_families;
47 cf_options_.table_factory = mock_table_factory_;
48 column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
49
50 EXPECT_OK(versions_->Recover(column_families, false));
51 }
52
53 void NewDB() {
54 VersionEdit new_db;
55 new_db.SetLogNumber(0);
56 new_db.SetNextFile(2);
57 new_db.SetLastSequence(0);
58
59 const std::string manifest = DescriptorFileName(dbname_, 1);
60 unique_ptr<WritableFile> file;
61 Status s = env_->NewWritableFile(
62 manifest, &file, env_->OptimizeForManifestWrite(env_options_));
63 ASSERT_OK(s);
64 unique_ptr<WritableFileWriter> file_writer(
65 new WritableFileWriter(std::move(file), manifest, EnvOptions()));
66 {
67 log::Writer log(std::move(file_writer), 0, false);
68 std::string record;
69 new_db.EncodeTo(&record);
70 s = log.AddRecord(record);
71 }
72 ASSERT_OK(s);
73 // Make "CURRENT" file that points to the new manifest file.
74 s = SetCurrentFile(env_, dbname_, 1, nullptr);
75 }
76
77 Env* env_;
78 std::string dbname_;
79 EnvOptions env_options_;
80 Options options_;
81 ImmutableDBOptions db_options_;
82 std::shared_ptr<Cache> table_cache_;
83 WriteController write_controller_;
84 WriteBufferManager write_buffer_manager_;
85 ColumnFamilyOptions cf_options_;
86 std::unique_ptr<VersionSet> versions_;
87 InstrumentedMutex mutex_;
88 std::atomic<bool> shutting_down_;
89 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
90 };
91
92 TEST_F(FlushJobTest, Empty) {
93 JobContext job_context(0);
94 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
95 EventLogger event_logger(db_options_.info_log.get());
96 SnapshotChecker* snapshot_checker = nullptr; // not relavant
97 FlushJob flush_job(
98 dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
99 *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_,
100 &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context,
101 nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false);
102 {
103 InstrumentedMutexLock l(&mutex_);
104 flush_job.PickMemTable();
105 ASSERT_OK(flush_job.Run());
106 }
107 job_context.Clean();
108 }
109
110 TEST_F(FlushJobTest, NonEmpty) {
111 JobContext job_context(0);
112 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
113 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
114 kMaxSequenceNumber);
115 new_mem->Ref();
116 auto inserted_keys = mock::MakeMockFile();
117 // Test data:
118 // seqno [ 1, 2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
119 // key [ 1001, 1002 ... 9998, 9999, 0, 1, 2 ... 999 ]
120 // range-delete "9995" -> "9999" at seqno 10000
121 for (int i = 1; i < 10000; ++i) {
122 std::string key(ToString((i + 1000) % 10000));
123 std::string value("value" + key);
124 new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
125 if ((i + 1000) % 10000 < 9995) {
126 InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
127 inserted_keys.insert({internal_key.Encode().ToString(), value});
128 }
129 }
130 new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
131 InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
132 inserted_keys.insert({internal_key.Encode().ToString(), "9999a"});
133
134 autovector<MemTable*> to_delete;
135 cfd->imm()->Add(new_mem, &to_delete);
136 for (auto& m : to_delete) {
137 delete m;
138 }
139
140 EventLogger event_logger(db_options_.info_log.get());
141 SnapshotChecker* snapshot_checker = nullptr; // not relavant
142 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
143 db_options_, *cfd->GetLatestMutableCFOptions(),
144 env_options_, versions_.get(), &mutex_, &shutting_down_,
145 {}, kMaxSequenceNumber, snapshot_checker, &job_context,
146 nullptr, nullptr, nullptr, kNoCompression,
147 db_options_.statistics.get(), &event_logger, true);
148
149 HistogramData hist;
150 FileMetaData file_meta;
151 mutex_.Lock();
152 flush_job.PickMemTable();
153 ASSERT_OK(flush_job.Run(nullptr, &file_meta));
154 mutex_.Unlock();
155 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
156 ASSERT_GT(hist.average, 0.0);
157
158 ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
159 ASSERT_EQ(
160 "9999a",
161 file_meta.largest.user_key().ToString()); // range tombstone end key
162 ASSERT_EQ(1, file_meta.fd.smallest_seqno);
163 ASSERT_EQ(10000, file_meta.fd.largest_seqno); // range tombstone seqnum 10000
164 mock_table_factory_->AssertSingleFile(inserted_keys);
165 job_context.Clean();
166 }
167
168 TEST_F(FlushJobTest, Snapshots) {
169 JobContext job_context(0);
170 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
171 auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
172 kMaxSequenceNumber);
173
174 std::vector<SequenceNumber> snapshots;
175 std::set<SequenceNumber> snapshots_set;
176 int keys = 10000;
177 int max_inserts_per_keys = 8;
178
179 Random rnd(301);
180 for (int i = 0; i < keys / 2; ++i) {
181 snapshots.push_back(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
182 snapshots_set.insert(snapshots.back());
183 }
184 std::sort(snapshots.begin(), snapshots.end());
185
186 new_mem->Ref();
187 SequenceNumber current_seqno = 0;
188 auto inserted_keys = mock::MakeMockFile();
189 for (int i = 1; i < keys; ++i) {
190 std::string key(ToString(i));
191 int insertions = rnd.Uniform(max_inserts_per_keys);
192 for (int j = 0; j < insertions; ++j) {
193 std::string value(test::RandomHumanReadableString(&rnd, 10));
194 auto seqno = ++current_seqno;
195 new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
196 // a key is visible only if:
197 // 1. it's the last one written (j == insertions - 1)
198 // 2. there's a snapshot pointing at it
199 bool visible = (j == insertions - 1) ||
200 (snapshots_set.find(seqno) != snapshots_set.end());
201 if (visible) {
202 InternalKey internal_key(key, seqno, kTypeValue);
203 inserted_keys.insert({internal_key.Encode().ToString(), value});
204 }
205 }
206 }
207
208 autovector<MemTable*> to_delete;
209 cfd->imm()->Add(new_mem, &to_delete);
210 for (auto& m : to_delete) {
211 delete m;
212 }
213
214 EventLogger event_logger(db_options_.info_log.get());
215 SnapshotChecker* snapshot_checker = nullptr; // not relavant
216 FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
217 db_options_, *cfd->GetLatestMutableCFOptions(),
218 env_options_, versions_.get(), &mutex_, &shutting_down_,
219 snapshots, kMaxSequenceNumber, snapshot_checker,
220 &job_context, nullptr, nullptr, nullptr, kNoCompression,
221 db_options_.statistics.get(), &event_logger, true);
222 mutex_.Lock();
223 flush_job.PickMemTable();
224 ASSERT_OK(flush_job.Run());
225 mutex_.Unlock();
226 mock_table_factory_->AssertSingleFile(inserted_keys);
227 HistogramData hist;
228 db_options_.statistics->histogramData(FLUSH_TIME, &hist);
229 ASSERT_GT(hist.average, 0.0);
230 job_context.Clean();
231 }
232
233 } // namespace rocksdb
234
235 int main(int argc, char** argv) {
236 ::testing::InitGoogleTest(&argc, argv);
237 return RUN_ALL_TESTS();
238 }