return BigString(NumberString(i), rnd->Skewed(17));
}
-class LogTest : public ::testing::TestWithParam<int> {
+// Param type is tuple<int, bool>
+// get<0>(tuple): non-zero if recycling log, zero if regular log
+// get<1>(tuple): true if allow retry after read EOF, false otherwise
+class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
private:
class StringSource : public SequentialFile {
public:
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
- explicit StringSource(Slice& contents) :
- contents_(contents),
- force_error_(false),
- force_error_position_(0),
- force_eof_(false),
- force_eof_position_(0),
- returned_partial_(false) { }
-
- virtual Status Read(size_t n, Slice* result, char* scratch) override {
- EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+ bool fail_after_read_partial_;
+ explicit StringSource(Slice& contents, bool fail_after_read_partial)
+ : contents_(contents),
+ force_error_(false),
+ force_error_position_(0),
+ force_eof_(false),
+ force_eof_position_(0),
+ returned_partial_(false),
+ fail_after_read_partial_(fail_after_read_partial) {}
+
+ Status Read(size_t n, Slice* result, char* scratch) override {
+ if (fail_after_read_partial_) {
+ EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+ }
if (force_error_) {
if (force_error_position_ >= n) {
return Status::OK();
}
- virtual Status Skip(uint64_t n) override {
+ Status Skip(uint64_t n) override {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
std::string message_;
ReportCollector() : dropped_bytes_(0) { }
- virtual void Corruption(size_t bytes, const Status& status) override {
+ void Corruption(size_t bytes, const Status& status) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
}
void reset_source_contents() {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
assert(src);
src->contents_ = dest_contents();
}
Slice reader_contents_;
- unique_ptr<WritableFileWriter> dest_holder_;
- unique_ptr<SequentialFileReader> source_holder_;
+ std::unique_ptr<WritableFileWriter> dest_holder_;
+ std::unique_ptr<SequentialFileReader> source_holder_;
ReportCollector report_;
Writer writer_;
- Reader reader_;
+ std::unique_ptr<Reader> reader_;
- // Record metadata for testing initial offset functionality
- static size_t initial_offset_record_sizes_[];
- uint64_t initial_offset_last_record_offsets_[4];
+ protected:
+ bool allow_retry_read_;
public:
LogTest()
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_), "" /* don't care */)),
source_holder_(test::GetSequentialFileReader(
- new StringSource(reader_contents_), "" /* file name */)),
- writer_(std::move(dest_holder_), 123, GetParam()),
- reader_(nullptr, std::move(source_holder_), &report_,
- true /* checksum */, 123 /* log_number */) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
- initial_offset_last_record_offsets_[0] = 0;
- initial_offset_last_record_offsets_[1] = header_size + 10000;
- initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
- initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
- (2 * log::kBlockSize - 1000) +
- 3 * header_size;
+ new StringSource(reader_contents_, !std::get<1>(GetParam())),
+ "" /* file name */)),
+ writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
+ allow_retry_read_(std::get<1>(GetParam())) {
+ if (allow_retry_read_) {
+ reader_.reset(new FragmentBufferedReader(
+ nullptr, std::move(source_holder_), &report_, true /* checksum */,
+ 123 /* log_number */));
+ } else {
+ reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
+ true /* checksum */, 123 /* log_number */));
+ }
}
Slice* get_reader_contents() { return &reader_contents_; }
WALRecoveryMode::kTolerateCorruptedTailRecords) {
std::string scratch;
Slice record;
- if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
+ bool ret = false;
+ ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
+ if (ret) {
return record.ToString();
} else {
return "EOF";
}
void ForceError(size_t position = 0) {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_error_ = true;
src->force_error_position_ = position;
}
}
void ForceEOF(size_t position = 0) {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
- auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+ auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->returned_partial_ = false;
- reader_.UnmarkEOF();
+ reader_->UnmarkEOF();
}
- bool IsEOF() {
- return reader_.IsEOF();
- }
+ bool IsEOF() { return reader_->IsEOF(); }
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
return "OK";
}
}
-
- void WriteInitialOffsetLog() {
- for (int i = 0; i < 4; i++) {
- std::string record(initial_offset_record_sizes_[i],
- static_cast<char>('a' + i));
- Write(record);
- }
- }
-
};
-size_t LogTest::initial_offset_record_sizes_[] =
- {10000, // Two sizable records in first block
- 10000,
- 2 * log::kBlockSize - 1000, // Span three blocks
- 1};
-
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_P(LogTest, ReadWrite) {
TEST_P(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
TEST_P(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
}
TEST_P(LogTest, ShortTrailer) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
}
TEST_P(LogTest, AlignedEof) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ int header_size =
+ std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
}
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
+ if (allow_retry_read_) {
+ // If read retry is allowed, then truncated trailing record should not
+ // raise an error.
+ return;
+ }
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
}
TEST_P(LogTest, BadLength) {
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ if (allow_retry_read_) {
+ // If read retry is allowed, then we should not raise an error when the
+ // record length specified in header is longer than data currently
+ // available. It's possible that the body of the record is not written yet.
+ return;
+ }
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
const int kPayloadSize = kBlockSize - header_size;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
- if (!GetParam()) {
+ if (!recyclable_log) {
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
}
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
+ if (allow_retry_read_) {
+ // If read retry is allowed, then we should not raise an error when the
+ // record length specified in header is longer than data currently
+ // available. It's possible that the body of the record is not written yet.
+ return;
+ }
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
}
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
+ if (allow_retry_read_) {
+ // If read retry is allowed, then we should not raise an error when the
+ // record length specified in header is longer than data currently
+ // available. It's possible that the body of the record is not written yet.
+ return;
+ }
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
Write("foooooo");
IncrementByte(0, 14);
ASSERT_EQ("EOF", Read());
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
ASSERT_EQ(14U, DroppedBytes());
ASSERT_EQ("OK", MatchError("checksum mismatch"));
} else {
TEST_P(LogTest, UnexpectedMiddleType) {
Write("foo");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableMiddleType : kMiddleType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
+ : kMiddleType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
TEST_P(LogTest, UnexpectedLastType) {
Write("foo");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableLastType : kLastType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(6,
+ static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
TEST_P(LogTest, UnexpectedFullType) {
Write("foo");
Write("bar");
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(
+ 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
TEST_P(LogTest, UnexpectedFirstType) {
Write("foo");
Write(BigString("bar", 100000));
- SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
- FixChecksum(0, 3, !!GetParam());
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ SetByte(
+ 6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+ FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ(BigString("bar", 100000), Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
}
TEST_P(LogTest, MissingLastIsNotIgnored) {
+ if (allow_retry_read_) {
+ // If read retry is allowed, then truncated trailing record should not
+ // raise an error.
+ return;
+ }
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
}
TEST_P(LogTest, PartialLastIsNotIgnored) {
+ if (allow_retry_read_) {
+ // If read retry is allowed, then truncated trailing record should not
+ // raise an error.
+ return;
+ }
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
SetByte(offset, 'x');
}
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
ASSERT_EQ("correct", Read());
ASSERT_EQ("EOF", Read());
size_t dropped = DroppedBytes();
TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
ForceEOF(3 + header_size + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
TEST_P(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
- int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
}
TEST_P(LogTest, Recycle) {
- if (!GetParam()) {
+ bool recyclable_log = (std::get<0>(GetParam()) != 0);
+ if (!recyclable_log) {
return; // test is only valid for recycled logs
}
Write("foo");
while (get_reader_contents()->size() < log::kBlockSize * 2) {
Write("xxxxxxxxxxxxxxxx");
}
- unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
+ std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
new test::OverwritingStringSink(get_reader_contents()),
"" /* don't care */));
Writer recycle_writer(std::move(dest_holder), 123, true);
ASSERT_EQ("EOF", Read());
}
-INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
+INSTANTIATE_TEST_CASE_P(bool, LogTest,
+ ::testing::Values(std::make_tuple(0, false),
+ std::make_tuple(0, true),
+ std::make_tuple(1, false),
+ std::make_tuple(1, true)));
+
+class RetriableLogTest : public ::testing::TestWithParam<int> {
+ private:
+ class ReportCollector : public Reader::Reporter {
+ public:
+ size_t dropped_bytes_;
+ std::string message_;
+
+ ReportCollector() : dropped_bytes_(0) {}
+ void Corruption(size_t bytes, const Status& status) override {
+ dropped_bytes_ += bytes;
+ message_.append(status.ToString());
+ }
+ };
+
+ Slice contents_;
+ std::unique_ptr<WritableFileWriter> dest_holder_;
+ std::unique_ptr<Writer> log_writer_;
+ Env* env_;
+ EnvOptions env_options_;
+ const std::string test_dir_;
+ const std::string log_file_;
+ std::unique_ptr<WritableFileWriter> writer_;
+ std::unique_ptr<SequentialFileReader> reader_;
+ ReportCollector report_;
+ std::unique_ptr<FragmentBufferedReader> log_reader_;
+
+ public:
+ RetriableLogTest()
+ : contents_(),
+ dest_holder_(nullptr),
+ log_writer_(nullptr),
+ env_(Env::Default()),
+ test_dir_(test::PerThreadDBPath("retriable_log_test")),
+ log_file_(test_dir_ + "/log"),
+ writer_(nullptr),
+ reader_(nullptr),
+ log_reader_(nullptr) {}
+
+ Status SetupTestEnv() {
+ dest_holder_.reset(test::GetWritableFileWriter(
+ new test::StringSink(&contents_), "" /* file name */));
+ assert(dest_holder_ != nullptr);
+ log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
+ assert(log_writer_ != nullptr);
+
+ Status s;
+ s = env_->CreateDirIfMissing(test_dir_);
+ std::unique_ptr<WritableFile> writable_file;
+ if (s.ok()) {
+ s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
+ }
+ if (s.ok()) {
+ writer_.reset(new WritableFileWriter(std::move(writable_file), log_file_,
+ env_options_));
+ assert(writer_ != nullptr);
+ }
+ std::unique_ptr<SequentialFile> seq_file;
+ if (s.ok()) {
+ s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
+ }
+ if (s.ok()) {
+ reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
+ assert(reader_ != nullptr);
+ log_reader_.reset(new FragmentBufferedReader(
+ nullptr, std::move(reader_), &report_, true /* checksum */,
+ 123 /* log_number */));
+ assert(log_reader_ != nullptr);
+ }
+ return s;
+ }
+
+ std::string contents() {
+ auto file =
+ dynamic_cast<test::StringSink*>(log_writer_->file()->writable_file());
+ assert(file != nullptr);
+ return file->contents_;
+ }
+
+ void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
+
+ void Write(const Slice& data) {
+ writer_->Append(data);
+ writer_->Sync(true);
+ }
+
+ bool TryRead(std::string* result) {
+ assert(result != nullptr);
+ result->clear();
+ std::string scratch;
+ Slice record;
+ bool r = log_reader_->ReadRecord(&record, &scratch);
+ if (r) {
+ result->assign(record.data(), record.size());
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+TEST_P(RetriableLogTest, TailLog_PartialHeader) {
+ ASSERT_OK(SetupTestEnv());
+ std::vector<int> remaining_bytes_in_last_record;
+ size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool eof = false;
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"RetriableLogTest::TailLog:AfterPart1",
+ "RetriableLogTest::TailLog:BeforeReadRecord"},
+ {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ "RetriableLogTest::TailLog:BeforePart2"}});
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ [&](void* /*arg*/) { eof = true; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ size_t delta = header_size - 1;
+ port::Thread log_writer_thread([&]() {
+ size_t old_sz = contents().size();
+ Encode("foo");
+ size_t new_sz = contents().size();
+ std::string part1 = contents().substr(old_sz, delta);
+ std::string part2 =
+ contents().substr(old_sz + delta, new_sz - old_sz - delta);
+ Write(Slice(part1));
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+ Write(Slice(part2));
+ });
+
+ std::string record;
+ port::Thread log_reader_thread([&]() {
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+ while (!TryRead(&record)) {
+ }
+ });
+ log_reader_thread.join();
+ log_writer_thread.join();
+ ASSERT_EQ("foo", record);
+ ASSERT_TRUE(eof);
+}
+
+TEST_P(RetriableLogTest, TailLog_FullHeader) {
+ ASSERT_OK(SetupTestEnv());
+ std::vector<int> remaining_bytes_in_last_record;
+ size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ bool eof = false;
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"RetriableLogTest::TailLog:AfterPart1",
+ "RetriableLogTest::TailLog:BeforeReadRecord"},
+ {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ "RetriableLogTest::TailLog:BeforePart2"}});
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+ [&](void* /*arg*/) { eof = true; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ size_t delta = header_size + 1;
+ port::Thread log_writer_thread([&]() {
+ size_t old_sz = contents().size();
+ Encode("foo");
+ size_t new_sz = contents().size();
+ std::string part1 = contents().substr(old_sz, delta);
+ std::string part2 =
+ contents().substr(old_sz + delta, new_sz - old_sz - delta);
+ Write(Slice(part1));
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+ Write(Slice(part2));
+ ASSERT_TRUE(eof);
+ });
+
+ std::string record;
+ port::Thread log_reader_thread([&]() {
+ TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+ while (!TryRead(&record)) {
+ }
+ });
+ log_reader_thread.join();
+ log_writer_thread.join();
+ ASSERT_EQ("foo", record);
+}
+
+TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
+ // Clear all sync point callbacks even if this test does not use sync point.
+ // It is necessary, otherwise the execute of this test may hit a sync point
+ // with which a callback is registered. The registered callback may access
+ // some dead variable, causing segfault.
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ ASSERT_OK(SetupTestEnv());
+ size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+ size_t delta = header_size - 1;
+ size_t old_sz = contents().size();
+ Encode("foo-bar");
+ size_t new_sz = contents().size();
+ std::string part1 = contents().substr(old_sz, delta);
+ std::string part2 =
+ contents().substr(old_sz + delta, new_sz - old_sz - delta);
+ Write(Slice(part1));
+ std::string record;
+ ASSERT_FALSE(TryRead(&record));
+ ASSERT_TRUE(record.empty());
+ Write(Slice(part2));
+ ASSERT_TRUE(TryRead(&record));
+ ASSERT_EQ("foo-bar", record);
+}
+
+INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
} // namespace log
} // namespace rocksdb