]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_log_iter_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_log_iter_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
// Introduction of SyncPoint effectively disabled building and running this
// test in Release builds, which is a pity because it is a good test.
13 #if !defined(ROCKSDB_LITE)
14
15 #include "db/db_test_util.h"
16 #include "port/stack_trace.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 class DBTestXactLogIterator : public DBTestBase {
21 public:
22 DBTestXactLogIterator()
23 : DBTestBase("/db_log_iter_test", /*env_do_fsync=*/true) {}
24
25 std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
26 const SequenceNumber seq) {
27 std::unique_ptr<TransactionLogIterator> iter;
28 Status status = dbfull()->GetUpdatesSince(seq, &iter);
29 EXPECT_OK(status);
30 EXPECT_TRUE(iter->Valid());
31 return iter;
32 }
33 };
34
35 namespace {
36 SequenceNumber ReadRecords(
37 std::unique_ptr<TransactionLogIterator>& iter,
38 int& count) {
39 count = 0;
40 SequenceNumber lastSequence = 0;
41 BatchResult res;
42 while (iter->Valid()) {
43 res = iter->GetBatch();
44 EXPECT_TRUE(res.sequence > lastSequence);
45 ++count;
46 lastSequence = res.sequence;
47 EXPECT_OK(iter->status());
48 iter->Next();
49 }
50 return res.sequence;
51 }
52
53 void ExpectRecords(
54 const int expected_no_records,
55 std::unique_ptr<TransactionLogIterator>& iter) {
56 int num_records;
57 ReadRecords(iter, num_records);
58 ASSERT_EQ(num_records, expected_no_records);
59 }
60 } // namespace
61
62 TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
63 do {
64 Options options = OptionsForLogIterTest();
65 DestroyAndReopen(options);
66 CreateAndReopenWithCF({"pikachu"}, options);
67 Put(0, "key1", DummyString(1024));
68 Put(1, "key2", DummyString(1024));
69 Put(1, "key2", DummyString(1024));
70 ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
71 {
72 auto iter = OpenTransactionLogIter(0);
73 ExpectRecords(3, iter);
74 }
75 ReopenWithColumnFamilies({"default", "pikachu"}, options);
76 env_->SleepForMicroseconds(2 * 1000 * 1000);
77 {
78 Put(0, "key4", DummyString(1024));
79 Put(1, "key5", DummyString(1024));
80 Put(0, "key6", DummyString(1024));
81 }
82 {
83 auto iter = OpenTransactionLogIter(0);
84 ExpectRecords(6, iter);
85 }
86 } while (ChangeCompactOptions());
87 }
88
89 #ifndef NDEBUG // sync point is not included with DNDEBUG build
90 TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
91 static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
92 static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
93 {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
94 "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
95 {"WalManager::GetSortedWalsOfType:1",
96 "WalManager::PurgeObsoleteFiles:1",
97 "WalManager::PurgeObsoleteFiles:2",
98 "WalManager::GetSortedWalsOfType:2"}};
99 for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
100 // Setup sync point dependency to reproduce the race condition of
101 // a log file moved to archived dir, in the middle of GetSortedWalFiles
102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
103 {sync_points[test][0], sync_points[test][1]},
104 {sync_points[test][2], sync_points[test][3]},
105 });
106
107 do {
108 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
109 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
110 Options options = OptionsForLogIterTest();
111 DestroyAndReopen(options);
112 Put("key1", DummyString(1024));
113 dbfull()->Flush(FlushOptions());
114 Put("key2", DummyString(1024));
115 dbfull()->Flush(FlushOptions());
116 Put("key3", DummyString(1024));
117 dbfull()->Flush(FlushOptions());
118 Put("key4", DummyString(1024));
119 ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
120 dbfull()->FlushWAL(false);
121
122 {
123 auto iter = OpenTransactionLogIter(0);
124 ExpectRecords(4, iter);
125 }
126
127 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
128 // trigger async flush, and log move. Well, log move will
129 // wait until the GetSortedWalFiles:1 to reproduce the race
130 // condition
131 FlushOptions flush_options;
132 flush_options.wait = false;
133 dbfull()->Flush(flush_options);
134
135 // "key5" would be written in a new memtable and log
136 Put("key5", DummyString(1024));
137 dbfull()->FlushWAL(false);
138 {
139 // this iter would miss "key4" if not fixed
140 auto iter = OpenTransactionLogIter(0);
141 ExpectRecords(5, iter);
142 }
143 } while (ChangeCompactOptions());
144 }
145 }
146 #endif
147
148 TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
149 do {
150 Options options = OptionsForLogIterTest();
151 DestroyAndReopen(options);
152 Put("key1", DummyString(1024));
153 auto iter = OpenTransactionLogIter(0);
154 ASSERT_OK(iter->status());
155 ASSERT_TRUE(iter->Valid());
156 iter->Next();
157 ASSERT_TRUE(!iter->Valid());
158 ASSERT_OK(iter->status());
159 Put("key2", DummyString(1024));
160 iter->Next();
161 ASSERT_OK(iter->status());
162 ASSERT_TRUE(iter->Valid());
163 } while (ChangeCompactOptions());
164 }
165
166 TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
167 do {
168 Options options = OptionsForLogIterTest();
169 DestroyAndReopen(options);
170 Put("key1", DummyString(1024));
171 Put("key2", DummyString(1023));
172 dbfull()->Flush(FlushOptions());
173 Reopen(options);
174 auto iter = OpenTransactionLogIter(0);
175 ExpectRecords(2, iter);
176 } while (ChangeCompactOptions());
177 }
178
179 TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
180 do {
181 Options options = OptionsForLogIterTest();
182 DestroyAndReopen(options);
183 for (int i = 0; i < 1024; i++) {
184 Put("key"+ToString(i), DummyString(10));
185 }
186 dbfull()->Flush(FlushOptions());
187 dbfull()->FlushWAL(false);
188 // Corrupt this log to create a gap
189 ROCKSDB_NAMESPACE::VectorLogPtr wal_files;
190 ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
191 const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
192 if (mem_env_) {
193 mem_env_->Truncate(logfile_path, wal_files.front()->SizeFileBytes() / 2);
194 } else {
195 ASSERT_EQ(0, truncate(logfile_path.c_str(),
196 wal_files.front()->SizeFileBytes() / 2));
197 }
198
199 // Insert a new entry to a new log file
200 Put("key1025", DummyString(10));
201 dbfull()->FlushWAL(false);
202 // Try to read from the beginning. Should stop before the gap and read less
203 // than 1025 entries
204 auto iter = OpenTransactionLogIter(0);
205 int count;
206 SequenceNumber last_sequence_read = ReadRecords(iter, count);
207 ASSERT_LT(last_sequence_read, 1025U);
208 // Try to read past the gap, should be able to seek to key1025
209 auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
210 ExpectRecords(1, iter2);
211 } while (ChangeCompactOptions());
212 }
213
214 TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
215 do {
216 Options options = OptionsForLogIterTest();
217 DestroyAndReopen(options);
218 CreateAndReopenWithCF({"pikachu"}, options);
219 WriteBatch batch;
220 batch.Put(handles_[1], "key1", DummyString(1024));
221 batch.Put(handles_[0], "key2", DummyString(1024));
222 batch.Put(handles_[1], "key3", DummyString(1024));
223 batch.Delete(handles_[0], "key2");
224 dbfull()->Write(WriteOptions(), &batch);
225 Flush(1);
226 Flush(0);
227 ReopenWithColumnFamilies({"default", "pikachu"}, options);
228 Put(1, "key4", DummyString(1024));
229 auto iter = OpenTransactionLogIter(3);
230 ExpectRecords(2, iter);
231 } while (ChangeCompactOptions());
232 }
233
234 TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
235 Options options = OptionsForLogIterTest();
236 DestroyAndReopen(options);
237 CreateAndReopenWithCF({"pikachu"}, options);
238 {
239 WriteBatch batch;
240 batch.Put(handles_[1], "key1", DummyString(1024));
241 batch.Put(handles_[0], "key2", DummyString(1024));
242 batch.PutLogData(Slice("blob1"));
243 batch.Put(handles_[1], "key3", DummyString(1024));
244 batch.PutLogData(Slice("blob2"));
245 batch.Delete(handles_[0], "key2");
246 dbfull()->Write(WriteOptions(), &batch);
247 ReopenWithColumnFamilies({"default", "pikachu"}, options);
248 }
249
250 auto res = OpenTransactionLogIter(0)->GetBatch();
251 struct Handler : public WriteBatch::Handler {
252 std::string seen;
253 Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
254 seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
255 ToString(value.size()) + ")";
256 return Status::OK();
257 }
258 Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
259 seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
260 ToString(value.size()) + ")";
261 return Status::OK();
262 }
263 void LogData(const Slice& blob) override {
264 seen += "LogData(" + blob.ToString() + ")";
265 }
266 Status DeleteCF(uint32_t cf, const Slice& key) override {
267 seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
268 return Status::OK();
269 }
270 } handler;
271 res.writeBatchPtr->Iterate(&handler);
272 ASSERT_EQ(
273 "Put(1, key1, 1024)"
274 "Put(0, key2, 1024)"
275 "LogData(blob1)"
276 "Put(1, key3, 1024)"
277 "LogData(blob2)"
278 "Delete(0, key2)",
279 handler.seen);
280 }
281 } // namespace ROCKSDB_NAMESPACE
282
283 #endif // !defined(ROCKSDB_LITE)
284
285 int main(int argc, char** argv) {
286 #if !defined(ROCKSDB_LITE)
287 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
288 ::testing::InitGoogleTest(&argc, argv);
289 return RUN_ALL_TESTS();
290 #else
291 (void) argc;
292 (void) argv;
293 return 0;
294 #endif
295 }