1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "rocksdb/env.h"
13 #include "util/coding.h"
14 #include "util/crc32c.h"
15 #include "util/file_reader_writer.h"
16 #include "util/random.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
// Construct a string of exactly `n` bytes made out of repeated copies of the
// supplied partial string; the final copy is cut short if needed.
//
// partial_string: pattern to repeat.
// n:              exact length of the returned string.
//
// If `partial_string` is empty nothing can be appended, so the loop is
// skipped (instead of spinning forever) and the result is padded with '\0'
// up to `n` bytes.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The loop may overshoot n by up to one pattern length.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  // Trim the overshoot (or pad, for an empty pattern) to exactly n bytes.
  result.resize(n);
  return result;
}
// Construct a string from a number: the decimal form of `n` followed by a
// trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  // 50 bytes comfortably holds any int plus the '.' and the terminator.
  char buf[50];
  snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}
41 // Return a skewed, potentially long string.
// The text is the NumberString(i) prefix repeated by BigString up to a length
// drawn from rnd->Skewed(17), so the same Random seed reproduces the same
// sequence of strings.
42 static std::string
RandomSkewedString(int i
, Random
* rnd
) {
43 return BigString(NumberString(i
), rnd
->Skewed(17));
46 // Param type is tuple<int, bool>
47 // get<0>(tuple): non-zero if recycling log, zero if regular log
48 // get<1>(tuple): true if allow retry after read EOF, false otherwise
49 class LogTest
: public ::testing::TestWithParam
<std::tuple
<int, bool>> {
// In-memory SequentialFile that serves bytes from contents_ and can be told
// to inject a read error or a premature EOF at a chosen position.
// NOTE(review): several original lines (e.g. the contents_ / force_error_ /
// force_eof_ declarations and some closing braces) are not visible in this
// excerpt.
51 class StringSource
: public SequentialFile
{
// Remaining bytes that may be read before the injected error fires.
55 size_t force_error_position_
;
// Remaining bytes that may be read before the injected EOF fires.
57 size_t force_eof_position_
;
// Set once Read() has reported an error, a forced EOF, or a short read.
58 bool returned_partial_
;
// When true, Read() checks that it is not called again after
// returned_partial_ has been set.
59 bool fail_after_read_partial_
;
// contents: bytes to serve.
// fail_after_read_partial: enables the "no Read() after eof/error" check.
60 explicit StringSource(Slice
& contents
, bool fail_after_read_partial
)
61 : contents_(contents
),
63 force_error_position_(0),
65 force_eof_position_(0),
66 returned_partial_(false),
67 fail_after_read_partial_(fail_after_read_partial
) {}
// Reads up to n bytes into scratch and points *result at them.
// Visible paths: the injected-error path returns Corruption("read error")
// after handing back the bytes before the error position; the short-read and
// injected-EOF paths mark returned_partial_.
// NOTE(review): the guarding conditions and the final return are not visible
// in this excerpt.
69 Status
Read(size_t n
, Slice
* result
, char* scratch
) override
{
70 if (fail_after_read_partial_
) {
71 EXPECT_TRUE(!returned_partial_
) << "must not Read() after eof/error";
// The whole read fits before the error point: just consume the budget.
75 if (force_error_position_
>= n
) {
76 force_error_position_
-= n
;
// Otherwise return only the bytes that precede the error point, then fail.
78 *result
= Slice(contents_
.data(), force_error_position_
);
79 contents_
.remove_prefix(force_error_position_
);
81 returned_partial_
= true;
82 return Status::Corruption("read error");
// Fewer bytes remain than requested: this will be a short read.
86 if (contents_
.size() < n
) {
88 returned_partial_
= true;
// Same budget scheme as the error path, but for the injected EOF.
92 if (force_eof_position_
>= n
) {
93 force_eof_position_
-= n
;
// Clamp the read to the bytes that precede the forced EOF.
96 n
= force_eof_position_
;
97 returned_partial_
= true;
101 // By using scratch we ensure that caller has control over the
102 // lifetime of result.data()
103 memcpy(scratch
, contents_
.data(), n
);
104 *result
= Slice(scratch
, n
);
106 contents_
.remove_prefix(n
);
// Drops the first n bytes of contents_; skipping past the end returns
// NotFound.
// NOTE(review): "skipepd" in the message below looks like a typo for
// "skipped"; it is a runtime string and is left untouched here.
110 Status
Skip(uint64_t n
) override
{
111 if (n
> contents_
.size()) {
113 return Status::NotFound("in-memory file skipepd past end");
116 contents_
.remove_prefix(n
);
// Reader::Reporter that accumulates what the reader reports as corrupt: the
// total number of dropped bytes and the concatenation of every status string.
122 class ReportCollector
: public Reader::Reporter
{
// Sum of the `bytes` argument over all Corruption() calls.
124 size_t dropped_bytes_
;
// Status strings appended back to back, with no separator.
125 std::string message_
;
127 ReportCollector() : dropped_bytes_(0) { }
// Records one corruption report.
128 void Corruption(size_t bytes
, const Status
& status
) override
{
129 dropped_bytes_
+= bytes
;
130 message_
.append(status
.ToString());
// Returns a mutable reference to the bytes written so far, obtained by
// downcasting the writer's underlying file to test::StringSink.
// NOTE(review): the declaration of `dest` is not visible in this excerpt.
134 std::string
& dest_contents() {
136 dynamic_cast<test::StringSink
*>(writer_
.file()->writable_file());
138 return dest
->contents_
;
// Const overload: read-only view of the bytes written so far, obtained by
// downcasting the writer's underlying file to const test::StringSink.
// NOTE(review): the declaration of `dest` is not visible in this excerpt.
141 const std::string
& dest_contents() const {
143 dynamic_cast<const test::StringSink
*>(writer_
.file()->writable_file());
145 return dest
->contents_
;
// Re-points the reader's StringSource at the current written bytes, so that
// later reads see whatever the writer has produced (or the test has edited).
148 void reset_source_contents() {
149 auto src
= dynamic_cast<StringSource
*>(reader_
->file()->file());
151 src
->contents_
= dest_contents();
// Fixture state and constructor.
// NOTE(review): the constructor's signature line and the writer_ declaration
// are not visible in this excerpt.
// Slice shared by the StringSink (writer side) and the StringSource (reader
// side) created below.
154 Slice reader_contents_
;
// Temporary owners; both are moved from inside the constructor.
155 std::unique_ptr
<WritableFileWriter
> dest_holder_
;
156 std::unique_ptr
<SequentialFileReader
> source_holder_
;
// Collects corruption reports from the reader.
157 ReportCollector report_
;
159 std::unique_ptr
<Reader
> reader_
;
// Mirrors get<1> of the test parameter.
162 bool allow_retry_read_
;
166 : reader_contents_(),
167 dest_holder_(test::GetWritableFileWriter(
168 new test::StringSink(&reader_contents_
), "" /* don't care */)),
// The source enforces "no Read() after eof/error" only when retry is off.
169 source_holder_(test::GetSequentialFileReader(
170 new StringSource(reader_contents_
, !std::get
<1>(GetParam())),
171 "" /* file name */)),
// 123 matches the log_number handed to the readers below; get<0> is the
// recycling flag described in the class comment.
172 writer_(std::move(dest_holder_
), 123, std::get
<0>(GetParam())),
173 allow_retry_read_(std::get
<1>(GetParam())) {
// Retry-capable runs use FragmentBufferedReader; the other reset() builds a
// plain Reader (presumably the else branch — its line is not visible).
174 if (allow_retry_read_
) {
175 reader_
.reset(new FragmentBufferedReader(
176 nullptr, std::move(source_holder_
), &report_
, true /* checksum */,
177 123 /* log_number */));
179 reader_
.reset(new Reader(nullptr, std::move(source_holder_
), &report_
,
180 true /* checksum */, 123 /* log_number */));
// Exposes the Slice shared between the sink and the source.
184 Slice
* get_reader_contents() { return &reader_contents_
; }
// Appends msg to the log as one record via the writer under test.
186 void Write(const std::string
& msg
) {
187 writer_
.AddRecord(Slice(msg
));
// Number of bytes currently held by the destination sink.
190 size_t WrittenBytes() const {
191 return dest_contents().size();
// Reads the next record with the given recovery mode and returns it as a
// string. The default mode is kTolerateCorruptedTailRecords.
// NOTE(review): the declarations of `scratch`, `record` and `ret`, and the
// handling of an unsuccessful read, are not visible in this excerpt; the
// tests expect the literal "EOF" in that case.
194 std::string
Read(const WALRecoveryMode wal_recovery_mode
=
195 WALRecoveryMode::kTolerateCorruptedTailRecords
) {
199 ret
= reader_
->ReadRecord(&record
, &scratch
, wal_recovery_mode
);
201 return record
.ToString();
// Adds delta to the written byte at `offset`, corrupting the log in place.
207 void IncrementByte(int offset
, char delta
) {
208 dest_contents()[offset
] += delta
;
// Overwrites the written byte at `offset` with new_byte.
211 void SetByte(int offset
, char new_byte
) {
212 dest_contents()[offset
] = new_byte
;
// Operates on the destination StringSink, located via the same downcast used
// by dest_contents().
// NOTE(review): the statement that actually uses `bytes` is not visible in
// this excerpt; callers describe it as dropping trailing bytes — confirm.
215 void ShrinkSize(int bytes
) {
217 dynamic_cast<test::StringSink
*>(writer_
.file()->writable_file());
// Recomputes and stores the checksum of the record whose header starts at
// header_offset, so a deliberately edited record still passes CRC checks.
// header_offset: byte offset of the record header in the written data.
// len:           payload length in bytes.
// recyclable:    selects kRecyclableHeaderSize instead of kHeaderSize.
222 void FixChecksum(int header_offset
, int len
, bool recyclable
) {
223 // Compute crc of type/len/data
224 int header_size
= recyclable
? kRecyclableHeaderSize
: kHeaderSize
;
// The CRC covers everything from header byte 6 through the payload end.
225 uint32_t crc
= crc32c::Value(&dest_contents()[header_offset
+ 6],
226 header_size
- 6 + len
);
227 crc
= crc32c::Mask(crc
);
// The masked CRC is stored in the first four header bytes.
228 EncodeFixed32(&dest_contents()[header_offset
], crc
);
// Arms read-error injection on the reader's StringSource; `position` is the
// number of bytes that may still be read before the error fires.
231 void ForceError(size_t position
= 0) {
232 auto src
= dynamic_cast<StringSource
*>(reader_
->file()->file());
233 src
->force_error_
= true;
234 src
->force_error_position_
= position
;
// Total bytes the reader has reported as dropped so far.
237 size_t DroppedBytes() const {
238 return report_
.dropped_bytes_
;
// Concatenation of every corruption status reported so far.
241 std::string
ReportMessage() const {
242 return report_
.message_
;
// Arms premature-EOF injection on the reader's StringSource; `position` is
// the number of bytes that may still be read before the EOF fires.
245 void ForceEOF(size_t position
= 0) {
246 auto src
= dynamic_cast<StringSource
*>(reader_
->file()->file());
247 src
->force_eof_
= true;
248 src
->force_eof_position_
= position
;
// NOTE(review): the signature line of this helper is not visible in this
// excerpt; presumably it is the fixture's UnmarkEOF() wrapper — confirm.
// Clears the source's returned_partial_ flag so further Read() calls are
// accepted, then clears the reader's EOF state.
252 auto src
= dynamic_cast<StringSource
*>(reader_
->file()->file());
253 src
->returned_partial_
= false;
254 reader_
->UnmarkEOF();
// Forwards to the reader's EOF flag.
257 bool IsEOF() { return reader_
->IsEOF(); }
259 // Returns OK iff recorded error message contains "msg"
// When msg is absent, the full recorded message is returned instead, so a
// failing ASSERT_EQ("OK", ...) prints what was actually reported.
// NOTE(review): the branch returning "OK" is not visible in this excerpt.
260 std::string
MatchError(const std::string
& msg
) const {
261 if (report_
.message_
.find(msg
) == std::string::npos
) {
262 return report_
.message_
;
// With nothing written, the first read already reports EOF.
269 TEST_P(LogTest
, Empty
) { ASSERT_EQ("EOF", Read()); }
// Records come back in write order, including an empty record, followed by a
// stable EOF.
// NOTE(review): the Write() calls are not visible in this excerpt.
271 TEST_P(LogTest
, ReadWrite
) {
276 ASSERT_EQ("foo", Read());
277 ASSERT_EQ("bar", Read());
278 ASSERT_EQ("", Read());
279 ASSERT_EQ("xxxx", Read());
280 ASSERT_EQ("EOF", Read());
281 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
// Writes 100000 small numbered records and reads them all back in order.
284 TEST_P(LogTest
, ManyBlocks
) {
285 for (int i
= 0; i
< 100000; i
++) {
286 Write(NumberString(i
));
288 for (int i
= 0; i
< 100000; i
++) {
289 ASSERT_EQ(NumberString(i
), Read());
291 ASSERT_EQ("EOF", Read());
// Round-trips records of 50000 and 100000 bytes alongside a small one.
// NOTE(review): the Write() of "small" is not visible in this excerpt.
294 TEST_P(LogTest
, Fragmentation
) {
296 Write(BigString("medium", 50000));
297 Write(BigString("large", 100000));
298 ASSERT_EQ("small", Read());
299 ASSERT_EQ(BigString("medium", 50000), Read());
300 ASSERT_EQ(BigString("large", 100000), Read());
301 ASSERT_EQ("EOF", Read());
// The first record leaves exactly one header's worth of space in the block;
// the following empty and "bar" records must still be readable.
// NOTE(review): the header_size declaration and the later Write() calls are
// not visible in this excerpt.
304 TEST_P(LogTest
, MarginalTrailer
) {
305 // Make a trailer that is exactly the same length as an empty record.
307 std::get
<0>(GetParam()) ? kRecyclableHeaderSize
: kHeaderSize
;
308 const int n
= kBlockSize
- 2 * header_size
;
309 Write(BigString("foo", n
));
310 ASSERT_EQ((unsigned int)(kBlockSize
- header_size
), WrittenBytes());
313 ASSERT_EQ(BigString("foo", n
), Read());
314 ASSERT_EQ("", Read());
315 ASSERT_EQ("bar", Read());
316 ASSERT_EQ("EOF", Read());
// Same block layout as MarginalTrailer but without the empty record; also
// checks that nothing is reported as dropped.
// NOTE(review): the header_size declaration and the Write() of "bar" are not
// visible in this excerpt.
319 TEST_P(LogTest
, MarginalTrailer2
) {
320 // Make a trailer that is exactly the same length as an empty record.
322 std::get
<0>(GetParam()) ? kRecyclableHeaderSize
: kHeaderSize
;
323 const int n
= kBlockSize
- 2 * header_size
;
324 Write(BigString("foo", n
));
325 ASSERT_EQ((unsigned int)(kBlockSize
- header_size
), WrittenBytes());
327 ASSERT_EQ(BigString("foo", n
), Read());
328 ASSERT_EQ("bar", Read());
329 ASSERT_EQ("EOF", Read());
330 ASSERT_EQ(0U, DroppedBytes());
331 ASSERT_EQ("", ReportMessage());
// The first record leaves header_size - 4 bytes in the block, too little for
// another header; the following records must still be readable.
// NOTE(review): the header_size declaration and the later Write() calls are
// not visible in this excerpt.
334 TEST_P(LogTest
, ShortTrailer
) {
336 std::get
<0>(GetParam()) ? kRecyclableHeaderSize
: kHeaderSize
;
337 const int n
= kBlockSize
- 2 * header_size
+ 4;
338 Write(BigString("foo", n
));
339 ASSERT_EQ((unsigned int)(kBlockSize
- header_size
+ 4), WrittenBytes());
342 ASSERT_EQ(BigString("foo", n
), Read());
343 ASSERT_EQ("", Read());
344 ASSERT_EQ("bar", Read());
345 ASSERT_EQ("EOF", Read());
// Same record size as ShortTrailer, but the log ends right after it: the
// reader must return the record and then EOF.
// NOTE(review): the header_size declaration is not visible in this excerpt.
348 TEST_P(LogTest
, AlignedEof
) {
350 std::get
<0>(GetParam()) ? kRecyclableHeaderSize
: kHeaderSize
;
351 const int n
= kBlockSize
- 2 * header_size
+ 4;
352 Write(BigString("foo", n
));
353 ASSERT_EQ((unsigned int)(kBlockSize
- header_size
+ 4), WrittenBytes());
354 ASSERT_EQ(BigString("foo", n
), Read());
355 ASSERT_EQ("EOF", Read());
// Writes N records of skewed random length, then regenerates the same
// sequence from an identically seeded Random (301) and compares.
// NOTE(review): the declaration of N is not visible in this excerpt.
358 TEST_P(LogTest
, RandomRead
) {
360 Random
write_rnd(301);
361 for (int i
= 0; i
< N
; i
++) {
362 Write(RandomSkewedString(i
, &write_rnd
));
364 Random
read_rnd(301);
365 for (int i
= 0; i
< N
; i
++) {
366 ASSERT_EQ(RandomSkewedString(i
, &read_rnd
), Read());
368 ASSERT_EQ("EOF", Read());
371 // Tests of all the error paths in log_reader.cc follow:
// A read error surfaces as EOF, with a whole block reported as dropped and
// "read error" in the report.
// NOTE(review): the setup lines (write and error injection) are not visible
// in this excerpt.
373 TEST_P(LogTest
, ReadError
) {
376 ASSERT_EQ("EOF", Read());
377 ASSERT_EQ((unsigned int)kBlockSize
, DroppedBytes());
378 ASSERT_EQ("OK", MatchError("read error"));
// Corrupts the type byte, then repairs the checksum so that only the type is
// wrong; the 3-byte record is dropped with "unknown record type".
// NOTE(review): the initial Write() is not visible in this excerpt.
381 TEST_P(LogTest
, BadRecordType
) {
383 // Type is stored in header[6]
384 IncrementByte(6, 100);
385 FixChecksum(0, 3, false);
386 ASSERT_EQ("EOF", Read());
387 ASSERT_EQ(3U, DroppedBytes());
388 ASSERT_EQ("OK", MatchError("unknown record type"));
// In the default recovery mode a truncated final record yields a silent EOF.
// NOTE(review): the initial Write() is not visible in this excerpt.
391 TEST_P(LogTest
, TruncatedTrailingRecordIsIgnored
) {
393 ShrinkSize(4); // Drop all payload as well as a header byte
394 ASSERT_EQ("EOF", Read());
395 // Truncated last record is ignored, not treated as an error
396 ASSERT_EQ(0U, DroppedBytes());
397 ASSERT_EQ("", ReportMessage());
// Under kAbsoluteConsistency the same truncation must be reported.
// NOTE(review): the body of the allow_retry_read_ branch and the initial
// Write() are not visible in this excerpt.
400 TEST_P(LogTest
, TruncatedTrailingRecordIsNotIgnored
) {
401 if (allow_retry_read_
) {
402 // If read retry is allowed, then truncated trailing record should not
407 ShrinkSize(4); // Drop all payload as well as a header byte
408 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency
));
409 // Here the truncated last record is reported as corruption, not ignored
410 ASSERT_GT(DroppedBytes(), 0U);
411 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
// A corrupted length field in a block-filling record. For non-recyclable logs
// the next record ("foo") is still returned, a whole block is dropped and
// "bad record length" is reported.
// NOTE(review): the allow_retry_read_ branch body, the Write() of "foo", the
// byte corruption itself and the else keyword are not visible in this
// excerpt.
414 TEST_P(LogTest
, BadLength
) {
415 if (allow_retry_read_
) {
416 // If read retry is allowed, then we should not raise an error when the
417 // record length specified in header is longer than data currently
418 // available. It's possible that the body of the record is not written yet.
421 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
422 int header_size
= recyclable_log
? kRecyclableHeaderSize
: kHeaderSize
;
// Payload sized so that header + payload fill exactly one block.
423 const int kPayloadSize
= kBlockSize
- header_size
;
424 Write(BigString("bar", kPayloadSize
));
426 // Least significant size byte is stored in header[4].
428 if (!recyclable_log
) {
429 ASSERT_EQ("foo", Read());
430 ASSERT_EQ(kBlockSize
, DroppedBytes());
431 ASSERT_EQ("OK", MatchError("bad record length"));
433 ASSERT_EQ("EOF", Read());
// In the default recovery mode a bad length in the final record yields a
// silent EOF.
// NOTE(review): the allow_retry_read_ branch body and the setup lines are not
// visible in this excerpt.
437 TEST_P(LogTest
, BadLengthAtEndIsIgnored
) {
438 if (allow_retry_read_
) {
439 // If read retry is allowed, then we should not raise an error when the
440 // record length specified in header is longer than data currently
441 // available. It's possible that the body of the record is not written yet.
446 ASSERT_EQ("EOF", Read());
447 ASSERT_EQ(0U, DroppedBytes());
448 ASSERT_EQ("", ReportMessage());
// Under kAbsoluteConsistency a bad length in the final record is reported as
// "Corruption: truncated header" with a non-zero drop count.
// NOTE(review): the allow_retry_read_ branch body and the setup lines are not
// visible in this excerpt.
451 TEST_P(LogTest
, BadLengthAtEndIsNotIgnored
) {
452 if (allow_retry_read_
) {
453 // If read retry is allowed, then we should not raise an error when the
454 // record length specified in header is longer than data currently
455 // available. It's possible that the body of the record is not written yet.
460 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency
));
461 ASSERT_GT(DroppedBytes(), 0U);
462 ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
// Corrupts the first stored checksum byte. Non-recyclable logs report 14
// dropped bytes and "checksum mismatch"; the second pair of assertions
// expects no report at all (presumably the recyclable-log else branch).
// NOTE(review): the initial Write() and the else keyword are not visible in
// this excerpt.
465 TEST_P(LogTest
, ChecksumMismatch
) {
467 IncrementByte(0, 14);
468 ASSERT_EQ("EOF", Read());
469 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
470 if (!recyclable_log
) {
471 ASSERT_EQ(14U, DroppedBytes());
472 ASSERT_EQ("OK", MatchError("checksum mismatch"));
474 ASSERT_EQ(0U, DroppedBytes());
475 ASSERT_EQ("", ReportMessage());
// Rewrites the record's type byte to a "middle" fragment type and repairs the
// checksum; the record is dropped with "missing start".
// NOTE(review): the initial Write() and the tail of the SetByte() call are
// not visible in this excerpt.
479 TEST_P(LogTest
, UnexpectedMiddleType
) {
481 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
482 SetByte(6, static_cast<char>(recyclable_log
? kRecyclableMiddleType
484 FixChecksum(0, 3, !!recyclable_log
);
485 ASSERT_EQ("EOF", Read());
486 ASSERT_EQ(3U, DroppedBytes());
487 ASSERT_EQ("OK", MatchError("missing start"));
// Uses a "last" fragment type with a repaired checksum; the record is dropped
// with "missing start".
// NOTE(review): the initial Write() and the head of the call that consumes
// the static_cast are not visible in this excerpt.
490 TEST_P(LogTest
, UnexpectedLastType
) {
492 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
494 static_cast<char>(recyclable_log
? kRecyclableLastType
: kLastType
));
495 FixChecksum(0, 3, !!recyclable_log
);
496 ASSERT_EQ("EOF", Read());
497 ASSERT_EQ(3U, DroppedBytes());
498 ASSERT_EQ("OK", MatchError("missing start"));
// Relabels the first record as a "first" fragment, so the next record ("bar")
// arrives while a fragmented record is still open. "bar" is returned and the
// 3-byte partial is reported as "partial record without end".
// NOTE(review): the Write() calls and the head of the call that consumes the
// static_cast are not visible in this excerpt.
501 TEST_P(LogTest
, UnexpectedFullType
) {
504 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
506 6, static_cast<char>(recyclable_log
? kRecyclableFirstType
: kFirstType
));
507 FixChecksum(0, 3, !!recyclable_log
);
508 ASSERT_EQ("bar", Read());
509 ASSERT_EQ("EOF", Read());
510 ASSERT_EQ(3U, DroppedBytes());
511 ASSERT_EQ("OK", MatchError("partial record without end"));
// Same idea as UnexpectedFullType, but the record that follows is a
// 100000-byte one: it is returned intact and the relabelled 3-byte record is
// reported as "partial record without end".
// NOTE(review): the first Write() and the head of the call that consumes the
// static_cast are not visible in this excerpt.
514 TEST_P(LogTest
, UnexpectedFirstType
) {
516 Write(BigString("bar", 100000));
517 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
519 6, static_cast<char>(recyclable_log
? kRecyclableFirstType
: kFirstType
));
520 FixChecksum(0, 3, !!recyclable_log
);
521 ASSERT_EQ(BigString("bar", 100000), Read());
522 ASSERT_EQ("EOF", Read());
523 ASSERT_EQ(3U, DroppedBytes());
524 ASSERT_EQ("OK", MatchError("partial record without end"));
// A record whose final fragment is missing yields a silent EOF in the default
// recovery mode.
// NOTE(review): the statement that removes the last block is not visible in
// this excerpt.
527 TEST_P(LogTest
, MissingLastIsIgnored
) {
528 Write(BigString("bar", kBlockSize
));
529 // Remove the LAST block, including header.
531 ASSERT_EQ("EOF", Read());
532 ASSERT_EQ("", ReportMessage());
533 ASSERT_EQ(0U, DroppedBytes());
// Under kAbsoluteConsistency the missing final fragment is reported as
// "Corruption: error reading trailing data".
// NOTE(review): the allow_retry_read_ branch body and the statement that
// removes the last block are not visible in this excerpt.
536 TEST_P(LogTest
, MissingLastIsNotIgnored
) {
537 if (allow_retry_read_
) {
538 // If read retry is allowed, then truncated trailing record should not
542 Write(BigString("bar", kBlockSize
));
543 // Remove the LAST block, including header.
545 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency
));
546 ASSERT_GT(DroppedBytes(), 0U);
547 ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
// A truncated final fragment yields a silent EOF in the default recovery
// mode.
// NOTE(review): the statement that damages the last block is not visible in
// this excerpt.
550 TEST_P(LogTest
, PartialLastIsIgnored
) {
551 Write(BigString("bar", kBlockSize
));
552 // Cause a bad record length in the LAST block.
554 ASSERT_EQ("EOF", Read());
555 ASSERT_EQ("", ReportMessage());
556 ASSERT_EQ(0U, DroppedBytes());
// Under kAbsoluteConsistency the truncated final fragment produces two
// reports. ReportCollector appends them without a separator, hence the fused
// expected string.
// NOTE(review): the allow_retry_read_ branch body and the statement that
// damages the last block are not visible in this excerpt.
559 TEST_P(LogTest
, PartialLastIsNotIgnored
) {
560 if (allow_retry_read_
) {
561 // If read retry is allowed, then truncated trailing record should not
565 Write(BigString("bar", kBlockSize
));
566 // Cause a bad record length in the LAST block.
568 ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency
));
569 ASSERT_GT(DroppedBytes(), 0U);
570 ASSERT_EQ("OK", MatchError(
571 "Corruption: truncated headerCorruption: "
572 "error reading trailing data"));
// Guards against stitching fragments of different records together after the
// block between them has been destroyed.
// NOTE(review): the Write() of "correct" and the else keyword before the
// final assertion are not visible in this excerpt.
575 TEST_P(LogTest
, ErrorJoinsRecords
) {
576 // Consider two fragmented records:
577 // first(R1) last(R1) first(R2) last(R2)
578 // where the middle two fragments disappear. We do not want
579 // first(R1),last(R2) to get joined and returned as a valid record.
581 // Write records that span two blocks
582 Write(BigString("foo", kBlockSize
));
583 Write(BigString("bar", kBlockSize
));
586 // Wipe the middle block
587 for (unsigned int offset
= kBlockSize
; offset
< 2*kBlockSize
; offset
++) {
588 SetByte(offset
, 'x');
591 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
592 if (!recyclable_log
) {
593 ASSERT_EQ("correct", Read());
594 ASSERT_EQ("EOF", Read());
// The drop count is bounded rather than exact: at least two blocks, at most
// two blocks plus 100 bytes.
595 size_t dropped
= DroppedBytes();
596 ASSERT_LE(dropped
, 2 * kBlockSize
+ 100);
597 ASSERT_GE(dropped
, 2 * kBlockSize
);
599 ASSERT_EQ("EOF", Read());
// Forces an EOF partway through the written data, then checks that reading
// resumes and returns the remaining records.
// NOTE(review): the Write() calls and the calls that clear the EOF state are
// not visible in this excerpt.
603 TEST_P(LogTest
, ClearEofSingleBlock
) {
606 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
607 int header_size
= recyclable_log
? kRecyclableHeaderSize
: kHeaderSize
;
// 3 payload bytes + one header + 2 further bytes may be read before EOF.
608 ForceEOF(3 + header_size
+ 2);
609 ASSERT_EQ("foo", Read());
611 ASSERT_EQ("bar", Read());
612 ASSERT_TRUE(IsEOF());
613 ASSERT_EQ("EOF", Read());
616 ASSERT_EQ("xxx", Read());
617 ASSERT_TRUE(IsEOF());
// Multi-block variant of ClearEofSingleBlock: each record spans five full
// blocks plus 25 bytes.
// NOTE(review): the calls that clear the EOF state between reads are not
// visible in this excerpt.
620 TEST_P(LogTest
, ClearEofMultiBlock
) {
621 size_t num_full_blocks
= 5;
622 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
623 int header_size
= recyclable_log
? kRecyclableHeaderSize
: kHeaderSize
;
624 size_t n
= (kBlockSize
- header_size
) * num_full_blocks
+ 25;
625 Write(BigString("foo", n
));
626 Write(BigString("bar", n
));
// Budget: n payload bytes, num_full_blocks + 1 headers, and 3 further bytes.
627 ForceEOF(n
+ num_full_blocks
* header_size
+ header_size
+ 3);
628 ASSERT_EQ(BigString("foo", n
), Read());
629 ASSERT_TRUE(IsEOF());
631 ASSERT_EQ(BigString("bar", n
), Read());
632 ASSERT_TRUE(IsEOF());
633 Write(BigString("xxx", n
));
635 ASSERT_EQ(BigString("xxx", n
), Read());
636 ASSERT_TRUE(IsEOF());
// NOTE(review): the writes, the EOF/error injection and the EOF-clearing call
// are not visible in this excerpt.
639 TEST_P(LogTest
, ClearEofError
) {
640 // If an error occurs during Read() in UnmarkEOF(), the records contained
641 // in the buffer should be returned on subsequent calls of ReadRecord()
642 // until no more full records are left, whereafter ReadRecord() should return
643 // false to indicate that it cannot read any further.
648 ASSERT_EQ("foo", Read());
649 ASSERT_TRUE(IsEOF());
653 ASSERT_EQ("bar", Read());
654 ASSERT_EQ("EOF", Read());
// Variant of ClearEofError in which 3 bytes are reported as dropped with
// "read error".
// NOTE(review): the writes and the EOF/error injection are not visible in
// this excerpt.
657 TEST_P(LogTest
, ClearEofError2
) {
661 ASSERT_EQ("foo", Read());
665 ASSERT_EQ("bar", Read());
666 ASSERT_EQ("EOF", Read());
667 ASSERT_EQ(3U, DroppedBytes());
668 ASSERT_EQ("OK", MatchError("read error"));
// Fills at least two blocks with filler records, then overwrites the start of
// that data with a second writer. Only the two new records may be read back;
// the leftover bytes from the first writer must not surface.
// NOTE(review): some setup lines before the fill loop are not visible in this
// excerpt.
671 TEST_P(LogTest
, Recycle
) {
672 bool recyclable_log
= (std::get
<0>(GetParam()) != 0);
673 if (!recyclable_log
) {
674 return; // test is only valid for recycled logs
681 while (get_reader_contents()->size() < log::kBlockSize
* 2) {
682 Write("xxxxxxxxxxxxxxxx");
// The second writer uses the same log number (123) with recycling enabled,
// and writes over the same backing contents.
684 std::unique_ptr
<WritableFileWriter
> dest_holder(test::GetWritableFileWriter(
685 new test::OverwritingStringSink(get_reader_contents()),
686 "" /* don't care */));
687 Writer
recycle_writer(std::move(dest_holder
), 123, true);
688 recycle_writer
.AddRecord(Slice("foooo"));
689 recycle_writer
.AddRecord(Slice("bar"));
// The old data must still be there, i.e. overwriting did not shrink the file.
690 ASSERT_GE(get_reader_contents()->size(), log::kBlockSize
* 2);
691 ASSERT_EQ("foooo", Read());
692 ASSERT_EQ("bar", Read());
693 ASSERT_EQ("EOF", Read());
// Runs every LogTest case for all four combinations of
// (recycle flag, allow retry after EOF).
696 INSTANTIATE_TEST_CASE_P(bool, LogTest
,
697 ::testing::Values(std::make_tuple(0, false),
698 std::make_tuple(0, true),
699 std::make_tuple(1, false),
700 std::make_tuple(1, true)));
702 class RetriableLogTest
: public ::testing::TestWithParam
<int> {
// Reader::Reporter that accumulates the total number of dropped bytes and the
// concatenation of every reported status string.
704 class ReportCollector
: public Reader::Reporter
{
// Sum of the `bytes` argument over all Corruption() calls.
706 size_t dropped_bytes_
;
// Status strings appended back to back, with no separator.
707 std::string message_
;
709 ReportCollector() : dropped_bytes_(0) {}
// Records one corruption report.
710 void Corruption(size_t bytes
, const Status
& status
) override
{
711 dropped_bytes_
+= bytes
;
712 message_
.append(status
.ToString());
// Fixture state and constructor. Two pipelines exist: an in-memory
// Writer/StringSink pair used to encode records, and an on-disk
// writer/reader pair on log_file_.
// NOTE(review): the constructor's signature line and several member
// declarations (e.g. contents_, env_) are not visible in this excerpt.
717 std::unique_ptr
<WritableFileWriter
> dest_holder_
;
// Encodes records into the in-memory sink.
718 std::unique_ptr
<Writer
> log_writer_
;
720 EnvOptions env_options_
;
721 const std::string test_dir_
;
722 const std::string log_file_
;
// Writer and reader for the on-disk log file.
723 std::unique_ptr
<WritableFileWriter
> writer_
;
724 std::unique_ptr
<SequentialFileReader
> reader_
;
725 ReportCollector report_
;
726 std::unique_ptr
<FragmentBufferedReader
> log_reader_
;
731 dest_holder_(nullptr),
732 log_writer_(nullptr),
733 env_(Env::Default()),
734 test_dir_(test::PerThreadDBPath("retriable_log_test")),
735 log_file_(test_dir_
+ "/log"),
738 log_reader_(nullptr) {}
// Builds both pipelines: the in-memory encoder (log number 123, recycle flag
// from GetParam()) and the on-disk writer / FragmentBufferedReader pair on
// log_file_.
// NOTE(review): the declaration of `s`, the status checks between steps and
// the final return are not visible in this excerpt.
740 Status
SetupTestEnv() {
741 dest_holder_
.reset(test::GetWritableFileWriter(
742 new test::StringSink(&contents_
), "" /* file name */));
743 assert(dest_holder_
!= nullptr);
744 log_writer_
.reset(new Writer(std::move(dest_holder_
), 123, GetParam()));
745 assert(log_writer_
!= nullptr);
748 s
= env_
->CreateDirIfMissing(test_dir_
);
749 std::unique_ptr
<WritableFile
> writable_file
;
751 s
= env_
->NewWritableFile(log_file_
, &writable_file
, env_options_
);
754 writer_
.reset(new WritableFileWriter(std::move(writable_file
), log_file_
,
756 assert(writer_
!= nullptr);
758 std::unique_ptr
<SequentialFile
> seq_file
;
760 s
= env_
->NewSequentialFile(log_file_
, &seq_file
, env_options_
);
763 reader_
.reset(new SequentialFileReader(std::move(seq_file
), log_file_
));
764 assert(reader_
!= nullptr);
// reader_ is moved into the log reader here and must not be used afterwards.
765 log_reader_
.reset(new FragmentBufferedReader(
766 nullptr, std::move(reader_
), &report_
, true /* checksum */,
767 123 /* log_number */));
768 assert(log_reader_
!= nullptr);
// Returns a copy (by value) of the bytes encoded so far into the in-memory
// sink behind log_writer_.
// NOTE(review): the declaration of `file` is not visible in this excerpt.
773 std::string
contents() {
775 dynamic_cast<test::StringSink
*>(log_writer_
->file()->writable_file());
776 assert(file
!= nullptr);
777 return file
->contents_
;
// Encodes msg as one log record into the in-memory sink; nothing is written
// to the on-disk file here.
780 void Encode(const std::string
& msg
) { log_writer_
->AddRecord(Slice(msg
)); }
// Appends raw bytes to the on-disk log file through writer_.
// NOTE(review): any statements after the Append() are not visible in this
// excerpt.
782 void Write(const Slice
& data
) {
783 writer_
->Append(data
);
// Attempts to read one record from the on-disk log and copies its bytes into
// *result.
// result: must be non-null.
// NOTE(review): the declarations of `record` and `scratch`, the condition
// guarding the assign, and the return are not visible in this excerpt;
// callers loop until this returns true.
787 bool TryRead(std::string
* result
) {
788 assert(result
!= nullptr);
792 bool r
= log_reader_
->ReadRecord(&record
, &scratch
);
794 result
->assign(record
.data(), record
.size());
// Tails a log whose newest record reaches disk in two parts, where the first
// part is one byte short of a full header (delta = header_size - 1). The
// reader thread keeps retrying until the whole record, "foo", is available.
// NOTE(review): the Encode()/Write() calls, the declarations of `eof` and
// `record`, and the loop body are not visible in this excerpt.
802 TEST_P(RetriableLogTest
, TailLog_PartialHeader
) {
803 ASSERT_OK(SetupTestEnv());
804 std::vector
<int> remaining_bytes_in_last_record
;
805 size_t header_size
= GetParam() ? kRecyclableHeaderSize
: kHeaderSize
;
// Presumably each pair makes the second sync point wait for the first —
// confirm against SyncPoint::LoadDependency.
807 SyncPoint::GetInstance()->DisableProcessing();
808 SyncPoint::GetInstance()->LoadDependency(
809 {{"RetriableLogTest::TailLog:AfterPart1",
810 "RetriableLogTest::TailLog:BeforeReadRecord"},
811 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
812 "RetriableLogTest::TailLog:BeforePart2"}});
813 SyncPoint::GetInstance()->ClearAllCallBacks();
814 SyncPoint::GetInstance()->SetCallBack(
815 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
816 [&](void* /*arg*/) { eof
= true; });
817 SyncPoint::GetInstance()->EnableProcessing();
819 size_t delta
= header_size
- 1;
820 port::Thread
log_writer_thread([&]() {
// The encoded record is the byte range [old_sz, new_sz); part1 is its first
// `delta` bytes and the second substr() takes the remainder.
821 size_t old_sz
= contents().size();
823 size_t new_sz
= contents().size();
824 std::string part1
= contents().substr(old_sz
, delta
);
826 contents().substr(old_sz
+ delta
, new_sz
- old_sz
- delta
);
828 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
829 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
834 port::Thread
log_reader_thread([&]() {
835 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
836 while (!TryRead(&record
)) {
839 log_reader_thread
.join();
840 log_writer_thread
.join();
841 ASSERT_EQ("foo", record
);
// Same scenario as TailLog_PartialHeader, but the first part holds the full
// header plus one payload byte (delta = header_size + 1).
// NOTE(review): the Encode()/Write() calls, the declarations of `eof` and
// `record`, and the loop body are not visible in this excerpt.
845 TEST_P(RetriableLogTest
, TailLog_FullHeader
) {
846 ASSERT_OK(SetupTestEnv());
847 std::vector
<int> remaining_bytes_in_last_record
;
848 size_t header_size
= GetParam() ? kRecyclableHeaderSize
: kHeaderSize
;
// Presumably each pair makes the second sync point wait for the first —
// confirm against SyncPoint::LoadDependency.
850 SyncPoint::GetInstance()->DisableProcessing();
851 SyncPoint::GetInstance()->LoadDependency(
852 {{"RetriableLogTest::TailLog:AfterPart1",
853 "RetriableLogTest::TailLog:BeforeReadRecord"},
854 {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
855 "RetriableLogTest::TailLog:BeforePart2"}});
856 SyncPoint::GetInstance()->ClearAllCallBacks();
857 SyncPoint::GetInstance()->SetCallBack(
858 "FragmentBufferedLogReader::TryReadMore:FirstEOF",
859 [&](void* /*arg*/) { eof
= true; });
860 SyncPoint::GetInstance()->EnableProcessing();
862 size_t delta
= header_size
+ 1;
863 port::Thread
log_writer_thread([&]() {
// The encoded record is the byte range [old_sz, new_sz); part1 is its first
// `delta` bytes and the second substr() takes the remainder.
864 size_t old_sz
= contents().size();
866 size_t new_sz
= contents().size();
867 std::string part1
= contents().substr(old_sz
, delta
);
869 contents().substr(old_sz
+ delta
, new_sz
- old_sz
- delta
);
871 TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
872 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
878 port::Thread
log_reader_thread([&]() {
879 TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
880 while (!TryRead(&record
)) {
883 log_reader_thread
.join();
884 log_writer_thread
.join();
885 ASSERT_EQ("foo", record
);
// Single-threaded check that TryRead() does not block: it returns false with
// an empty result first, and later returns the whole record "foo-bar".
// NOTE(review): the Encode()/Write() calls and the declaration of `record`
// are not visible in this excerpt.
888 TEST_P(RetriableLogTest
, NonBlockingReadFullRecord
) {
889 // Clear all sync point callbacks even if this test does not use sync point.
890 // It is necessary, otherwise the execution of this test may hit a sync point
891 // with which a callback is registered. The registered callback may access
892 // some dead variable, causing segfault.
893 SyncPoint::GetInstance()->DisableProcessing();
894 SyncPoint::GetInstance()->ClearAllCallBacks();
895 ASSERT_OK(SetupTestEnv());
896 size_t header_size
= GetParam() ? kRecyclableHeaderSize
: kHeaderSize
;
// The first part stops one byte short of a complete header.
897 size_t delta
= header_size
- 1;
898 size_t old_sz
= contents().size();
900 size_t new_sz
= contents().size();
901 std::string part1
= contents().substr(old_sz
, delta
);
903 contents().substr(old_sz
+ delta
, new_sz
- old_sz
- delta
);
906 ASSERT_FALSE(TryRead(&record
));
907 ASSERT_TRUE(record
.empty());
909 ASSERT_TRUE(TryRead(&record
));
910 ASSERT_EQ("foo-bar", record
);
// Runs every RetriableLogTest case with parameter 0 and 2; the tests treat
// any non-zero parameter as selecting the recyclable header size.
913 INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest
, ::testing::Values(0, 2));
916 } // namespace rocksdb
// Test binary entry point: hands the command line to GoogleTest and returns
// the result of running all registered tests.
918 int main(int argc
, char** argv
) {
919 ::testing::InitGoogleTest(&argc
, argv
);
920 return RUN_ALL_TESTS();