]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/util/file_reader_writer_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / file_reader_writer_test.cc
index 01faf632e4be38c7e610e2d89e11b4b2015823f6..e778efc3c5499dfde88273db2f783a1d7b0c7758 100644 (file)
@@ -5,58 +5,87 @@
 //
 #include <algorithm>
 #include <vector>
-#include "env/composite_env_wrapper.h"
+
+#include "db/db_test_util.h"
+#include "env/mock_env.h"
+#include "file/line_file_reader.h"
 #include "file/random_access_file_reader.h"
+#include "file/read_write_util.h"
 #include "file/readahead_raf.h"
 #include "file/sequence_file_reader.h"
 #include "file/writable_file_writer.h"
+#include "rocksdb/file_system.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
+#include "util/crc32c.h"
 #include "util/random.h"
+#include "utilities/fault_injection_fs.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 class WritableFileWriterTest : public testing::Test {};
 
-const uint32_t kMb = 1 << 20;
+constexpr uint32_t kMb = static_cast<uint32_t>(1) << 20;
 
 TEST_F(WritableFileWriterTest, RangeSync) {
-  class FakeWF : public WritableFile {
+  class FakeWF : public FSWritableFile {
    public:
     explicit FakeWF() : size_(0), last_synced_(0) {}
     ~FakeWF() override {}
 
-    Status Append(const Slice& data) override {
+    using FSWritableFile::Append;
+    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override {
       size_ += data.size();
-      return Status::OK();
+      return IOStatus::OK();
+    }
+    IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
+                      IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
     }
-    Status Truncate(uint64_t /*size*/) override { return Status::OK(); }
-    Status Close() override {
+    IOStatus Close(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
       EXPECT_GE(size_, last_synced_ + kMb);
       EXPECT_LT(size_, last_synced_ + 2 * kMb);
       // Make sure random writes generated enough writes.
       EXPECT_GT(size_, 10 * kMb);
-      return Status::OK();
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& /*options*/,
+                  IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Fsync(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
     }
-    Status Flush() override { return Status::OK(); }
-    Status Sync() override { return Status::OK(); }
-    Status Fsync() override { return Status::OK(); }
     void SetIOPriority(Env::IOPriority /*pri*/) override {}
-    uint64_t GetFileSize() override { return size_; }
+    uint64_t GetFileSize(const IOOptions& /*options*/,
+                         IODebugContext* /*dbg*/) override {
+      return size_;
+    }
     void GetPreallocationStatus(size_t* /*block_size*/,
                                 size_t* /*last_allocated_block*/) override {}
     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
       return 0;
     }
-    Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
-      return Status::OK();
+    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
+      return IOStatus::OK();
     }
 
    protected:
-    Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
-      return Status::OK();
+    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
+                      const IOOptions& /*options*/,
+                      IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
     }
-    Status RangeSync(uint64_t offset, uint64_t nbytes) override {
+    IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
+                       const IOOptions& /*options*/,
+                       IODebugContext* /*dbg*/) override {
       EXPECT_EQ(offset % 4096, 0u);
       EXPECT_EQ(nbytes % 4096, 0u);
 
@@ -66,7 +95,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
       if (size_ > 2 * kMb) {
         EXPECT_LT(size_, last_synced_ + 2 * kMb);
       }
-      return Status::OK();
+      return IOStatus::OK();
     }
 
     uint64_t size_;
@@ -77,8 +106,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
   env_options.bytes_per_sync = kMb;
   std::unique_ptr<FakeWF> wf(new FakeWF);
   std::unique_ptr<WritableFileWriter> writer(
-      new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
-                             "" /* don't care */, env_options));
+      new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
   Random r(301);
   Status s;
   std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
@@ -99,7 +127,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
 }
 
 TEST_F(WritableFileWriterTest, IncrementalBuffer) {
-  class FakeWF : public WritableFile {
+  class FakeWF : public FSWritableFile {
    public:
     explicit FakeWF(std::string* _file_data, bool _use_direct_io,
                     bool _no_flush)
@@ -108,37 +136,58 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
           no_flush_(_no_flush) {}
     ~FakeWF() override {}
 
-    Status Append(const Slice& data) override {
+    using FSWritableFile::Append;
+    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override {
       file_data_->append(data.data(), data.size());
       size_ += data.size();
-      return Status::OK();
+      return IOStatus::OK();
     }
-    Status PositionedAppend(const Slice& data, uint64_t pos) override {
+    using FSWritableFile::PositionedAppend;
+    IOStatus PositionedAppend(const Slice& data, uint64_t pos,
+                              const IOOptions& /*options*/,
+                              IODebugContext* /*dbg*/) override {
       EXPECT_TRUE(pos % 512 == 0);
       EXPECT_TRUE(data.size() % 512 == 0);
       file_data_->resize(pos);
       file_data_->append(data.data(), data.size());
       size_ += data.size();
-      return Status::OK();
+      return IOStatus::OK();
     }
 
-    Status Truncate(uint64_t size) override {
+    IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
+                      IODebugContext* /*dbg*/) override {
       file_data_->resize(size);
-      return Status::OK();
+      return IOStatus::OK();
+    }
+    IOStatus Close(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& /*options*/,
+                  IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Fsync(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
     }
-    Status Close() override { return Status::OK(); }
-    Status Flush() override { return Status::OK(); }
-    Status Sync() override { return Status::OK(); }
-    Status Fsync() override { return Status::OK(); }
     void SetIOPriority(Env::IOPriority /*pri*/) override {}
-    uint64_t GetFileSize() override { return size_; }
+    uint64_t GetFileSize(const IOOptions& /*options*/,
+                         IODebugContext* /*dbg*/) override {
+      return size_;
+    }
     void GetPreallocationStatus(size_t* /*block_size*/,
                                 size_t* /*last_allocated_block*/) override {}
     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
       return 0;
     }
-    Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
-      return Status::OK();
+    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
+      return IOStatus::OK();
     }
     bool use_direct_io() const override { return use_direct_io_; }
 
@@ -163,51 +212,252 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
                                           false,
 #endif
                                           no_flush));
-    std::unique_ptr<WritableFileWriter> writer(
-        new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
-                               "" /* don't care */, env_options));
+    std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
+        std::move(wf), "" /* don't care */, env_options));
 
     std::string target;
     for (int i = 0; i < 20; i++) {
       uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
       std::string random_string = r.RandomString(num);
-      writer->Append(Slice(random_string.c_str(), num));
+      ASSERT_OK(writer->Append(Slice(random_string.c_str(), num)));
       target.append(random_string.c_str(), num);
 
       // In some attempts, flush in a chance of 1/10.
       if (!no_flush && r.Uniform(10) == 0) {
-        writer->Flush();
+        ASSERT_OK(writer->Flush());
       }
     }
-    writer->Flush();
-    writer->Close();
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Close());
     ASSERT_EQ(target.size(), actual.size());
     ASSERT_EQ(target, actual);
   }
 }
 
+TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) {
+  EnvOptions env_opts;
+  env_opts.use_direct_writes = true;
+  env_opts.writable_file_max_buffer_size = 0;
+  {
+    std::unique_ptr<WritableFileWriter> writer;
+    const Status s =
+        WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care",
+                                   FileOptions(env_opts), &writer,
+                                   /*dbg=*/nullptr);
+    ASSERT_TRUE(s.IsInvalidArgument());
+  }
+}
+
+class DBWritableFileWriterTest : public DBTestBase {
+ public:
+  DBWritableFileWriterTest()
+      : DBTestBase("db_secondary_cache_test", /*env_do_fsync=*/true) {
+    fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
+    fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
+  }
+
+  std::shared_ptr<FaultInjectionTestFS> fault_fs_;
+  std::unique_ptr<Env> fault_env_;
+};
+
+TEST_F(DBWritableFileWriterTest, AppendWithChecksum) {
+  FileOptions file_options = FileOptions();
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+  std::string fname = dbname_ + "/test_file";
+  std::unique_ptr<FSWritableFile> writable_file_ptr;
+  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
+                                       /*dbg*/ nullptr));
+  std::unique_ptr<TestFSWritableFile> file;
+  file.reset(new TestFSWritableFile(
+      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
+  std::unique_ptr<WritableFileWriter> file_writer;
+  ImmutableOptions ioptions(options);
+  file_writer.reset(new WritableFileWriter(
+      std::move(file), fname, file_options, SystemClock::Default().get(),
+      nullptr, ioptions.stats, ioptions.listeners,
+      ioptions.file_checksum_gen_factory.get(), true, true));
+
+  Random rnd(301);
+  std::string data = rnd.RandomString(1000);
+  uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
+  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+
+  ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+  ASSERT_OK(file_writer->Flush());
+  Random size_r(47);
+  for (int i = 0; i < 2000; i++) {
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
+    data_crc32c = crc32c::Value(data.c_str(), data.size());
+    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
+    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
+    ASSERT_OK(file_writer->Flush());
+  }
+  ASSERT_OK(file_writer->Close());
+  Destroy(options);
+}
+
+TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) {
+  FileOptions file_options = FileOptions();
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+  std::string fname = dbname_ + "/test_file";
+  std::unique_ptr<FSWritableFile> writable_file_ptr;
+  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
+                                       /*dbg*/ nullptr));
+  std::unique_ptr<TestFSWritableFile> file;
+  file.reset(new TestFSWritableFile(
+      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
+  std::unique_ptr<WritableFileWriter> file_writer;
+  ImmutableOptions ioptions(options);
+  // Enable checksum handoff for this file, but do not enable buffer checksum.
+  // So Append with checksum logic will not be triggered
+  file_writer.reset(new WritableFileWriter(
+      std::move(file), fname, file_options, SystemClock::Default().get(),
+      nullptr, ioptions.stats, ioptions.listeners,
+      ioptions.file_checksum_gen_factory.get(), true, false));
+
+  Random rnd(301);
+  std::string data = rnd.RandomString(1000);
+  uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size());
+  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+
+  ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+  ASSERT_OK(file_writer->Flush());
+  Random size_r(47);
+  for (int i = 0; i < 1000; i++) {
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
+    data_crc32c = crc32c::Value(data.c_str(), data.size());
+    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
+    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
+    ASSERT_OK(file_writer->Flush());
+  }
+  ASSERT_OK(file_writer->Close());
+  Destroy(options);
+}
+
+TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) {
+  FileOptions file_options = FileOptions();
+  file_options.rate_limiter = nullptr;
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+  std::string fname = dbname_ + "/test_file";
+  std::unique_ptr<FSWritableFile> writable_file_ptr;
+  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr,
+                                       /*dbg*/ nullptr));
+  std::unique_ptr<TestFSWritableFile> file;
+  file.reset(new TestFSWritableFile(
+      fname, file_options, std::move(writable_file_ptr), fault_fs_.get()));
+  std::unique_ptr<WritableFileWriter> file_writer;
+  ImmutableOptions ioptions(options);
+  // Enable checksum handoff for this file, but do not enable buffer checksum.
+  // So Append with checksum logic will not be triggered
+  file_writer.reset(new WritableFileWriter(
+      std::move(file), fname, file_options, SystemClock::Default().get(),
+      nullptr, ioptions.stats, ioptions.listeners,
+      ioptions.file_checksum_gen_factory.get(), true, true));
+  fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+
+  Random rnd(301);
+  std::string data;
+  uint32_t data_crc32c;
+  uint64_t start = fault_env_->NowMicros();
+  Random size_r(47);
+  uint64_t bytes_written = 0;
+  for (int i = 0; i < 100; i++) {
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
+    data_crc32c = crc32c::Value(data.c_str(), data.size());
+    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+    bytes_written += static_cast<uint64_t>(data.size());
+
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
+    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
+    ASSERT_OK(file_writer->Flush());
+    bytes_written += static_cast<uint64_t>(data.size());
+  }
+  uint64_t elapsed = fault_env_->NowMicros() - start;
+  double raw_rate = bytes_written * 1000000.0 / elapsed;
+  ASSERT_OK(file_writer->Close());
+
+  // Set the rate-limiter
+  FileOptions file_options1 = FileOptions();
+  file_options1.rate_limiter =
+      NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate));
+  fname = dbname_ + "/test_file_1";
+  std::unique_ptr<FSWritableFile> writable_file_ptr1;
+  ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1,
+                                       &writable_file_ptr1,
+                                       /*dbg*/ nullptr));
+  file.reset(new TestFSWritableFile(
+      fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get()));
+  // Enable checksum handoff for this file, but do not enable buffer checksum.
+  // So Append with checksum logic will not be triggered
+  file_writer.reset(new WritableFileWriter(
+      std::move(file), fname, file_options1, SystemClock::Default().get(),
+      nullptr, ioptions.stats, ioptions.listeners,
+      ioptions.file_checksum_gen_factory.get(), true, true));
+
+  for (int i = 0; i < 1000; i++) {
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000));
+    data_crc32c = crc32c::Value(data.c_str(), data.size());
+    ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c));
+
+    data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97));
+    ASSERT_OK(file_writer->Append(Slice(data.c_str())));
+    ASSERT_OK(file_writer->Flush());
+  }
+  ASSERT_OK(file_writer->Close());
+  if (file_options1.rate_limiter != nullptr) {
+    delete file_options1.rate_limiter;
+  }
+
+  Destroy(options);
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(WritableFileWriterTest, AppendStatusReturn) {
-  class FakeWF : public WritableFile {
+  class FakeWF : public FSWritableFile {
    public:
     explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
 
     bool use_direct_io() const override { return use_direct_io_; }
-    Status Append(const Slice& /*data*/) override {
+
+    using FSWritableFile::Append;
+    IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override {
       if (io_error_) {
-        return Status::IOError("Fake IO error");
+        return IOStatus::IOError("Fake IO error");
       }
-      return Status::OK();
+      return IOStatus::OK();
     }
-    Status PositionedAppend(const Slice& /*data*/, uint64_t) override {
+    using FSWritableFile::PositionedAppend;
+    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
+                              const IOOptions& /*options*/,
+                              IODebugContext* /*dbg*/) override {
       if (io_error_) {
-        return Status::IOError("Fake IO error");
+        return IOStatus::IOError("Fake IO error");
       }
-      return Status::OK();
+      return IOStatus::OK();
+    }
+    IOStatus Close(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& /*options*/,
+                  IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
     }
-    Status Close() override { return Status::OK(); }
-    Status Flush() override { return Status::OK(); }
-    Status Sync() override { return Status::OK(); }
     void Setuse_direct_io(bool val) { use_direct_io_ = val; }
     void SetIOError(bool val) { io_error_ = val; }
 
@@ -218,15 +468,13 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
   std::unique_ptr<FakeWF> wf(new FakeWF());
   wf->Setuse_direct_io(true);
   std::unique_ptr<WritableFileWriter> writer(
-      new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
-                             "" /* don't care */, EnvOptions()));
+      new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
 
   ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));
 
   // Next call to WritableFile::Append() should fail
-  LegacyWritableFileWrapper* file =
-      static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
-  static_cast<FakeWF*>(file->target())->SetIOError(true);
+  FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
+  fwf->SetIOError(true);
   ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
 }
 #endif
@@ -246,19 +494,21 @@ class ReadaheadRandomAccessFileTest
   ReadaheadRandomAccessFileTest() : control_contents_() {}
   std::string Read(uint64_t offset, size_t n) {
     Slice result;
-    Status s = test_read_holder_->Read(offset, n, &result, scratch_.get());
+    Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
+                                       scratch_.get(), nullptr);
     EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
     return std::string(result.data(), result.size());
   }
   void ResetSourceStr(const std::string& str = "") {
-    auto write_holder =
-        std::unique_ptr<WritableFileWriter>(test::GetWritableFileWriter(
-            new test::StringSink(&control_contents_), "" /* don't care */));
+    std::unique_ptr<FSWritableFile> sink(
+        new test::StringSink(&control_contents_));
+    std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
+        std::move(sink), "" /* don't care */, FileOptions()));
     Status s = write_holder->Append(Slice(str));
     EXPECT_OK(s);
     s = write_holder->Flush();
     EXPECT_OK(s);
-    auto read_holder = std::unique_ptr<RandomAccessFile>(
+    std::unique_ptr<FSRandomAccessFile> read_holder(
         new test::StringSource(control_contents_));
     test_read_holder_ =
         NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
@@ -268,7 +518,7 @@ class ReadaheadRandomAccessFileTest
  private:
   size_t readahead_size_;
   Slice control_contents_;
-  std::unique_ptr<RandomAccessFile> test_read_holder_;
+  std::unique_ptr<FSRandomAccessFile> test_read_holder_;
   std::unique_ptr<char[]> scratch_;
 };
 
@@ -347,16 +597,17 @@ class ReadaheadSequentialFileTest : public testing::Test,
   ReadaheadSequentialFileTest() {}
   std::string Read(size_t n) {
     Slice result;
-    Status s = test_read_holder_->Read(n, &result, scratch_.get());
+    Status s = test_read_holder_->Read(
+        n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/);
     EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
     return std::string(result.data(), result.size());
   }
   void Skip(size_t n) { test_read_holder_->Skip(n); }
   void ResetSourceStr(const std::string& str = "") {
-    auto read_holder = std::unique_ptr<SequentialFile>(
+    auto read_holder = std::unique_ptr<FSSequentialFile>(
         new test::SeqStringSource(str, &seq_read_count_));
-    test_read_holder_.reset(new SequentialFileReader(
-        NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_));
+    test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
+                                                     "test", readahead_size_));
   }
   size_t GetReadaheadSize() const { return readahead_size_; }
 
@@ -439,9 +690,377 @@ INSTANTIATE_TEST_CASE_P(
 INSTANTIATE_TEST_CASE_P(
     ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
+
+namespace {
+std::string GenerateLine(int n) {
+  std::string rv;
+  // Multiples of 17 characters per line, for likely bad buffer alignment
+  for (int i = 0; i < n; ++i) {
+    rv.push_back(static_cast<char>('0' + (i % 10)));
+    rv.append("xxxxxxxxxxxxxxxx");
+  }
+  return rv;
+}
+}  // namespace
+
+TEST(LineFileReaderTest, LineFileReaderTest) {
+  const int nlines = 1000;
+
+  std::unique_ptr<Env> mem_env(MockEnv::Create(Env::Default()));
+  std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
+  // Create an input file
+  {
+    std::unique_ptr<FSWritableFile> file;
+    ASSERT_OK(
+        fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));
+
+    for (int i = 0; i < nlines; ++i) {
+      std::string line = GenerateLine(i);
+      line.push_back('\n');
+      ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
+    }
+  }
+
+  // Verify with no I/O errors
+  {
+    std::unique_ptr<LineFileReader> reader;
+    ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
+                                     nullptr /* dbg */,
+                                     nullptr /* rate_limiter */));
+    std::string line;
+    int count = 0;
+    while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
+      ASSERT_EQ(line, GenerateLine(count));
+      ++count;
+      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
+    }
+    ASSERT_OK(reader->GetStatus());
+    ASSERT_EQ(count, nlines);
+    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
+    // And still
+    ASSERT_FALSE(
+        reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
+    ASSERT_OK(reader->GetStatus());
+    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
+  }
+
+  // Verify with injected I/O error
+  {
+    std::unique_ptr<LineFileReader> reader;
+    ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
+                                     nullptr /* dbg */,
+                                     nullptr /* rate_limiter */));
+    std::string line;
+    int count = 0;
+    // Read part way through the file
+    while (count < nlines / 4) {
+      ASSERT_TRUE(
+          reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
+      ASSERT_EQ(line, GenerateLine(count));
+      ++count;
+      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
+    }
+    ASSERT_OK(reader->GetStatus());
+
+    // Inject error
+    int callback_count = 0;
+    SyncPoint::GetInstance()->SetCallBack(
+        "MemFile::Read:IOStatus", [&](void* arg) {
+          IOStatus* status = static_cast<IOStatus*>(arg);
+          *status = IOStatus::Corruption("test");
+          ++callback_count;
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
+      ASSERT_EQ(line, GenerateLine(count));
+      ++count;
+      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
+    }
+    ASSERT_TRUE(reader->GetStatus().IsCorruption());
+    ASSERT_LT(count, nlines / 2);
+    ASSERT_EQ(callback_count, 1);
+
+    // Still get error & no retry
+    ASSERT_FALSE(
+        reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
+    ASSERT_TRUE(reader->GetStatus().IsCorruption());
+    ASSERT_EQ(callback_count, 1);
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+}
+
+#ifndef ROCKSDB_LITE
+class IOErrorEventListener : public EventListener {
+ public:
+  IOErrorEventListener() { notify_error_.store(0); }
+
+  void OnIOError(const IOErrorInfo& io_error_info) override {
+    notify_error_++;
+    EXPECT_FALSE(io_error_info.file_path.empty());
+    EXPECT_FALSE(io_error_info.io_status.ok());
+  }
+
+  size_t NotifyErrorCount() { return notify_error_; }
+
+  bool ShouldBeNotifiedOnFileIO() override { return true; }
+
+ private:
+  std::atomic<size_t> notify_error_;
+};
+
+TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
+  class FakeWF : public FSWritableFile {
+   public:
+    explicit FakeWF() : io_error_(false) {
+      file_append_errors_.store(0);
+      file_flush_errors_.store(0);
+    }
+
+    using FSWritableFile::Append;
+    IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override {
+      if (io_error_) {
+        file_append_errors_++;
+        return IOStatus::IOError("Fake IO error");
+      }
+      return IOStatus::OK();
+    }
+
+    using FSWritableFile::PositionedAppend;
+    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
+                              const IOOptions& /*options*/,
+                              IODebugContext* /*dbg*/) override {
+      if (io_error_) {
+        return IOStatus::IOError("Fake IO error");
+      }
+      return IOStatus::OK();
+    }
+    IOStatus Close(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& /*options*/,
+                   IODebugContext* /*dbg*/) override {
+      if (io_error_) {
+        file_flush_errors_++;
+        return IOStatus::IOError("Fake IO error");
+      }
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& /*options*/,
+                  IODebugContext* /*dbg*/) override {
+      return IOStatus::OK();
+    }
+
+    void SetIOError(bool val) { io_error_ = val; }
+
+    void CheckCounters(int file_append_errors, int file_flush_errors) {
+      ASSERT_EQ(file_append_errors, file_append_errors_);
+      ASSERT_EQ(file_flush_errors_, file_flush_errors);
+    }
+
+   protected:
+    bool io_error_;
+    std::atomic<size_t> file_append_errors_;
+    std::atomic<size_t> file_flush_errors_;
+  };
+
+  FileOptions file_options = FileOptions();
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  IOErrorEventListener* listener = new IOErrorEventListener();
+  options.listeners.emplace_back(listener);
+
+  DestroyAndReopen(options);
+  ImmutableOptions ioptions(options);
+
+  std::string fname = dbname_ + "/test_file";
+  std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF);
+
+  std::unique_ptr<WritableFileWriter> file_writer;
+  writable_file_ptr->SetIOError(true);
+
+  file_writer.reset(new WritableFileWriter(
+      std::move(writable_file_ptr), fname, file_options,
+      SystemClock::Default().get(), nullptr, ioptions.stats, ioptions.listeners,
+      ioptions.file_checksum_gen_factory.get(), true, true));
+
+  FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file());
+
+  fwf->SetIOError(true);
+  ASSERT_NOK(file_writer->Append(std::string(2 * kMb, 'a')));
+  fwf->CheckCounters(1, 0);
+  ASSERT_EQ(listener->NotifyErrorCount(), 1);
+
+  file_writer->reset_seen_error();
+  fwf->SetIOError(true);
+  ASSERT_NOK(file_writer->Flush());
+  fwf->CheckCounters(1, 1);
+  ASSERT_EQ(listener->NotifyErrorCount(), 2);
+
+  /* No error generation */
+  file_writer->reset_seen_error();
+  fwf->SetIOError(false);
+  ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b')));
+  ASSERT_EQ(listener->NotifyErrorCount(), 2);
+  fwf->CheckCounters(1, 1);
+}
+#endif  // ROCKSDB_LITE
+
+class WritableFileWriterIOPriorityTest : public testing::Test {
+ protected:
+  // This test is to check whether the rate limiter priority can be passed
+  // correctly from WritableFileWriter functions to FSWritableFile functions.
+
+  void SetUp() override {
+    // When op_rate_limiter_priority parameter in WritableFileWriter functions
+    // is the default (Env::IO_TOTAL).
+    std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)};
+    FileOptions file_options;
+    writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */,
+                                         file_options));
+  }
+
+  class FakeWF : public FSWritableFile {
+   public:
+    explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); }
+    ~FakeWF() override {}
+
+    IOStatus Append(const Slice& /*data*/, const IOOptions& options,
+                    IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Append(const Slice& data, const IOOptions& options,
+                    const DataVerificationInfo& /* verification_info */,
+                    IODebugContext* dbg) override {
+      return Append(data, options, dbg);
+    }
+    IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/,
+                              const IOOptions& options,
+                              IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus PositionedAppend(
+        const Slice& /* data */, uint64_t /* offset */,
+        const IOOptions& options,
+        const DataVerificationInfo& /* verification_info */,
+        IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Truncate(uint64_t /*size*/, const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    uint64_t GetFileSize(const IOOptions& options,
+                         IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return 0;
+    }
+    void GetPreallocationStatus(size_t* /*block_size*/,
+                                size_t* /*last_allocated_block*/) override {}
+    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
+      return 0;
+    }
+    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
+      return IOStatus::OK();
+    }
+
+    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
+                      const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+    IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
+                       const IOOptions& options,
+                       IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+      return IOStatus::OK();
+    }
+
+    void PrepareWrite(size_t /*offset*/, size_t /*len*/,
+                      const IOOptions& options,
+                      IODebugContext* /*dbg*/) override {
+      EXPECT_EQ(options.rate_limiter_priority, io_priority_);
+    }
+
+    bool IsSyncThreadSafe() const override { return true; }
+  };
+
+  std::unique_ptr<WritableFileWriter> writer_;
+};
+
+TEST_F(WritableFileWriterIOPriorityTest, Append) {
+  ASSERT_OK(writer_->Append(Slice("abc")));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); }
+
+TEST_F(WritableFileWriterIOPriorityTest, Sync) {
+  ASSERT_OK(writer_->Sync(false));
+  ASSERT_OK(writer_->Sync(true));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) {
+  ASSERT_OK(writer_->SyncWithoutFlush(false));
+  ASSERT_OK(writer_->SyncWithoutFlush(true));
+}
+
+TEST_F(WritableFileWriterIOPriorityTest, BasicOp) {
+  EnvOptions env_options;
+  env_options.bytes_per_sync = kMb;
+  std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH));
+  std::unique_ptr<WritableFileWriter> writer(
+      new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
+  Random r(301);
+  Status s;
+  std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
+  for (int i = 0; i < 1000; i++) {
+    int skew_limit = (i < 700) ? 10 : 15;
+    uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
+    s = writer->Append(Slice(large_buf.get(), num));
+    ASSERT_OK(s);
+
+    // Flush in a chance of 1/10.
+    if (r.Uniform(10) == 0) {
+      s = writer->Flush();
+      ASSERT_OK(s);
+    }
+  }
+  s = writer->Close();
+  ASSERT_OK(s);
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }