// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

// The introduction of SyncPoint effectively disabled building and running
// this test in Release builds, which is a pity, because it is a good test.
#if !defined(ROCKSDB_LITE)

#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "port/stack_trace.h"

namespace ROCKSDB_NAMESPACE {

class DBTestXactLogIterator : public DBTestBase {
 public:
  DBTestXactLogIterator()
      : DBTestBase("db_log_iter_test", /*env_do_fsync=*/true) {}

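  // Helper: opens a TransactionLogIterator positioned at `seq` via
  // GetUpdatesSince() and asserts that it starts out valid.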
  std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
      const SequenceNumber seq) {
    std::unique_ptr<TransactionLogIterator> iter;
    Status status = dbfull()->GetUpdatesSince(seq, &iter);
    EXPECT_OK(status);
    EXPECT_TRUE(iter->Valid());
    return iter;
  }
};

namespace {
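// Helper: drains `iter`, counting the batches it returns and checking that
// their sequence numbers are strictly increasing. Returns the sequence number
// of the last batch read. `expect_ok` selects whether the final iterator
// status is expected to be OK (clean end of log) or an error (e.g. a
// corrupted log).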
SequenceNumber ReadRecords(std::unique_ptr<TransactionLogIterator>& iter,
                           int& count, bool expect_ok = true) {
  count = 0;
  SequenceNumber lastSequence = 0;
  BatchResult res;
  while (iter->Valid()) {
    res = iter->GetBatch();
    EXPECT_TRUE(res.sequence > lastSequence);
    ++count;
    lastSequence = res.sequence;
    EXPECT_OK(iter->status());
    iter->Next();
  }
  if (expect_ok) {
    EXPECT_OK(iter->status());
  } else {
    EXPECT_NOK(iter->status());
  }
  return res.sequence;
}

void ExpectRecords(const int expected_no_records,
                   std::unique_ptr<TransactionLogIterator>& iter) {
  int num_records;
  ReadRecords(iter, num_records);
  ASSERT_EQ(num_records, expected_no_records);
}
}  // anonymous namespace

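// Basic coverage: writes across two column families should all be visible to
// a transaction log iterator opened from sequence 0, both before and after
// the column families are reopened.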
TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(Put(0, "key1", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
    {
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(3, iter);
    }
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    env_->SleepForMicroseconds(2 * 1000 * 1000);
    {
      ASSERT_OK(Put(0, "key4", DummyString(1024)));
      ASSERT_OK(Put(1, "key5", DummyString(1024)));
      ASSERT_OK(Put(0, "key6", DummyString(1024)));
    }
    {
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(6, iter);
    }
  } while (ChangeCompactOptions());
}

#ifndef NDEBUG  // sync points are not included in builds with -DNDEBUG
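// Regression test for a race between GetSortedWalFiles()/GetSortedWalsOfType()
// and PurgeObsoleteFiles(): a WAL that is moved to the archive directory while
// the file list is being built must not be dropped from the iteration.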
TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
  static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
  static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
      {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
       "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
      {"WalManager::GetSortedWalsOfType:1", "WalManager::PurgeObsoleteFiles:1",
       "WalManager::PurgeObsoleteFiles:2",
       "WalManager::GetSortedWalsOfType:2"}};
  for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
    // Set up a sync point dependency to reproduce the race condition of a
    // log file being moved to the archive dir in the middle of
    // GetSortedWalFiles
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {sync_points[test][0], sync_points[test][1]},
        {sync_points[test][2], sync_points[test][3]},
    });

    do {
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      Options options = OptionsForLogIterTest();
      DestroyAndReopen(options);
      ASSERT_OK(Put("key1", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key2", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key3", DummyString(1024)));
      ASSERT_OK(dbfull()->Flush(FlushOptions()));
      ASSERT_OK(Put("key4", DummyString(1024)));
      ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
      ASSERT_OK(dbfull()->FlushWAL(false));

      {
        auto iter = OpenTransactionLogIter(0);
        ExpectRecords(4, iter);
      }

      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
      // Trigger an async flush and the log move. The log move will wait on
      // the GetSortedWalFiles:1 sync point so that the race condition is
      // reproduced.
      FlushOptions flush_options;
      flush_options.wait = false;
      ASSERT_OK(dbfull()->Flush(flush_options));

      // "key5" would be written in a new memtable and log
      ASSERT_OK(Put("key5", DummyString(1024)));
      ASSERT_OK(dbfull()->FlushWAL(false));
      {
        // this iter would miss "key4" if not fixed
        auto iter = OpenTransactionLogIter(0);
        ExpectRecords(5, iter);
      }
    } while (ChangeCompactOptions());
  }
}
#endif

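// After the iterator reaches the end of the log it should report !Valid()
// with an OK status, and a subsequent Next() should pick up records written
// in the meantime.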
TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    auto iter = OpenTransactionLogIter(0);
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_OK(Put("key2", DummyString(1024)));
    iter->Next();
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
  } while (ChangeCompactOptions());
}

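// Records written before a flush and a DB reopen should still be returned by
// a transaction log iterator opened from sequence 0.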
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    ASSERT_OK(Put("key2", DummyString(1023)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
    Reopen(options);
    auto iter = OpenTransactionLogIter(0);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}

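// Truncating a WAL file in the middle should make iteration stop with a
// non-OK status before the gap, while seeking past the gap still works.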
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);

    for (int i = 0; i < 1024; i++) {
      ASSERT_OK(Put("key" + std::to_string(i), DummyString(10)));
    }

    ASSERT_OK(Flush());
    ASSERT_OK(db_->FlushWAL(false));

    // Corrupt this log to create a gap
    ASSERT_OK(db_->DisableFileDeletions());

    VectorLogPtr wal_files;
    ASSERT_OK(db_->GetSortedWalFiles(wal_files));
    ASSERT_FALSE(wal_files.empty());

    const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
    ASSERT_OK(test::TruncateFile(env_, logfile_path,
                                 wal_files.front()->SizeFileBytes() / 2));

    ASSERT_OK(db_->EnableFileDeletions());

    // Insert a new entry into a new log file
    ASSERT_OK(Put("key1025", DummyString(10)));
    ASSERT_OK(db_->FlushWAL(false));

    // Try to read from the beginning. Should stop before the gap and read
    // less than 1025 entries
    auto iter = OpenTransactionLogIter(0);
    int count = 0;
    SequenceNumber last_sequence_read = ReadRecords(iter, count, false);
    ASSERT_LT(last_sequence_read, 1025U);

    // Try to read past the gap, should be able to seek to key1025
    auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
    ExpectRecords(1, iter2);
  } while (ChangeCompactOptions());
}

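// A multi-operation WriteBatch is returned as a single batch by the iterator.
// With four batched operations plus one later Put, GetUpdatesSince(3) yields
// two batches in total.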
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    WriteBatch batch;
    ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
    ASSERT_OK(batch.Delete(handles_[0], "key2"));
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Flush(0));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_OK(Put(1, "key4", DummyString(1024)));
    auto iter = OpenTransactionLogIter(3);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}

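// PutLogData() blobs are stored in the WAL but not in the database; replaying
// the recovered batch through a WriteBatch::Handler should surface them via
// LogData() callbacks, interleaved with the Put/Delete operations in write
// order.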
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  {
    WriteBatch batch;
    ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
    ASSERT_OK(batch.PutLogData(Slice("blob1")));
    ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
    ASSERT_OK(batch.PutLogData(Slice("blob2")));
    ASSERT_OK(batch.Delete(handles_[0], "key2"));
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
  }

  auto res = OpenTransactionLogIter(0)->GetBatch();
  struct Handler : public WriteBatch::Handler {
    std::string seen;
    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      seen += "Put(" + std::to_string(cf) + ", " + key.ToString() + ", " +
              std::to_string(value.size()) + ")";
      return Status::OK();
    }
    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      seen += "Merge(" + std::to_string(cf) + ", " + key.ToString() + ", " +
              std::to_string(value.size()) + ")";
      return Status::OK();
    }
    void LogData(const Slice& blob) override {
      seen += "LogData(" + blob.ToString() + ")";
    }
    Status DeleteCF(uint32_t cf, const Slice& key) override {
      seen += "Delete(" + std::to_string(cf) + ", " + key.ToString() + ")";
      return Status::OK();
    }
  } handler;
  ASSERT_OK(res.writeBatchPtr->Iterate(&handler));
  ASSERT_EQ(
      "Put(1, key1, 1024)"
      "Put(0, key2, 1024)"
      "LogData(blob1)"
      "Put(1, key3, 1024)"
      "LogData(blob2)"
      "Delete(0, key2)",
      handler.seen);
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // !defined(ROCKSDB_LITE)

int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
#else
  (void)argc;
  (void)argv;
  return 0;
#endif
}