]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/wal_manager_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / wal_manager_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <map>
9 #include <string>
10
11 #include "rocksdb/cache.h"
12 #include "rocksdb/write_batch.h"
13 #include "rocksdb/write_buffer_manager.h"
14
15 #include "db/column_family.h"
16 #include "db/db_impl.h"
17 #include "db/log_writer.h"
18 #include "db/version_set.h"
19 #include "db/wal_manager.h"
20 #include "env/mock_env.h"
21 #include "table/mock_table.h"
22 #include "util/file_reader_writer.h"
23 #include "util/string_util.h"
24 #include "util/testharness.h"
25 #include "util/testutil.h"
26
27 namespace rocksdb {
28
29 // TODO(icanadi) mock out VersionSet
30 // TODO(icanadi) move other WalManager-specific tests from db_test here
31 class WalManagerTest : public testing::Test {
32 public:
33 WalManagerTest()
34 : env_(new MockEnv(Env::Default())),
35 dbname_(test::PerThreadDBPath("wal_manager_test")),
36 db_options_(),
37 table_cache_(NewLRUCache(50000, 16)),
38 write_buffer_manager_(db_options_.db_write_buffer_size),
39 current_log_number_(0) {
40 DestroyDB(dbname_, Options());
41 }
42
43 void Init() {
44 ASSERT_OK(env_->CreateDirIfMissing(dbname_));
45 ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
46 db_options_.db_paths.emplace_back(dbname_,
47 std::numeric_limits<uint64_t>::max());
48 db_options_.wal_dir = dbname_;
49 db_options_.env = env_.get();
50
51 versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
52 table_cache_.get(), &write_buffer_manager_,
53 &write_controller_));
54
55 wal_manager_.reset(new WalManager(db_options_, env_options_));
56 }
57
58 void Reopen() {
59 wal_manager_.reset(new WalManager(db_options_, env_options_));
60 }
61
62 // NOT thread safe
63 void Put(const std::string& key, const std::string& value) {
64 assert(current_log_writer_.get() != nullptr);
65 uint64_t seq = versions_->LastSequence() + 1;
66 WriteBatch batch;
67 batch.Put(key, value);
68 WriteBatchInternal::SetSequence(&batch, seq);
69 current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
70 versions_->SetLastAllocatedSequence(seq);
71 versions_->SetLastPublishedSequence(seq);
72 versions_->SetLastSequence(seq);
73 }
74
75 // NOT thread safe
76 void RollTheLog(bool /*archived*/) {
77 current_log_number_++;
78 std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
79 std::unique_ptr<WritableFile> file;
80 ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
81 std::unique_ptr<WritableFileWriter> file_writer(
82 new WritableFileWriter(std::move(file), fname, env_options_));
83 current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
84 }
85
86 void CreateArchiveLogs(int num_logs, int entries_per_log) {
87 for (int i = 1; i <= num_logs; ++i) {
88 RollTheLog(true);
89 for (int k = 0; k < entries_per_log; ++k) {
90 Put(ToString(k), std::string(1024, 'a'));
91 }
92 }
93 }
94
95 std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
96 const SequenceNumber seq) {
97 std::unique_ptr<TransactionLogIterator> iter;
98 Status status = wal_manager_->GetUpdatesSince(
99 seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
100 EXPECT_OK(status);
101 return iter;
102 }
103
104 std::unique_ptr<MockEnv> env_;
105 std::string dbname_;
106 ImmutableDBOptions db_options_;
107 WriteController write_controller_;
108 EnvOptions env_options_;
109 std::shared_ptr<Cache> table_cache_;
110 WriteBufferManager write_buffer_manager_;
111 std::unique_ptr<VersionSet> versions_;
112 std::unique_ptr<WalManager> wal_manager_;
113
114 std::unique_ptr<log::Writer> current_log_writer_;
115 uint64_t current_log_number_;
116 };
117
118 TEST_F(WalManagerTest, ReadFirstRecordCache) {
119 Init();
120 std::string path = dbname_ + "/000001.log";
121 std::unique_ptr<WritableFile> file;
122 ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
123
124 SequenceNumber s;
125 ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
126 ASSERT_EQ(s, 0U);
127
128 ASSERT_OK(
129 wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
130 ASSERT_EQ(s, 0U);
131
132 std::unique_ptr<WritableFileWriter> file_writer(
133 new WritableFileWriter(std::move(file), path, EnvOptions()));
134 log::Writer writer(std::move(file_writer), 1,
135 db_options_.recycle_log_file_num > 0);
136 WriteBatch batch;
137 batch.Put("foo", "bar");
138 WriteBatchInternal::SetSequence(&batch, 10);
139 writer.AddRecord(WriteBatchInternal::Contents(&batch));
140
141 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
142 // Waiting for lei to finish with db_test
143 // env_->count_sequential_reads_ = true;
144 // sequential_read_counter_ sanity test
145 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
146
147 ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
148 ASSERT_EQ(s, 10U);
149 // did a read
150 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
151 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
152
153 ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
154 ASSERT_EQ(s, 10U);
155 // no new reads since the value is cached
156 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
157 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
158 }
159
160 namespace {
161 uint64_t GetLogDirSize(std::string dir_path, Env* env) {
162 uint64_t dir_size = 0;
163 std::vector<std::string> files;
164 env->GetChildren(dir_path, &files);
165 for (auto& f : files) {
166 uint64_t number;
167 FileType type;
168 if (ParseFileName(f, &number, &type) && type == kLogFile) {
169 std::string const file_path = dir_path + "/" + f;
170 uint64_t file_size;
171 env->GetFileSize(file_path, &file_size);
172 dir_size += file_size;
173 }
174 }
175 return dir_size;
176 }
177 std::vector<std::uint64_t> ListSpecificFiles(
178 Env* env, const std::string& path, const FileType expected_file_type) {
179 std::vector<std::string> files;
180 std::vector<uint64_t> file_numbers;
181 env->GetChildren(path, &files);
182 uint64_t number;
183 FileType type;
184 for (size_t i = 0; i < files.size(); ++i) {
185 if (ParseFileName(files[i], &number, &type)) {
186 if (type == expected_file_type) {
187 file_numbers.push_back(number);
188 }
189 }
190 }
191 return file_numbers;
192 }
193
194 int CountRecords(TransactionLogIterator* iter) {
195 int count = 0;
196 SequenceNumber lastSequence = 0;
197 BatchResult res;
198 while (iter->Valid()) {
199 res = iter->GetBatch();
200 EXPECT_TRUE(res.sequence > lastSequence);
201 ++count;
202 lastSequence = res.sequence;
203 EXPECT_OK(iter->status());
204 iter->Next();
205 }
206 return count;
207 }
208 } // namespace
209
210 TEST_F(WalManagerTest, WALArchivalSizeLimit) {
211 db_options_.wal_ttl_seconds = 0;
212 db_options_.wal_size_limit_mb = 1000;
213 Init();
214
215 // TEST : Create WalManager with huge size limit and no ttl.
216 // Create some archived files and call PurgeObsoleteWALFiles().
217 // Count the archived log files that survived.
218 // Assert that all of them did.
219 // Change size limit. Re-open WalManager.
220 // Assert that archive is not greater than wal_size_limit_mb after
221 // PurgeObsoleteWALFiles()
222 // Set ttl and time_to_check_ to small values. Re-open db.
223 // Assert that there are no archived logs left.
224
225 std::string archive_dir = ArchivalDirectory(dbname_);
226 CreateArchiveLogs(20, 5000);
227
228 std::vector<std::uint64_t> log_files =
229 ListSpecificFiles(env_.get(), archive_dir, kLogFile);
230 ASSERT_EQ(log_files.size(), 20U);
231
232 db_options_.wal_size_limit_mb = 8;
233 Reopen();
234 wal_manager_->PurgeObsoleteWALFiles();
235
236 uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
237 ASSERT_TRUE(archive_size <= db_options_.wal_size_limit_mb * 1024 * 1024);
238
239 db_options_.wal_ttl_seconds = 1;
240 env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
241 Reopen();
242 wal_manager_->PurgeObsoleteWALFiles();
243
244 log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
245 ASSERT_TRUE(log_files.empty());
246 }
247
248 TEST_F(WalManagerTest, WALArchivalTtl) {
249 db_options_.wal_ttl_seconds = 1000;
250 Init();
251
252 // TEST : Create WalManager with a ttl and no size limit.
253 // Create some archived log files and call PurgeObsoleteWALFiles().
254 // Assert that files are not deleted
255 // Reopen db with small ttl.
256 // Assert that all archived logs was removed.
257
258 std::string archive_dir = ArchivalDirectory(dbname_);
259 CreateArchiveLogs(20, 5000);
260
261 std::vector<uint64_t> log_files =
262 ListSpecificFiles(env_.get(), archive_dir, kLogFile);
263 ASSERT_GT(log_files.size(), 0U);
264
265 db_options_.wal_ttl_seconds = 1;
266 env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
267 Reopen();
268 wal_manager_->PurgeObsoleteWALFiles();
269
270 log_files = ListSpecificFiles(env_.get(), archive_dir, kLogFile);
271 ASSERT_TRUE(log_files.empty());
272 }
273
274 TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
275 Init();
276 RollTheLog(false);
277 Put("key1", std::string(1024, 'a'));
278 // Create a zero record WAL file.
279 RollTheLog(false);
280 RollTheLog(false);
281
282 Put("key2", std::string(1024, 'a'));
283
284 auto iter = OpenTransactionLogIter(0);
285 ASSERT_EQ(2, CountRecords(iter.get()));
286 }
287
288 TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
289 Init();
290 RollTheLog(false);
291 auto iter = OpenTransactionLogIter(0);
292 // Check that an empty iterator is returned
293 ASSERT_TRUE(!iter->Valid());
294 }
295
296 } // namespace rocksdb
297
298 int main(int argc, char** argv) {
299 ::testing::InitGoogleTest(&argc, argv);
300 return RUN_ALL_TESTS();
301 }
302
303 #else
304 #include <stdio.h>
305
// Entry point used when ROCKSDB_LITE is defined: prints a notice to stderr
// that the tests are skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as WalManager is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
310
311 #endif // !ROCKSDB_LITE