]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/log_test.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / log_test.cc
index e14aa202b5a61af5b0e0bbf2e93edce9526c6858..fd237b030e7366e6714d0057e5a426cf2cbc074d 100644 (file)
@@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) {
   return BigString(NumberString(i), rnd->Skewed(17));
 }
 
-class LogTest : public ::testing::TestWithParam<int> {
+// Param type is tuple<int, bool>
+// get<0>(tuple): non-zero if recycling log, zero if regular log
+// get<1>(tuple): true if allow retry after read EOF, false otherwise
+class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
  private:
   class StringSource : public SequentialFile {
    public:
@@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam<int> {
     bool force_eof_;
     size_t force_eof_position_;
     bool returned_partial_;
-    explicit StringSource(Slice& contents) :
-      contents_(contents),
-      force_error_(false),
-      force_error_position_(0),
-      force_eof_(false),
-      force_eof_position_(0),
-      returned_partial_(false) { }
-
-    virtual Status Read(size_t n, Slice* result, char* scratch) override {
-      EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+    bool fail_after_read_partial_;
+    explicit StringSource(Slice& contents, bool fail_after_read_partial)
+        : contents_(contents),
+          force_error_(false),
+          force_error_position_(0),
+          force_eof_(false),
+          force_eof_position_(0),
+          returned_partial_(false),
+          fail_after_read_partial_(fail_after_read_partial) {}
+
+    Status Read(size_t n, Slice* result, char* scratch) override {
+      if (fail_after_read_partial_) {
+        EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
+      }
 
       if (force_error_) {
         if (force_error_position_ >= n) {
@@ -100,7 +107,7 @@ class LogTest : public ::testing::TestWithParam<int> {
       return Status::OK();
     }
 
-    virtual Status Skip(uint64_t n) override {
+    Status Skip(uint64_t n) override {
       if (n > contents_.size()) {
         contents_.clear();
         return Status::NotFound("in-memory file skipepd past end");
@@ -118,7 +125,7 @@ class LogTest : public ::testing::TestWithParam<int> {
     std::string message_;
 
     ReportCollector() : dropped_bytes_(0) { }
-    virtual void Corruption(size_t bytes, const Status& status) override {
+    void Corruption(size_t bytes, const Status& status) override {
       dropped_bytes_ += bytes;
       message_.append(status.ToString());
     }
@@ -139,21 +146,20 @@ class LogTest : public ::testing::TestWithParam<int> {
   }
 
   void reset_source_contents() {
-    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+    auto src = dynamic_cast<StringSource*>(reader_->file()->file());
     assert(src);
     src->contents_ = dest_contents();
   }
 
   Slice reader_contents_;
-  unique_ptr<WritableFileWriter> dest_holder_;
-  unique_ptr<SequentialFileReader> source_holder_;
+  std::unique_ptr<WritableFileWriter> dest_holder_;
+  std::unique_ptr<SequentialFileReader> source_holder_;
   ReportCollector report_;
   Writer writer_;
-  Reader reader_;
+  std::unique_ptr<Reader> reader_;
 
-  // Record metadata for testing initial offset functionality
-  static size_t initial_offset_record_sizes_[];
-  uint64_t initial_offset_last_record_offsets_[4];
+ protected:
+  bool allow_retry_read_;
 
  public:
   LogTest()
@@ -161,17 +167,18 @@ class LogTest : public ::testing::TestWithParam<int> {
         dest_holder_(test::GetWritableFileWriter(
             new test::StringSink(&reader_contents_), "" /* don't care */)),
         source_holder_(test::GetSequentialFileReader(
-            new StringSource(reader_contents_), "" /* file name */)),
-        writer_(std::move(dest_holder_), 123, GetParam()),
-        reader_(nullptr, std::move(source_holder_), &report_,
-                true /* checksum */, 123 /* log_number */) {
-    int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
-    initial_offset_last_record_offsets_[0] = 0;
-    initial_offset_last_record_offsets_[1] = header_size + 10000;
-    initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
-    initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
-                                             (2 * log::kBlockSize - 1000) +
-                                             3 * header_size;
+            new StringSource(reader_contents_, !std::get<1>(GetParam())),
+            "" /* file name */)),
+        writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
+        allow_retry_read_(std::get<1>(GetParam())) {
+    if (allow_retry_read_) {
+      reader_.reset(new FragmentBufferedReader(
+          nullptr, std::move(source_holder_), &report_, true /* checksum */,
+          123 /* log_number */));
+    } else {
+      reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
+                               true /* checksum */, 123 /* log_number */));
+    }
   }
 
   Slice* get_reader_contents() { return &reader_contents_; }
@@ -188,7 +195,9 @@ class LogTest : public ::testing::TestWithParam<int> {
                        WALRecoveryMode::kTolerateCorruptedTailRecords) {
     std::string scratch;
     Slice record;
-    if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
+    bool ret = false;
+    ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
+    if (ret) {
       return record.ToString();
     } else {
       return "EOF";
@@ -220,7 +229,7 @@ class LogTest : public ::testing::TestWithParam<int> {
   }
 
   void ForceError(size_t position = 0) {
-    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+    auto src = dynamic_cast<StringSource*>(reader_->file()->file());
     src->force_error_ = true;
     src->force_error_position_ = position;
   }
@@ -234,20 +243,18 @@ class LogTest : public ::testing::TestWithParam<int> {
   }
 
   void ForceEOF(size_t position = 0) {
-    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+    auto src = dynamic_cast<StringSource*>(reader_->file()->file());
     src->force_eof_ = true;
     src->force_eof_position_ = position;
   }
 
   void UnmarkEOF() {
-    auto src = dynamic_cast<StringSource*>(reader_.file()->file());
+    auto src = dynamic_cast<StringSource*>(reader_->file()->file());
     src->returned_partial_ = false;
-    reader_.UnmarkEOF();
+    reader_->UnmarkEOF();
   }
 
-  bool IsEOF() {
-    return reader_.IsEOF();
-  }
+  bool IsEOF() { return reader_->IsEOF(); }
 
   // Returns OK iff recorded error message contains "msg"
   std::string MatchError(const std::string& msg) const {
@@ -257,23 +264,8 @@ class LogTest : public ::testing::TestWithParam<int> {
       return "OK";
     }
   }
-
-  void WriteInitialOffsetLog() {
-    for (int i = 0; i < 4; i++) {
-      std::string record(initial_offset_record_sizes_[i],
-                         static_cast<char>('a' + i));
-      Write(record);
-    }
-  }
-
 };
 
-size_t LogTest::initial_offset_record_sizes_[] =
-    {10000,  // Two sizable records in first block
-     10000,
-     2 * log::kBlockSize - 1000,  // Span three blocks
-     1};
-
 TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
 
 TEST_P(LogTest, ReadWrite) {
@@ -311,7 +303,8 @@ TEST_P(LogTest, Fragmentation) {
 
 TEST_P(LogTest, MarginalTrailer) {
   // Make a trailer that is exactly the same length as an empty record.
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  int header_size =
+      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
   const int n = kBlockSize - 2 * header_size;
   Write(BigString("foo", n));
   ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@@ -325,7 +318,8 @@ TEST_P(LogTest, MarginalTrailer) {
 
 TEST_P(LogTest, MarginalTrailer2) {
   // Make a trailer that is exactly the same length as an empty record.
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  int header_size =
+      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
   const int n = kBlockSize - 2 * header_size;
   Write(BigString("foo", n));
   ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@@ -338,7 +332,8 @@ TEST_P(LogTest, MarginalTrailer2) {
 }
 
 TEST_P(LogTest, ShortTrailer) {
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  int header_size =
+      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
   const int n = kBlockSize - 2 * header_size + 4;
   Write(BigString("foo", n));
   ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@@ -351,7 +346,8 @@ TEST_P(LogTest, ShortTrailer) {
 }
 
 TEST_P(LogTest, AlignedEof) {
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  int header_size =
+      std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
   const int n = kBlockSize - 2 * header_size + 4;
   Write(BigString("foo", n));
   ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@@ -402,6 +398,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
 }
 
 TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
+  if (allow_retry_read_) {
+    // If read retry is allowed, then truncated trailing record should not
+    // raise an error.
+    return;
+  }
   Write("foo");
   ShrinkSize(4);  // Drop all payload as well as a header byte
   ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@@ -411,13 +412,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
 }
 
 TEST_P(LogTest, BadLength) {
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in header is longer than data currently
+    // available. It's possible that the body of the record is not written yet.
+    return;
+  }
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
   const int kPayloadSize = kBlockSize - header_size;
   Write(BigString("bar", kPayloadSize));
   Write("foo");
   // Least significant size byte is stored in header[4].
   IncrementByte(4, 1);
-  if (!GetParam()) {
+  if (!recyclable_log) {
     ASSERT_EQ("foo", Read());
     ASSERT_EQ(kBlockSize, DroppedBytes());
     ASSERT_EQ("OK", MatchError("bad record length"));
@@ -427,6 +435,12 @@ TEST_P(LogTest, BadLength) {
 }
 
 TEST_P(LogTest, BadLengthAtEndIsIgnored) {
+  if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in header is longer than data currently
+    // available. It's possible that the body of the record is not written yet.
+    return;
+  }
   Write("foo");
   ShrinkSize(1);
   ASSERT_EQ("EOF", Read());
@@ -435,6 +449,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) {
 }
 
 TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
+  if (allow_retry_read_) {
+    // If read retry is allowed, then we should not raise an error when the
+    // record length specified in header is longer than data currently
+    // available. It's possible that the body of the record is not written yet.
+    return;
+  }
   Write("foo");
   ShrinkSize(1);
   ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@@ -446,7 +466,8 @@ TEST_P(LogTest, ChecksumMismatch) {
   Write("foooooo");
   IncrementByte(0, 14);
   ASSERT_EQ("EOF", Read());
-  if (!GetParam()) {
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  if (!recyclable_log) {
     ASSERT_EQ(14U, DroppedBytes());
     ASSERT_EQ("OK", MatchError("checksum mismatch"));
   } else {
@@ -457,8 +478,10 @@ TEST_P(LogTest, ChecksumMismatch) {
 
 TEST_P(LogTest, UnexpectedMiddleType) {
   Write("foo");
-  SetByte(6, static_cast<char>(GetParam() ? kRecyclableMiddleType : kMiddleType));
-  FixChecksum(0, 3, !!GetParam());
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
+                                              : kMiddleType));
+  FixChecksum(0, 3, !!recyclable_log);
   ASSERT_EQ("EOF", Read());
   ASSERT_EQ(3U, DroppedBytes());
   ASSERT_EQ("OK", MatchError("missing start"));
@@ -466,8 +489,10 @@ TEST_P(LogTest, UnexpectedMiddleType) {
 
 TEST_P(LogTest, UnexpectedLastType) {
   Write("foo");
-  SetByte(6, static_cast<char>(GetParam() ? kRecyclableLastType : kLastType));
-  FixChecksum(0, 3, !!GetParam());
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  SetByte(6,
+          static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
+  FixChecksum(0, 3, !!recyclable_log);
   ASSERT_EQ("EOF", Read());
   ASSERT_EQ(3U, DroppedBytes());
   ASSERT_EQ("OK", MatchError("missing start"));
@@ -476,8 +501,10 @@ TEST_P(LogTest, UnexpectedLastType) {
 TEST_P(LogTest, UnexpectedFullType) {
   Write("foo");
   Write("bar");
-  SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
-  FixChecksum(0, 3, !!GetParam());
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  SetByte(
+      6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+  FixChecksum(0, 3, !!recyclable_log);
   ASSERT_EQ("bar", Read());
   ASSERT_EQ("EOF", Read());
   ASSERT_EQ(3U, DroppedBytes());
@@ -487,8 +514,10 @@ TEST_P(LogTest, UnexpectedFullType) {
 TEST_P(LogTest, UnexpectedFirstType) {
   Write("foo");
   Write(BigString("bar", 100000));
-  SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
-  FixChecksum(0, 3, !!GetParam());
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  SetByte(
+      6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
+  FixChecksum(0, 3, !!recyclable_log);
   ASSERT_EQ(BigString("bar", 100000), Read());
   ASSERT_EQ("EOF", Read());
   ASSERT_EQ(3U, DroppedBytes());
@@ -505,6 +534,11 @@ TEST_P(LogTest, MissingLastIsIgnored) {
 }
 
 TEST_P(LogTest, MissingLastIsNotIgnored) {
+  if (allow_retry_read_) {
+    // If read retry is allowed, then truncated trailing record should not
+    // raise an error.
+    return;
+  }
   Write(BigString("bar", kBlockSize));
   // Remove the LAST block, including header.
   ShrinkSize(14);
@@ -523,6 +557,11 @@ TEST_P(LogTest, PartialLastIsIgnored) {
 }
 
 TEST_P(LogTest, PartialLastIsNotIgnored) {
+  if (allow_retry_read_) {
+    // If read retry is allowed, then truncated trailing record should not
+    // raise an error.
+    return;
+  }
   Write(BigString("bar", kBlockSize));
   // Cause a bad record length in the LAST block.
   ShrinkSize(1);
@@ -549,7 +588,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
     SetByte(offset, 'x');
   }
 
-  if (!GetParam()) {
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  if (!recyclable_log) {
     ASSERT_EQ("correct", Read());
     ASSERT_EQ("EOF", Read());
     size_t dropped = DroppedBytes();
@@ -563,7 +603,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
 TEST_P(LogTest, ClearEofSingleBlock) {
   Write("foo");
   Write("bar");
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
   ForceEOF(3 + header_size + 2);
   ASSERT_EQ("foo", Read());
   UnmarkEOF();
@@ -578,7 +619,8 @@ TEST_P(LogTest, ClearEofSingleBlock) {
 
 TEST_P(LogTest, ClearEofMultiBlock) {
   size_t num_full_blocks = 5;
-  int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
   size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
   Write(BigString("foo", n));
   Write(BigString("bar", n));
@@ -627,7 +669,8 @@ TEST_P(LogTest, ClearEofError2) {
 }
 
 TEST_P(LogTest, Recycle) {
-  if (!GetParam()) {
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  if (!recyclable_log) {
     return;  // test is only valid for recycled logs
   }
   Write("foo");
@@ -638,7 +681,7 @@ TEST_P(LogTest, Recycle) {
   while (get_reader_contents()->size() < log::kBlockSize * 2) {
     Write("xxxxxxxxxxxxxxxx");
   }
-  unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
+  std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
       new test::OverwritingStringSink(get_reader_contents()),
       "" /* don't care */));
   Writer recycle_writer(std::move(dest_holder), 123, true);
@@ -650,7 +693,224 @@ TEST_P(LogTest, Recycle) {
   ASSERT_EQ("EOF", Read());
 }
 
-INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
+INSTANTIATE_TEST_CASE_P(bool, LogTest,
+                        ::testing::Values(std::make_tuple(0, false),
+                                          std::make_tuple(0, true),
+                                          std::make_tuple(1, false),
+                                          std::make_tuple(1, true)));
+
+class RetriableLogTest : public ::testing::TestWithParam<int> {
+ private:
+  class ReportCollector : public Reader::Reporter {
+   public:
+    size_t dropped_bytes_;
+    std::string message_;
+
+    ReportCollector() : dropped_bytes_(0) {}
+    void Corruption(size_t bytes, const Status& status) override {
+      dropped_bytes_ += bytes;
+      message_.append(status.ToString());
+    }
+  };
+
+  Slice contents_;
+  std::unique_ptr<WritableFileWriter> dest_holder_;
+  std::unique_ptr<Writer> log_writer_;
+  Env* env_;
+  EnvOptions env_options_;
+  const std::string test_dir_;
+  const std::string log_file_;
+  std::unique_ptr<WritableFileWriter> writer_;
+  std::unique_ptr<SequentialFileReader> reader_;
+  ReportCollector report_;
+  std::unique_ptr<FragmentBufferedReader> log_reader_;
+
+ public:
+  RetriableLogTest()
+      : contents_(),
+        dest_holder_(nullptr),
+        log_writer_(nullptr),
+        env_(Env::Default()),
+        test_dir_(test::PerThreadDBPath("retriable_log_test")),
+        log_file_(test_dir_ + "/log"),
+        writer_(nullptr),
+        reader_(nullptr),
+        log_reader_(nullptr) {}
+
+  Status SetupTestEnv() {
+    dest_holder_.reset(test::GetWritableFileWriter(
+        new test::StringSink(&contents_), "" /* file name */));
+    assert(dest_holder_ != nullptr);
+    log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
+    assert(log_writer_ != nullptr);
+
+    Status s;
+    s = env_->CreateDirIfMissing(test_dir_);
+    std::unique_ptr<WritableFile> writable_file;
+    if (s.ok()) {
+      s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
+    }
+    if (s.ok()) {
+      writer_.reset(new WritableFileWriter(std::move(writable_file), log_file_,
+                                           env_options_));
+      assert(writer_ != nullptr);
+    }
+    std::unique_ptr<SequentialFile> seq_file;
+    if (s.ok()) {
+      s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
+    }
+    if (s.ok()) {
+      reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
+      assert(reader_ != nullptr);
+      log_reader_.reset(new FragmentBufferedReader(
+          nullptr, std::move(reader_), &report_, true /* checksum */,
+          123 /* log_number */));
+      assert(log_reader_ != nullptr);
+    }
+    return s;
+  }
+
+  std::string contents() {
+    auto file =
+        dynamic_cast<test::StringSink*>(log_writer_->file()->writable_file());
+    assert(file != nullptr);
+    return file->contents_;
+  }
+
+  void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
+
+  void Write(const Slice& data) {
+    writer_->Append(data);
+    writer_->Sync(true);
+  }
+
+  bool TryRead(std::string* result) {
+    assert(result != nullptr);
+    result->clear();
+    std::string scratch;
+    Slice record;
+    bool r = log_reader_->ReadRecord(&record, &scratch);
+    if (r) {
+      result->assign(record.data(), record.size());
+      return true;
+    } else {
+      return false;
+    }
+  }
+};
+
+TEST_P(RetriableLogTest, TailLog_PartialHeader) {
+  ASSERT_OK(SetupTestEnv());
+  std::vector<int> remaining_bytes_in_last_record;
+  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  bool eof = false;
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RetriableLogTest::TailLog:AfterPart1",
+        "RetriableLogTest::TailLog:BeforeReadRecord"},
+       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
+        "RetriableLogTest::TailLog:BeforePart2"}});
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+      [&](void* /*arg*/) { eof = true; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  size_t delta = header_size - 1;
+  port::Thread log_writer_thread([&]() {
+    size_t old_sz = contents().size();
+    Encode("foo");
+    size_t new_sz = contents().size();
+    std::string part1 = contents().substr(old_sz, delta);
+    std::string part2 =
+        contents().substr(old_sz + delta, new_sz - old_sz - delta);
+    Write(Slice(part1));
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+    Write(Slice(part2));
+  });
+
+  std::string record;
+  port::Thread log_reader_thread([&]() {
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+    while (!TryRead(&record)) {
+    }
+  });
+  log_reader_thread.join();
+  log_writer_thread.join();
+  ASSERT_EQ("foo", record);
+  ASSERT_TRUE(eof);
+}
+
+TEST_P(RetriableLogTest, TailLog_FullHeader) {
+  ASSERT_OK(SetupTestEnv());
+  std::vector<int> remaining_bytes_in_last_record;
+  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  bool eof = false;
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"RetriableLogTest::TailLog:AfterPart1",
+        "RetriableLogTest::TailLog:BeforeReadRecord"},
+       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
+        "RetriableLogTest::TailLog:BeforePart2"}});
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
+      [&](void* /*arg*/) { eof = true; });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  size_t delta = header_size + 1;
+  port::Thread log_writer_thread([&]() {
+    size_t old_sz = contents().size();
+    Encode("foo");
+    size_t new_sz = contents().size();
+    std::string part1 = contents().substr(old_sz, delta);
+    std::string part2 =
+        contents().substr(old_sz + delta, new_sz - old_sz - delta);
+    Write(Slice(part1));
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
+    Write(Slice(part2));
+    ASSERT_TRUE(eof);
+  });
+
+  std::string record;
+  port::Thread log_reader_thread([&]() {
+    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
+    while (!TryRead(&record)) {
+    }
+  });
+  log_reader_thread.join();
+  log_writer_thread.join();
+  ASSERT_EQ("foo", record);
+}
+
+TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
+  // Clear all sync point callbacks even if this test does not use sync point.
+  // It is necessary, otherwise the execute of this test may hit a sync point
+  // with which a callback is registered. The registered callback may access
+  // some dead variable, causing segfault.
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  ASSERT_OK(SetupTestEnv());
+  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
+  size_t delta = header_size - 1;
+  size_t old_sz = contents().size();
+  Encode("foo-bar");
+  size_t new_sz = contents().size();
+  std::string part1 = contents().substr(old_sz, delta);
+  std::string part2 =
+      contents().substr(old_sz + delta, new_sz - old_sz - delta);
+  Write(Slice(part1));
+  std::string record;
+  ASSERT_FALSE(TryRead(&record));
+  ASSERT_TRUE(record.empty());
+  Write(Slice(part2));
+  ASSERT_TRUE(TryRead(&record));
+  ASSERT_EQ("foo-bar", record);
+}
+
+INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
 
 }  // namespace log
 }  // namespace rocksdb