1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
11 #include "rocksdb/cache.h"
12 #include "rocksdb/write_batch.h"
13 #include "rocksdb/write_buffer_manager.h"
15 #include "db/column_family.h"
16 #include "db/db_impl.h"
17 #include "db/log_writer.h"
18 #include "db/version_set.h"
19 #include "db/wal_manager.h"
20 #include "env/mock_env.h"
21 #include "table/mock_table.h"
22 #include "util/file_reader_writer.h"
23 #include "util/string_util.h"
24 #include "util/testharness.h"
25 #include "util/testutil.h"
29 // TODO(icanadi) mock out VersionSet
30 // TODO(icanadi) move other WalManager-specific tests from db_test here
31 class WalManagerTest
: public testing::Test
{
34 : env_(new MockEnv(Env::Default())),
35 dbname_(test::PerThreadDBPath("wal_manager_test")),
37 table_cache_(NewLRUCache(50000, 16)),
38 write_buffer_manager_(db_options_
.db_write_buffer_size
),
39 current_log_number_(0) {
40 DestroyDB(dbname_
, Options());
44 ASSERT_OK(env_
->CreateDirIfMissing(dbname_
));
45 ASSERT_OK(env_
->CreateDirIfMissing(ArchivalDirectory(dbname_
)));
46 db_options_
.db_paths
.emplace_back(dbname_
,
47 std::numeric_limits
<uint64_t>::max());
48 db_options_
.wal_dir
= dbname_
;
49 db_options_
.env
= env_
.get();
51 versions_
.reset(new VersionSet(dbname_
, &db_options_
, env_options_
,
52 table_cache_
.get(), &write_buffer_manager_
,
55 wal_manager_
.reset(new WalManager(db_options_
, env_options_
));
59 wal_manager_
.reset(new WalManager(db_options_
, env_options_
));
63 void Put(const std::string
& key
, const std::string
& value
) {
64 assert(current_log_writer_
.get() != nullptr);
65 uint64_t seq
= versions_
->LastSequence() + 1;
67 batch
.Put(key
, value
);
68 WriteBatchInternal::SetSequence(&batch
, seq
);
69 current_log_writer_
->AddRecord(WriteBatchInternal::Contents(&batch
));
70 versions_
->SetLastAllocatedSequence(seq
);
71 versions_
->SetLastPublishedSequence(seq
);
72 versions_
->SetLastSequence(seq
);
76 void RollTheLog(bool /*archived*/) {
77 current_log_number_
++;
78 std::string fname
= ArchivedLogFileName(dbname_
, current_log_number_
);
79 std::unique_ptr
<WritableFile
> file
;
80 ASSERT_OK(env_
->NewWritableFile(fname
, &file
, env_options_
));
81 std::unique_ptr
<WritableFileWriter
> file_writer(
82 new WritableFileWriter(std::move(file
), fname
, env_options_
));
83 current_log_writer_
.reset(new log::Writer(std::move(file_writer
), 0, false));
86 void CreateArchiveLogs(int num_logs
, int entries_per_log
) {
87 for (int i
= 1; i
<= num_logs
; ++i
) {
89 for (int k
= 0; k
< entries_per_log
; ++k
) {
90 Put(ToString(k
), std::string(1024, 'a'));
95 std::unique_ptr
<TransactionLogIterator
> OpenTransactionLogIter(
96 const SequenceNumber seq
) {
97 std::unique_ptr
<TransactionLogIterator
> iter
;
98 Status status
= wal_manager_
->GetUpdatesSince(
99 seq
, &iter
, TransactionLogIterator::ReadOptions(), versions_
.get());
104 std::unique_ptr
<MockEnv
> env_
;
106 ImmutableDBOptions db_options_
;
107 WriteController write_controller_
;
108 EnvOptions env_options_
;
109 std::shared_ptr
<Cache
> table_cache_
;
110 WriteBufferManager write_buffer_manager_
;
111 std::unique_ptr
<VersionSet
> versions_
;
112 std::unique_ptr
<WalManager
> wal_manager_
;
114 std::unique_ptr
<log::Writer
> current_log_writer_
;
115 uint64_t current_log_number_
;
118 TEST_F(WalManagerTest
, ReadFirstRecordCache
) {
120 std::string path
= dbname_
+ "/000001.log";
121 std::unique_ptr
<WritableFile
> file
;
122 ASSERT_OK(env_
->NewWritableFile(path
, &file
, EnvOptions()));
125 ASSERT_OK(wal_manager_
->TEST_ReadFirstLine(path
, 1 /* number */, &s
));
129 wal_manager_
->TEST_ReadFirstRecord(kAliveLogFile
, 1 /* number */, &s
));
132 std::unique_ptr
<WritableFileWriter
> file_writer(
133 new WritableFileWriter(std::move(file
), path
, EnvOptions()));
134 log::Writer
writer(std::move(file_writer
), 1,
135 db_options_
.recycle_log_file_num
> 0);
137 batch
.Put("foo", "bar");
138 WriteBatchInternal::SetSequence(&batch
, 10);
139 writer
.AddRecord(WriteBatchInternal::Contents(&batch
));
141 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
142 // Waiting for lei to finish with db_test
143 // env_->count_sequential_reads_ = true;
144 // sequential_read_counter_ sanity test
145 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
147 ASSERT_OK(wal_manager_
->TEST_ReadFirstRecord(kAliveLogFile
, 1, &s
));
150 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
151 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
153 ASSERT_OK(wal_manager_
->TEST_ReadFirstRecord(kAliveLogFile
, 1, &s
));
155 // no new reads since the value is cached
156 // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
157 // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
161 uint64_t GetLogDirSize(std::string dir_path
, Env
* env
) {
162 uint64_t dir_size
= 0;
163 std::vector
<std::string
> files
;
164 env
->GetChildren(dir_path
, &files
);
165 for (auto& f
: files
) {
168 if (ParseFileName(f
, &number
, &type
) && type
== kLogFile
) {
169 std::string
const file_path
= dir_path
+ "/" + f
;
171 env
->GetFileSize(file_path
, &file_size
);
172 dir_size
+= file_size
;
177 std::vector
<std::uint64_t> ListSpecificFiles(
178 Env
* env
, const std::string
& path
, const FileType expected_file_type
) {
179 std::vector
<std::string
> files
;
180 std::vector
<uint64_t> file_numbers
;
181 env
->GetChildren(path
, &files
);
184 for (size_t i
= 0; i
< files
.size(); ++i
) {
185 if (ParseFileName(files
[i
], &number
, &type
)) {
186 if (type
== expected_file_type
) {
187 file_numbers
.push_back(number
);
194 int CountRecords(TransactionLogIterator
* iter
) {
196 SequenceNumber lastSequence
= 0;
198 while (iter
->Valid()) {
199 res
= iter
->GetBatch();
200 EXPECT_TRUE(res
.sequence
> lastSequence
);
202 lastSequence
= res
.sequence
;
203 EXPECT_OK(iter
->status());
210 TEST_F(WalManagerTest
, WALArchivalSizeLimit
) {
211 db_options_
.wal_ttl_seconds
= 0;
212 db_options_
.wal_size_limit_mb
= 1000;
215 // TEST : Create WalManager with huge size limit and no ttl.
216 // Create some archived files and call PurgeObsoleteWALFiles().
217 // Count the archived log files that survived.
218 // Assert that all of them did.
219 // Change size limit. Re-open WalManager.
220 // Assert that archive is not greater than wal_size_limit_mb after
221 // PurgeObsoleteWALFiles()
222 // Set ttl and time_to_check_ to small values. Re-open db.
223 // Assert that there are no archived logs left.
225 std::string archive_dir
= ArchivalDirectory(dbname_
);
226 CreateArchiveLogs(20, 5000);
228 std::vector
<std::uint64_t> log_files
=
229 ListSpecificFiles(env_
.get(), archive_dir
, kLogFile
);
230 ASSERT_EQ(log_files
.size(), 20U);
232 db_options_
.wal_size_limit_mb
= 8;
234 wal_manager_
->PurgeObsoleteWALFiles();
236 uint64_t archive_size
= GetLogDirSize(archive_dir
, env_
.get());
237 ASSERT_TRUE(archive_size
<= db_options_
.wal_size_limit_mb
* 1024 * 1024);
239 db_options_
.wal_ttl_seconds
= 1;
240 env_
->FakeSleepForMicroseconds(2 * 1000 * 1000);
242 wal_manager_
->PurgeObsoleteWALFiles();
244 log_files
= ListSpecificFiles(env_
.get(), archive_dir
, kLogFile
);
245 ASSERT_TRUE(log_files
.empty());
248 TEST_F(WalManagerTest
, WALArchivalTtl
) {
249 db_options_
.wal_ttl_seconds
= 1000;
252 // TEST : Create WalManager with a ttl and no size limit.
253 // Create some archived log files and call PurgeObsoleteWALFiles().
254 // Assert that files are not deleted
255 // Reopen db with small ttl.
256 // Assert that all archived logs was removed.
258 std::string archive_dir
= ArchivalDirectory(dbname_
);
259 CreateArchiveLogs(20, 5000);
261 std::vector
<uint64_t> log_files
=
262 ListSpecificFiles(env_
.get(), archive_dir
, kLogFile
);
263 ASSERT_GT(log_files
.size(), 0U);
265 db_options_
.wal_ttl_seconds
= 1;
266 env_
->FakeSleepForMicroseconds(3 * 1000 * 1000);
268 wal_manager_
->PurgeObsoleteWALFiles();
270 log_files
= ListSpecificFiles(env_
.get(), archive_dir
, kLogFile
);
271 ASSERT_TRUE(log_files
.empty());
274 TEST_F(WalManagerTest
, TransactionLogIteratorMoveOverZeroFiles
) {
277 Put("key1", std::string(1024, 'a'));
278 // Create a zero record WAL file.
282 Put("key2", std::string(1024, 'a'));
284 auto iter
= OpenTransactionLogIter(0);
285 ASSERT_EQ(2, CountRecords(iter
.get()));
288 TEST_F(WalManagerTest
, TransactionLogIteratorJustEmptyFile
) {
291 auto iter
= OpenTransactionLogIter(0);
292 // Check that an empty iterator is returned
293 ASSERT_TRUE(!iter
->Valid());
296 } // namespace rocksdb
298 int main(int argc
, char** argv
) {
299 ::testing::InitGoogleTest(&argc
, argv
);
300 return RUN_ALL_TESTS();
306 int main(int /*argc*/, char** /*argv*/) {
307 fprintf(stderr
, "SKIPPED as WalManager is not supported in ROCKSDB_LITE\n");
311 #endif // !ROCKSDB_LITE