]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/log_reader.h" | |
11 | #include "db/log_writer.h" | |
f67539c2 TL |
12 | #include "env/composite_env_wrapper.h" |
13 | #include "file/sequence_file_reader.h" | |
14 | #include "file/writable_file_writer.h" | |
7c673cae | 15 | #include "rocksdb/env.h" |
f67539c2 TL |
16 | #include "test_util/testharness.h" |
17 | #include "test_util/testutil.h" | |
7c673cae FG |
18 | #include "util/coding.h" |
19 | #include "util/crc32c.h" | |
7c673cae | 20 | #include "util/random.h" |
7c673cae | 21 | |
f67539c2 | 22 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
23 | namespace log { |
24 | ||
// Build a string of exactly `n` bytes by repeating `partial_string` and
// truncating the final repetition.
//
// An empty `partial_string` can never grow the result, so in that case the
// repetition loop is skipped and the result is `n` NUL bytes (the padding
// std::string::resize() supplies) rather than an infinite loop.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The last append may overshoot `n` by less than one repetition; reserve
    // that much up front so the loop does not reallocate.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
35 | ||
// Render `n` in decimal followed by a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  std::string text = std::to_string(n);
  text.push_back('.');
  return text;
}
42 | ||
43 | // Return a skewed potentially long string | |
44 | static std::string RandomSkewedString(int i, Random* rnd) { | |
45 | return BigString(NumberString(i), rnd->Skewed(17)); | |
46 | } | |
47 | ||
494da23a TL |
48 | // Param type is tuple<int, bool> |
49 | // get<0>(tuple): non-zero if recycling log, zero if regular log | |
50 | // get<1>(tuple): true if allow retry after read EOF, false otherwise | |
51 | class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> { | |
7c673cae FG |
52 | private: |
53 | class StringSource : public SequentialFile { | |
54 | public: | |
55 | Slice& contents_; | |
56 | bool force_error_; | |
57 | size_t force_error_position_; | |
58 | bool force_eof_; | |
59 | size_t force_eof_position_; | |
60 | bool returned_partial_; | |
494da23a TL |
61 | bool fail_after_read_partial_; |
62 | explicit StringSource(Slice& contents, bool fail_after_read_partial) | |
63 | : contents_(contents), | |
64 | force_error_(false), | |
65 | force_error_position_(0), | |
66 | force_eof_(false), | |
67 | force_eof_position_(0), | |
68 | returned_partial_(false), | |
69 | fail_after_read_partial_(fail_after_read_partial) {} | |
70 | ||
71 | Status Read(size_t n, Slice* result, char* scratch) override { | |
72 | if (fail_after_read_partial_) { | |
73 | EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; | |
74 | } | |
7c673cae FG |
75 | |
76 | if (force_error_) { | |
77 | if (force_error_position_ >= n) { | |
78 | force_error_position_ -= n; | |
79 | } else { | |
80 | *result = Slice(contents_.data(), force_error_position_); | |
81 | contents_.remove_prefix(force_error_position_); | |
82 | force_error_ = false; | |
83 | returned_partial_ = true; | |
84 | return Status::Corruption("read error"); | |
85 | } | |
86 | } | |
87 | ||
88 | if (contents_.size() < n) { | |
89 | n = contents_.size(); | |
90 | returned_partial_ = true; | |
91 | } | |
92 | ||
93 | if (force_eof_) { | |
94 | if (force_eof_position_ >= n) { | |
95 | force_eof_position_ -= n; | |
96 | } else { | |
97 | force_eof_ = false; | |
98 | n = force_eof_position_; | |
99 | returned_partial_ = true; | |
100 | } | |
101 | } | |
102 | ||
103 | // By using scratch we ensure that caller has control over the | |
104 | // lifetime of result.data() | |
105 | memcpy(scratch, contents_.data(), n); | |
106 | *result = Slice(scratch, n); | |
107 | ||
108 | contents_.remove_prefix(n); | |
109 | return Status::OK(); | |
110 | } | |
111 | ||
494da23a | 112 | Status Skip(uint64_t n) override { |
7c673cae FG |
113 | if (n > contents_.size()) { |
114 | contents_.clear(); | |
115 | return Status::NotFound("in-memory file skipepd past end"); | |
116 | } | |
117 | ||
118 | contents_.remove_prefix(n); | |
119 | ||
120 | return Status::OK(); | |
121 | } | |
122 | }; | |
123 | ||
f67539c2 TL |
124 | inline StringSource* GetStringSourceFromLegacyReader( |
125 | SequentialFileReader* reader) { | |
126 | LegacySequentialFileWrapper* file = | |
127 | static_cast<LegacySequentialFileWrapper*>(reader->file()); | |
128 | return static_cast<StringSource*>(file->target()); | |
129 | } | |
130 | ||
7c673cae FG |
131 | class ReportCollector : public Reader::Reporter { |
132 | public: | |
133 | size_t dropped_bytes_; | |
134 | std::string message_; | |
135 | ||
136 | ReportCollector() : dropped_bytes_(0) { } | |
494da23a | 137 | void Corruption(size_t bytes, const Status& status) override { |
7c673cae FG |
138 | dropped_bytes_ += bytes; |
139 | message_.append(status.ToString()); | |
140 | } | |
141 | }; | |
142 | ||
143 | std::string& dest_contents() { | |
f67539c2 | 144 | auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); |
7c673cae FG |
145 | assert(dest); |
146 | return dest->contents_; | |
147 | } | |
148 | ||
149 | const std::string& dest_contents() const { | |
f67539c2 | 150 | auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); |
7c673cae FG |
151 | assert(dest); |
152 | return dest->contents_; | |
153 | } | |
154 | ||
155 | void reset_source_contents() { | |
f67539c2 | 156 | auto src = GetStringSourceFromLegacyReader(reader_->file()); |
7c673cae FG |
157 | assert(src); |
158 | src->contents_ = dest_contents(); | |
159 | } | |
160 | ||
161 | Slice reader_contents_; | |
494da23a TL |
162 | std::unique_ptr<WritableFileWriter> dest_holder_; |
163 | std::unique_ptr<SequentialFileReader> source_holder_; | |
7c673cae FG |
164 | ReportCollector report_; |
165 | Writer writer_; | |
494da23a | 166 | std::unique_ptr<Reader> reader_; |
7c673cae | 167 | |
494da23a TL |
168 | protected: |
169 | bool allow_retry_read_; | |
7c673cae FG |
170 | |
171 | public: | |
172 | LogTest() | |
173 | : reader_contents_(), | |
174 | dest_holder_(test::GetWritableFileWriter( | |
11fdf7f2 TL |
175 | new test::StringSink(&reader_contents_), "" /* don't care */)), |
176 | source_holder_(test::GetSequentialFileReader( | |
494da23a TL |
177 | new StringSource(reader_contents_, !std::get<1>(GetParam())), |
178 | "" /* file name */)), | |
179 | writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), | |
180 | allow_retry_read_(std::get<1>(GetParam())) { | |
181 | if (allow_retry_read_) { | |
182 | reader_.reset(new FragmentBufferedReader( | |
183 | nullptr, std::move(source_holder_), &report_, true /* checksum */, | |
184 | 123 /* log_number */)); | |
185 | } else { | |
186 | reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_, | |
187 | true /* checksum */, 123 /* log_number */)); | |
188 | } | |
7c673cae FG |
189 | } |
190 | ||
191 | Slice* get_reader_contents() { return &reader_contents_; } | |
192 | ||
193 | void Write(const std::string& msg) { | |
194 | writer_.AddRecord(Slice(msg)); | |
195 | } | |
196 | ||
197 | size_t WrittenBytes() const { | |
198 | return dest_contents().size(); | |
199 | } | |
200 | ||
201 | std::string Read(const WALRecoveryMode wal_recovery_mode = | |
202 | WALRecoveryMode::kTolerateCorruptedTailRecords) { | |
203 | std::string scratch; | |
204 | Slice record; | |
494da23a TL |
205 | bool ret = false; |
206 | ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode); | |
207 | if (ret) { | |
7c673cae FG |
208 | return record.ToString(); |
209 | } else { | |
210 | return "EOF"; | |
211 | } | |
212 | } | |
213 | ||
11fdf7f2 | 214 | void IncrementByte(int offset, char delta) { |
7c673cae FG |
215 | dest_contents()[offset] += delta; |
216 | } | |
217 | ||
218 | void SetByte(int offset, char new_byte) { | |
219 | dest_contents()[offset] = new_byte; | |
220 | } | |
221 | ||
222 | void ShrinkSize(int bytes) { | |
f67539c2 | 223 | auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); |
7c673cae FG |
224 | assert(dest); |
225 | dest->Drop(bytes); | |
226 | } | |
227 | ||
228 | void FixChecksum(int header_offset, int len, bool recyclable) { | |
229 | // Compute crc of type/len/data | |
230 | int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize; | |
231 | uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6], | |
232 | header_size - 6 + len); | |
233 | crc = crc32c::Mask(crc); | |
234 | EncodeFixed32(&dest_contents()[header_offset], crc); | |
235 | } | |
236 | ||
237 | void ForceError(size_t position = 0) { | |
f67539c2 | 238 | auto src = GetStringSourceFromLegacyReader(reader_->file()); |
7c673cae FG |
239 | src->force_error_ = true; |
240 | src->force_error_position_ = position; | |
241 | } | |
242 | ||
243 | size_t DroppedBytes() const { | |
244 | return report_.dropped_bytes_; | |
245 | } | |
246 | ||
247 | std::string ReportMessage() const { | |
248 | return report_.message_; | |
249 | } | |
250 | ||
251 | void ForceEOF(size_t position = 0) { | |
f67539c2 | 252 | auto src = GetStringSourceFromLegacyReader(reader_->file()); |
7c673cae FG |
253 | src->force_eof_ = true; |
254 | src->force_eof_position_ = position; | |
255 | } | |
256 | ||
257 | void UnmarkEOF() { | |
f67539c2 | 258 | auto src = GetStringSourceFromLegacyReader(reader_->file()); |
7c673cae | 259 | src->returned_partial_ = false; |
494da23a | 260 | reader_->UnmarkEOF(); |
7c673cae FG |
261 | } |
262 | ||
494da23a | 263 | bool IsEOF() { return reader_->IsEOF(); } |
7c673cae FG |
264 | |
265 | // Returns OK iff recorded error message contains "msg" | |
266 | std::string MatchError(const std::string& msg) const { | |
267 | if (report_.message_.find(msg) == std::string::npos) { | |
268 | return report_.message_; | |
269 | } else { | |
270 | return "OK"; | |
271 | } | |
272 | } | |
7c673cae FG |
273 | }; |
274 | ||
7c673cae FG |
275 | TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } |
276 | ||
277 | TEST_P(LogTest, ReadWrite) { | |
278 | Write("foo"); | |
279 | Write("bar"); | |
280 | Write(""); | |
281 | Write("xxxx"); | |
282 | ASSERT_EQ("foo", Read()); | |
283 | ASSERT_EQ("bar", Read()); | |
284 | ASSERT_EQ("", Read()); | |
285 | ASSERT_EQ("xxxx", Read()); | |
286 | ASSERT_EQ("EOF", Read()); | |
287 | ASSERT_EQ("EOF", Read()); // Make sure reads at eof work | |
288 | } | |
289 | ||
290 | TEST_P(LogTest, ManyBlocks) { | |
291 | for (int i = 0; i < 100000; i++) { | |
292 | Write(NumberString(i)); | |
293 | } | |
294 | for (int i = 0; i < 100000; i++) { | |
295 | ASSERT_EQ(NumberString(i), Read()); | |
296 | } | |
297 | ASSERT_EQ("EOF", Read()); | |
298 | } | |
299 | ||
300 | TEST_P(LogTest, Fragmentation) { | |
301 | Write("small"); | |
302 | Write(BigString("medium", 50000)); | |
303 | Write(BigString("large", 100000)); | |
304 | ASSERT_EQ("small", Read()); | |
305 | ASSERT_EQ(BigString("medium", 50000), Read()); | |
306 | ASSERT_EQ(BigString("large", 100000), Read()); | |
307 | ASSERT_EQ("EOF", Read()); | |
308 | } | |
309 | ||
310 | TEST_P(LogTest, MarginalTrailer) { | |
311 | // Make a trailer that is exactly the same length as an empty record. | |
494da23a TL |
312 | int header_size = |
313 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
314 | const int n = kBlockSize - 2 * header_size; |
315 | Write(BigString("foo", n)); | |
316 | ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); | |
317 | Write(""); | |
318 | Write("bar"); | |
319 | ASSERT_EQ(BigString("foo", n), Read()); | |
320 | ASSERT_EQ("", Read()); | |
321 | ASSERT_EQ("bar", Read()); | |
322 | ASSERT_EQ("EOF", Read()); | |
323 | } | |
324 | ||
325 | TEST_P(LogTest, MarginalTrailer2) { | |
326 | // Make a trailer that is exactly the same length as an empty record. | |
494da23a TL |
327 | int header_size = |
328 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
329 | const int n = kBlockSize - 2 * header_size; |
330 | Write(BigString("foo", n)); | |
331 | ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); | |
332 | Write("bar"); | |
333 | ASSERT_EQ(BigString("foo", n), Read()); | |
334 | ASSERT_EQ("bar", Read()); | |
335 | ASSERT_EQ("EOF", Read()); | |
336 | ASSERT_EQ(0U, DroppedBytes()); | |
337 | ASSERT_EQ("", ReportMessage()); | |
338 | } | |
339 | ||
340 | TEST_P(LogTest, ShortTrailer) { | |
494da23a TL |
341 | int header_size = |
342 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
343 | const int n = kBlockSize - 2 * header_size + 4; |
344 | Write(BigString("foo", n)); | |
345 | ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); | |
346 | Write(""); | |
347 | Write("bar"); | |
348 | ASSERT_EQ(BigString("foo", n), Read()); | |
349 | ASSERT_EQ("", Read()); | |
350 | ASSERT_EQ("bar", Read()); | |
351 | ASSERT_EQ("EOF", Read()); | |
352 | } | |
353 | ||
354 | TEST_P(LogTest, AlignedEof) { | |
494da23a TL |
355 | int header_size = |
356 | std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
357 | const int n = kBlockSize - 2 * header_size + 4; |
358 | Write(BigString("foo", n)); | |
359 | ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); | |
360 | ASSERT_EQ(BigString("foo", n), Read()); | |
361 | ASSERT_EQ("EOF", Read()); | |
362 | } | |
363 | ||
364 | TEST_P(LogTest, RandomRead) { | |
365 | const int N = 500; | |
366 | Random write_rnd(301); | |
367 | for (int i = 0; i < N; i++) { | |
368 | Write(RandomSkewedString(i, &write_rnd)); | |
369 | } | |
370 | Random read_rnd(301); | |
371 | for (int i = 0; i < N; i++) { | |
372 | ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read()); | |
373 | } | |
374 | ASSERT_EQ("EOF", Read()); | |
375 | } | |
376 | ||
377 | // Tests of all the error paths in log_reader.cc follow: | |
378 | ||
379 | TEST_P(LogTest, ReadError) { | |
380 | Write("foo"); | |
381 | ForceError(); | |
382 | ASSERT_EQ("EOF", Read()); | |
383 | ASSERT_EQ((unsigned int)kBlockSize, DroppedBytes()); | |
384 | ASSERT_EQ("OK", MatchError("read error")); | |
385 | } | |
386 | ||
387 | TEST_P(LogTest, BadRecordType) { | |
388 | Write("foo"); | |
389 | // Type is stored in header[6] | |
390 | IncrementByte(6, 100); | |
391 | FixChecksum(0, 3, false); | |
392 | ASSERT_EQ("EOF", Read()); | |
393 | ASSERT_EQ(3U, DroppedBytes()); | |
394 | ASSERT_EQ("OK", MatchError("unknown record type")); | |
395 | } | |
396 | ||
397 | TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { | |
398 | Write("foo"); | |
399 | ShrinkSize(4); // Drop all payload as well as a header byte | |
400 | ASSERT_EQ("EOF", Read()); | |
401 | // Truncated last record is ignored, not treated as an error | |
402 | ASSERT_EQ(0U, DroppedBytes()); | |
403 | ASSERT_EQ("", ReportMessage()); | |
404 | } | |
405 | ||
406 | TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { | |
494da23a TL |
407 | if (allow_retry_read_) { |
408 | // If read retry is allowed, then truncated trailing record should not | |
409 | // raise an error. | |
410 | return; | |
411 | } | |
7c673cae FG |
412 | Write("foo"); |
413 | ShrinkSize(4); // Drop all payload as well as a header byte | |
414 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
415 | // Truncated last record is ignored, not treated as an error | |
416 | ASSERT_GT(DroppedBytes(), 0U); | |
417 | ASSERT_EQ("OK", MatchError("Corruption: truncated header")); | |
418 | } | |
419 | ||
420 | TEST_P(LogTest, BadLength) { | |
494da23a TL |
421 | if (allow_retry_read_) { |
422 | // If read retry is allowed, then we should not raise an error when the | |
423 | // record length specified in header is longer than data currently | |
424 | // available. It's possible that the body of the record is not written yet. | |
425 | return; | |
426 | } | |
427 | bool recyclable_log = (std::get<0>(GetParam()) != 0); | |
428 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
429 | const int kPayloadSize = kBlockSize - header_size; |
430 | Write(BigString("bar", kPayloadSize)); | |
431 | Write("foo"); | |
432 | // Least significant size byte is stored in header[4]. | |
433 | IncrementByte(4, 1); | |
494da23a | 434 | if (!recyclable_log) { |
7c673cae FG |
435 | ASSERT_EQ("foo", Read()); |
436 | ASSERT_EQ(kBlockSize, DroppedBytes()); | |
437 | ASSERT_EQ("OK", MatchError("bad record length")); | |
438 | } else { | |
439 | ASSERT_EQ("EOF", Read()); | |
440 | } | |
441 | } | |
442 | ||
443 | TEST_P(LogTest, BadLengthAtEndIsIgnored) { | |
494da23a TL |
444 | if (allow_retry_read_) { |
445 | // If read retry is allowed, then we should not raise an error when the | |
446 | // record length specified in header is longer than data currently | |
447 | // available. It's possible that the body of the record is not written yet. | |
448 | return; | |
449 | } | |
7c673cae FG |
450 | Write("foo"); |
451 | ShrinkSize(1); | |
452 | ASSERT_EQ("EOF", Read()); | |
453 | ASSERT_EQ(0U, DroppedBytes()); | |
454 | ASSERT_EQ("", ReportMessage()); | |
455 | } | |
456 | ||
457 | TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { | |
494da23a TL |
458 | if (allow_retry_read_) { |
459 | // If read retry is allowed, then we should not raise an error when the | |
460 | // record length specified in header is longer than data currently | |
461 | // available. It's possible that the body of the record is not written yet. | |
462 | return; | |
463 | } | |
7c673cae FG |
464 | Write("foo"); |
465 | ShrinkSize(1); | |
466 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
467 | ASSERT_GT(DroppedBytes(), 0U); | |
20effc67 | 468 | ASSERT_EQ("OK", MatchError("Corruption: truncated record body")); |
7c673cae FG |
469 | } |
470 | ||
471 | TEST_P(LogTest, ChecksumMismatch) { | |
472 | Write("foooooo"); | |
473 | IncrementByte(0, 14); | |
474 | ASSERT_EQ("EOF", Read()); | |
494da23a TL |
475 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
476 | if (!recyclable_log) { | |
7c673cae FG |
477 | ASSERT_EQ(14U, DroppedBytes()); |
478 | ASSERT_EQ("OK", MatchError("checksum mismatch")); | |
479 | } else { | |
480 | ASSERT_EQ(0U, DroppedBytes()); | |
481 | ASSERT_EQ("", ReportMessage()); | |
482 | } | |
483 | } | |
484 | ||
485 | TEST_P(LogTest, UnexpectedMiddleType) { | |
486 | Write("foo"); | |
494da23a TL |
487 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
488 | SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType | |
489 | : kMiddleType)); | |
490 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
491 | ASSERT_EQ("EOF", Read()); |
492 | ASSERT_EQ(3U, DroppedBytes()); | |
493 | ASSERT_EQ("OK", MatchError("missing start")); | |
494 | } | |
495 | ||
496 | TEST_P(LogTest, UnexpectedLastType) { | |
497 | Write("foo"); | |
494da23a TL |
498 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
499 | SetByte(6, | |
500 | static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType)); | |
501 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
502 | ASSERT_EQ("EOF", Read()); |
503 | ASSERT_EQ(3U, DroppedBytes()); | |
504 | ASSERT_EQ("OK", MatchError("missing start")); | |
505 | } | |
506 | ||
507 | TEST_P(LogTest, UnexpectedFullType) { | |
508 | Write("foo"); | |
509 | Write("bar"); | |
494da23a TL |
510 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
511 | SetByte( | |
512 | 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType)); | |
513 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
514 | ASSERT_EQ("bar", Read()); |
515 | ASSERT_EQ("EOF", Read()); | |
516 | ASSERT_EQ(3U, DroppedBytes()); | |
517 | ASSERT_EQ("OK", MatchError("partial record without end")); | |
518 | } | |
519 | ||
520 | TEST_P(LogTest, UnexpectedFirstType) { | |
521 | Write("foo"); | |
522 | Write(BigString("bar", 100000)); | |
494da23a TL |
523 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
524 | SetByte( | |
525 | 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType)); | |
526 | FixChecksum(0, 3, !!recyclable_log); | |
7c673cae FG |
527 | ASSERT_EQ(BigString("bar", 100000), Read()); |
528 | ASSERT_EQ("EOF", Read()); | |
529 | ASSERT_EQ(3U, DroppedBytes()); | |
530 | ASSERT_EQ("OK", MatchError("partial record without end")); | |
531 | } | |
532 | ||
533 | TEST_P(LogTest, MissingLastIsIgnored) { | |
534 | Write(BigString("bar", kBlockSize)); | |
535 | // Remove the LAST block, including header. | |
536 | ShrinkSize(14); | |
537 | ASSERT_EQ("EOF", Read()); | |
538 | ASSERT_EQ("", ReportMessage()); | |
539 | ASSERT_EQ(0U, DroppedBytes()); | |
540 | } | |
541 | ||
542 | TEST_P(LogTest, MissingLastIsNotIgnored) { | |
494da23a TL |
543 | if (allow_retry_read_) { |
544 | // If read retry is allowed, then truncated trailing record should not | |
545 | // raise an error. | |
546 | return; | |
547 | } | |
7c673cae FG |
548 | Write(BigString("bar", kBlockSize)); |
549 | // Remove the LAST block, including header. | |
550 | ShrinkSize(14); | |
551 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
552 | ASSERT_GT(DroppedBytes(), 0U); | |
553 | ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data")); | |
554 | } | |
555 | ||
556 | TEST_P(LogTest, PartialLastIsIgnored) { | |
557 | Write(BigString("bar", kBlockSize)); | |
558 | // Cause a bad record length in the LAST block. | |
559 | ShrinkSize(1); | |
560 | ASSERT_EQ("EOF", Read()); | |
561 | ASSERT_EQ("", ReportMessage()); | |
562 | ASSERT_EQ(0U, DroppedBytes()); | |
563 | } | |
564 | ||
565 | TEST_P(LogTest, PartialLastIsNotIgnored) { | |
494da23a TL |
566 | if (allow_retry_read_) { |
567 | // If read retry is allowed, then truncated trailing record should not | |
568 | // raise an error. | |
569 | return; | |
570 | } | |
7c673cae FG |
571 | Write(BigString("bar", kBlockSize)); |
572 | // Cause a bad record length in the LAST block. | |
573 | ShrinkSize(1); | |
574 | ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); | |
575 | ASSERT_GT(DroppedBytes(), 0U); | |
20effc67 | 576 | ASSERT_EQ("OK", MatchError("Corruption: truncated record body")); |
7c673cae FG |
577 | } |
578 | ||
579 | TEST_P(LogTest, ErrorJoinsRecords) { | |
580 | // Consider two fragmented records: | |
581 | // first(R1) last(R1) first(R2) last(R2) | |
582 | // where the middle two fragments disappear. We do not want | |
583 | // first(R1),last(R2) to get joined and returned as a valid record. | |
584 | ||
585 | // Write records that span two blocks | |
586 | Write(BigString("foo", kBlockSize)); | |
587 | Write(BigString("bar", kBlockSize)); | |
588 | Write("correct"); | |
589 | ||
590 | // Wipe the middle block | |
591 | for (unsigned int offset = kBlockSize; offset < 2*kBlockSize; offset++) { | |
592 | SetByte(offset, 'x'); | |
593 | } | |
594 | ||
494da23a TL |
595 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
596 | if (!recyclable_log) { | |
7c673cae FG |
597 | ASSERT_EQ("correct", Read()); |
598 | ASSERT_EQ("EOF", Read()); | |
599 | size_t dropped = DroppedBytes(); | |
600 | ASSERT_LE(dropped, 2 * kBlockSize + 100); | |
601 | ASSERT_GE(dropped, 2 * kBlockSize); | |
602 | } else { | |
603 | ASSERT_EQ("EOF", Read()); | |
604 | } | |
605 | } | |
606 | ||
7c673cae FG |
607 | TEST_P(LogTest, ClearEofSingleBlock) { |
608 | Write("foo"); | |
609 | Write("bar"); | |
494da23a TL |
610 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
611 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
612 | ForceEOF(3 + header_size + 2); |
613 | ASSERT_EQ("foo", Read()); | |
614 | UnmarkEOF(); | |
615 | ASSERT_EQ("bar", Read()); | |
616 | ASSERT_TRUE(IsEOF()); | |
617 | ASSERT_EQ("EOF", Read()); | |
618 | Write("xxx"); | |
619 | UnmarkEOF(); | |
620 | ASSERT_EQ("xxx", Read()); | |
621 | ASSERT_TRUE(IsEOF()); | |
622 | } | |
623 | ||
624 | TEST_P(LogTest, ClearEofMultiBlock) { | |
625 | size_t num_full_blocks = 5; | |
494da23a TL |
626 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
627 | int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; | |
7c673cae FG |
628 | size_t n = (kBlockSize - header_size) * num_full_blocks + 25; |
629 | Write(BigString("foo", n)); | |
630 | Write(BigString("bar", n)); | |
631 | ForceEOF(n + num_full_blocks * header_size + header_size + 3); | |
632 | ASSERT_EQ(BigString("foo", n), Read()); | |
633 | ASSERT_TRUE(IsEOF()); | |
634 | UnmarkEOF(); | |
635 | ASSERT_EQ(BigString("bar", n), Read()); | |
636 | ASSERT_TRUE(IsEOF()); | |
637 | Write(BigString("xxx", n)); | |
638 | UnmarkEOF(); | |
639 | ASSERT_EQ(BigString("xxx", n), Read()); | |
640 | ASSERT_TRUE(IsEOF()); | |
641 | } | |
642 | ||
643 | TEST_P(LogTest, ClearEofError) { | |
644 | // If an error occurs during Read() in UnmarkEOF(), the records contained | |
645 | // in the buffer should be returned on subsequent calls of ReadRecord() | |
646 | // until no more full records are left, whereafter ReadRecord() should return | |
647 | // false to indicate that it cannot read any further. | |
648 | ||
649 | Write("foo"); | |
650 | Write("bar"); | |
651 | UnmarkEOF(); | |
652 | ASSERT_EQ("foo", Read()); | |
653 | ASSERT_TRUE(IsEOF()); | |
654 | Write("xxx"); | |
655 | ForceError(0); | |
656 | UnmarkEOF(); | |
657 | ASSERT_EQ("bar", Read()); | |
658 | ASSERT_EQ("EOF", Read()); | |
659 | } | |
660 | ||
661 | TEST_P(LogTest, ClearEofError2) { | |
662 | Write("foo"); | |
663 | Write("bar"); | |
664 | UnmarkEOF(); | |
665 | ASSERT_EQ("foo", Read()); | |
666 | Write("xxx"); | |
667 | ForceError(3); | |
668 | UnmarkEOF(); | |
669 | ASSERT_EQ("bar", Read()); | |
670 | ASSERT_EQ("EOF", Read()); | |
671 | ASSERT_EQ(3U, DroppedBytes()); | |
672 | ASSERT_EQ("OK", MatchError("read error")); | |
673 | } | |
674 | ||
675 | TEST_P(LogTest, Recycle) { | |
494da23a TL |
676 | bool recyclable_log = (std::get<0>(GetParam()) != 0); |
677 | if (!recyclable_log) { | |
7c673cae FG |
678 | return; // test is only valid for recycled logs |
679 | } | |
680 | Write("foo"); | |
681 | Write("bar"); | |
682 | Write("baz"); | |
683 | Write("bif"); | |
684 | Write("blitz"); | |
685 | while (get_reader_contents()->size() < log::kBlockSize * 2) { | |
686 | Write("xxxxxxxxxxxxxxxx"); | |
687 | } | |
494da23a | 688 | std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter( |
11fdf7f2 TL |
689 | new test::OverwritingStringSink(get_reader_contents()), |
690 | "" /* don't care */)); | |
7c673cae FG |
691 | Writer recycle_writer(std::move(dest_holder), 123, true); |
692 | recycle_writer.AddRecord(Slice("foooo")); | |
693 | recycle_writer.AddRecord(Slice("bar")); | |
694 | ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2); | |
695 | ASSERT_EQ("foooo", Read()); | |
696 | ASSERT_EQ("bar", Read()); | |
697 | ASSERT_EQ("EOF", Read()); | |
698 | } | |
699 | ||
494da23a TL |
700 | INSTANTIATE_TEST_CASE_P(bool, LogTest, |
701 | ::testing::Values(std::make_tuple(0, false), | |
702 | std::make_tuple(0, true), | |
703 | std::make_tuple(1, false), | |
704 | std::make_tuple(1, true))); | |
705 | ||
706 | class RetriableLogTest : public ::testing::TestWithParam<int> { | |
707 | private: | |
708 | class ReportCollector : public Reader::Reporter { | |
709 | public: | |
710 | size_t dropped_bytes_; | |
711 | std::string message_; | |
712 | ||
713 | ReportCollector() : dropped_bytes_(0) {} | |
714 | void Corruption(size_t bytes, const Status& status) override { | |
715 | dropped_bytes_ += bytes; | |
716 | message_.append(status.ToString()); | |
717 | } | |
718 | }; | |
719 | ||
720 | Slice contents_; | |
721 | std::unique_ptr<WritableFileWriter> dest_holder_; | |
722 | std::unique_ptr<Writer> log_writer_; | |
723 | Env* env_; | |
724 | EnvOptions env_options_; | |
725 | const std::string test_dir_; | |
726 | const std::string log_file_; | |
727 | std::unique_ptr<WritableFileWriter> writer_; | |
728 | std::unique_ptr<SequentialFileReader> reader_; | |
729 | ReportCollector report_; | |
730 | std::unique_ptr<FragmentBufferedReader> log_reader_; | |
731 | ||
732 | public: | |
733 | RetriableLogTest() | |
734 | : contents_(), | |
735 | dest_holder_(nullptr), | |
736 | log_writer_(nullptr), | |
737 | env_(Env::Default()), | |
738 | test_dir_(test::PerThreadDBPath("retriable_log_test")), | |
739 | log_file_(test_dir_ + "/log"), | |
740 | writer_(nullptr), | |
741 | reader_(nullptr), | |
742 | log_reader_(nullptr) {} | |
743 | ||
744 | Status SetupTestEnv() { | |
745 | dest_holder_.reset(test::GetWritableFileWriter( | |
746 | new test::StringSink(&contents_), "" /* file name */)); | |
747 | assert(dest_holder_ != nullptr); | |
748 | log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam())); | |
749 | assert(log_writer_ != nullptr); | |
750 | ||
751 | Status s; | |
752 | s = env_->CreateDirIfMissing(test_dir_); | |
753 | std::unique_ptr<WritableFile> writable_file; | |
754 | if (s.ok()) { | |
755 | s = env_->NewWritableFile(log_file_, &writable_file, env_options_); | |
756 | } | |
757 | if (s.ok()) { | |
f67539c2 TL |
758 | writer_.reset(new WritableFileWriter( |
759 | NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_, | |
760 | env_options_)); | |
494da23a TL |
761 | assert(writer_ != nullptr); |
762 | } | |
763 | std::unique_ptr<SequentialFile> seq_file; | |
764 | if (s.ok()) { | |
765 | s = env_->NewSequentialFile(log_file_, &seq_file, env_options_); | |
766 | } | |
767 | if (s.ok()) { | |
f67539c2 TL |
768 | reader_.reset(new SequentialFileReader( |
769 | NewLegacySequentialFileWrapper(seq_file), log_file_)); | |
494da23a TL |
770 | assert(reader_ != nullptr); |
771 | log_reader_.reset(new FragmentBufferedReader( | |
772 | nullptr, std::move(reader_), &report_, true /* checksum */, | |
773 | 123 /* log_number */)); | |
774 | assert(log_reader_ != nullptr); | |
775 | } | |
776 | return s; | |
777 | } | |
778 | ||
779 | std::string contents() { | |
f67539c2 | 780 | auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file()); |
494da23a TL |
781 | assert(file != nullptr); |
782 | return file->contents_; | |
783 | } | |
784 | ||
785 | void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); } | |
786 | ||
787 | void Write(const Slice& data) { | |
788 | writer_->Append(data); | |
789 | writer_->Sync(true); | |
790 | } | |
791 | ||
792 | bool TryRead(std::string* result) { | |
793 | assert(result != nullptr); | |
794 | result->clear(); | |
795 | std::string scratch; | |
796 | Slice record; | |
797 | bool r = log_reader_->ReadRecord(&record, &scratch); | |
798 | if (r) { | |
799 | result->assign(record.data(), record.size()); | |
800 | return true; | |
801 | } else { | |
802 | return false; | |
803 | } | |
804 | } | |
805 | }; | |
806 | ||
807 | TEST_P(RetriableLogTest, TailLog_PartialHeader) { | |
808 | ASSERT_OK(SetupTestEnv()); | |
809 | std::vector<int> remaining_bytes_in_last_record; | |
810 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
811 | bool eof = false; | |
812 | SyncPoint::GetInstance()->DisableProcessing(); | |
813 | SyncPoint::GetInstance()->LoadDependency( | |
814 | {{"RetriableLogTest::TailLog:AfterPart1", | |
815 | "RetriableLogTest::TailLog:BeforeReadRecord"}, | |
816 | {"FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
817 | "RetriableLogTest::TailLog:BeforePart2"}}); | |
818 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
819 | SyncPoint::GetInstance()->SetCallBack( | |
820 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
821 | [&](void* /*arg*/) { eof = true; }); | |
822 | SyncPoint::GetInstance()->EnableProcessing(); | |
823 | ||
824 | size_t delta = header_size - 1; | |
825 | port::Thread log_writer_thread([&]() { | |
826 | size_t old_sz = contents().size(); | |
827 | Encode("foo"); | |
828 | size_t new_sz = contents().size(); | |
829 | std::string part1 = contents().substr(old_sz, delta); | |
830 | std::string part2 = | |
831 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
832 | Write(Slice(part1)); | |
833 | TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); | |
834 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); | |
835 | Write(Slice(part2)); | |
836 | }); | |
837 | ||
838 | std::string record; | |
839 | port::Thread log_reader_thread([&]() { | |
840 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); | |
841 | while (!TryRead(&record)) { | |
842 | } | |
843 | }); | |
844 | log_reader_thread.join(); | |
845 | log_writer_thread.join(); | |
846 | ASSERT_EQ("foo", record); | |
847 | ASSERT_TRUE(eof); | |
848 | } | |
849 | ||
850 | TEST_P(RetriableLogTest, TailLog_FullHeader) { | |
851 | ASSERT_OK(SetupTestEnv()); | |
852 | std::vector<int> remaining_bytes_in_last_record; | |
853 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
854 | bool eof = false; | |
855 | SyncPoint::GetInstance()->DisableProcessing(); | |
856 | SyncPoint::GetInstance()->LoadDependency( | |
857 | {{"RetriableLogTest::TailLog:AfterPart1", | |
858 | "RetriableLogTest::TailLog:BeforeReadRecord"}, | |
859 | {"FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
860 | "RetriableLogTest::TailLog:BeforePart2"}}); | |
861 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
862 | SyncPoint::GetInstance()->SetCallBack( | |
863 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", | |
864 | [&](void* /*arg*/) { eof = true; }); | |
865 | SyncPoint::GetInstance()->EnableProcessing(); | |
866 | ||
867 | size_t delta = header_size + 1; | |
868 | port::Thread log_writer_thread([&]() { | |
869 | size_t old_sz = contents().size(); | |
870 | Encode("foo"); | |
871 | size_t new_sz = contents().size(); | |
872 | std::string part1 = contents().substr(old_sz, delta); | |
873 | std::string part2 = | |
874 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
875 | Write(Slice(part1)); | |
876 | TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); | |
877 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); | |
878 | Write(Slice(part2)); | |
879 | ASSERT_TRUE(eof); | |
880 | }); | |
881 | ||
882 | std::string record; | |
883 | port::Thread log_reader_thread([&]() { | |
884 | TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); | |
885 | while (!TryRead(&record)) { | |
886 | } | |
887 | }); | |
888 | log_reader_thread.join(); | |
889 | log_writer_thread.join(); | |
890 | ASSERT_EQ("foo", record); | |
891 | } | |
892 | ||
893 | TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { | |
894 | // Clear all sync point callbacks even if this test does not use sync point. | |
895 | // It is necessary, otherwise the execute of this test may hit a sync point | |
896 | // with which a callback is registered. The registered callback may access | |
897 | // some dead variable, causing segfault. | |
898 | SyncPoint::GetInstance()->DisableProcessing(); | |
899 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
900 | ASSERT_OK(SetupTestEnv()); | |
901 | size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; | |
902 | size_t delta = header_size - 1; | |
903 | size_t old_sz = contents().size(); | |
904 | Encode("foo-bar"); | |
905 | size_t new_sz = contents().size(); | |
906 | std::string part1 = contents().substr(old_sz, delta); | |
907 | std::string part2 = | |
908 | contents().substr(old_sz + delta, new_sz - old_sz - delta); | |
909 | Write(Slice(part1)); | |
910 | std::string record; | |
911 | ASSERT_FALSE(TryRead(&record)); | |
912 | ASSERT_TRUE(record.empty()); | |
913 | Write(Slice(part2)); | |
914 | ASSERT_TRUE(TryRead(&record)); | |
915 | ASSERT_EQ("foo-bar", record); | |
916 | } | |
917 | ||
// Runs every RetriableLogTest once per parameter value. The tests only use the
// parameter's truthiness (`GetParam() ? kRecyclableHeaderSize : kHeaderSize`),
// so 0 selects kHeaderSize and 2 selects kRecyclableHeaderSize.
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
7c673cae FG |
919 | |
920 | } // namespace log | |
f67539c2 | 921 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
922 | |
923 | int main(int argc, char** argv) { | |
924 | ::testing::InitGoogleTest(&argc, argv); | |
925 | return RUN_ALL_TESTS(); | |
926 | } |