]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/env/mock_env.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / env / mock_env.cc
index c246c13e1ef6d7a18f4981c70f1f1574bf06347b..d83e78ffc6cd784a1ccf536193654bdf249eac85 100644 (file)
@@ -8,11 +8,15 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "env/mock_env.h"
+
 #include <algorithm>
 #include <chrono>
+
+#include "file/filename.h"
 #include "port/sys_time.h"
+#include "rocksdb/file_system.h"
 #include "util/cast_util.h"
-#include "util/murmurhash.h"
+#include "util/hash.h"
 #include "util/random.h"
 #include "util/rate_limiter.h"
 
@@ -28,8 +32,7 @@ class MemFile {
         locked_(false),
         size_(0),
         modified_time_(Now()),
-        rnd_(static_cast<uint32_t>(
-            MurmurHash(fn.data(), static_cast<int>(fn.size()), 0))),
+        rnd_(Lower32of64(GetSliceNPHash64(fn))),
         fsynced_bytes_(0) {}
   // No copying allowed.
   MemFile(const MemFile&) = delete;
@@ -77,7 +80,8 @@ class MemFile {
 
   uint64_t Size() const { return size_; }
 
-  void Truncate(size_t size) {
+  void Truncate(size_t size, const IOOptions& /*options*/,
+                IODebugContext* /*dbg*/) {
     MutexLock lock(&mutex_);
     if (size < size_) {
       data_.resize(size);
@@ -99,7 +103,8 @@ class MemFile {
     }
   }
 
-  Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
+                Slice* result, char* scratch, IODebugContext* /*dbg*/) const {
     MutexLock lock(&mutex_);
     const uint64_t available = Size() - std::min(Size(), offset);
     size_t offset_ = static_cast<size_t>(offset);
@@ -108,7 +113,7 @@ class MemFile {
     }
     if (n == 0) {
       *result = Slice();
-      return Status::OK();
+      return IOStatus::OK();
     }
     if (scratch) {
       memcpy(scratch, &(data_[offset_]), n);
@@ -116,10 +121,11 @@ class MemFile {
     } else {
       *result = Slice(&(data_[offset_]), n);
     }
-    return Status::OK();
+    return IOStatus::OK();
   }
 
-  Status Write(uint64_t offset, const Slice& data) {
+  IOStatus Write(uint64_t offset, const Slice& data,
+                 const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
     MutexLock lock(&mutex_);
     size_t offset_ = static_cast<size_t>(offset);
     if (offset + data.size() > data_.size()) {
@@ -128,20 +134,21 @@ class MemFile {
     data_.replace(offset_, data.size(), data.data(), data.size());
     size_ = data_.size();
     modified_time_ = Now();
-    return Status::OK();
+    return IOStatus::OK();
   }
 
-  Status Append(const Slice& data) {
+  IOStatus Append(const Slice& data, const IOOptions& /*options*/,
+                  IODebugContext* /*dbg*/) {
     MutexLock lock(&mutex_);
     data_.append(data.data(), data.size());
     size_ = data_.size();
     modified_time_ = Now();
-    return Status::OK();
+    return IOStatus::OK();
   }
 
-  Status Fsync() {
+  IOStatus Fsync(const IOOptions& /*options*/, IODebugContext* /*dbg*/) {
     fsynced_bytes_ = size_.load();
-    return Status::OK();
+    return IOStatus::OK();
   }
 
   uint64_t ModifiedTime() const { return modified_time_; }
@@ -176,111 +183,176 @@ class MemFile {
 
 namespace {
 
-class MockSequentialFile : public SequentialFile {
+class MockSequentialFile : public FSSequentialFile {
  public:
-  explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) {
+  explicit MockSequentialFile(MemFile* file, const FileOptions& opts)
+      : file_(file),
+        use_direct_io_(opts.use_direct_reads),
+        use_mmap_read_(opts.use_mmap_reads),
+        pos_(0) {
     file_->Ref();
   }
 
   ~MockSequentialFile() override { file_->Unref(); }
 
-  Status Read(size_t n, Slice* result, char* scratch) override {
-    Status s = file_->Read(pos_, n, result, scratch);
+  IOStatus Read(size_t n, const IOOptions& options, Slice* result,
+                char* scratch, IODebugContext* dbg) override {
+    IOStatus s = file_->Read(pos_, n, options, result,
+                             (use_mmap_read_) ? nullptr : scratch, dbg);
     if (s.ok()) {
       pos_ += result->size();
     }
     return s;
   }
 
-  Status Skip(uint64_t n) override {
+  bool use_direct_io() const override { return use_direct_io_; }
+  IOStatus Skip(uint64_t n) override {
     if (pos_ > file_->Size()) {
-      return Status::IOError("pos_ > file_->Size()");
+      return IOStatus::IOError("pos_ > file_->Size()");
     }
     const uint64_t available = file_->Size() - pos_;
     if (n > available) {
       n = available;
     }
     pos_ += static_cast<size_t>(n);
-    return Status::OK();
+    return IOStatus::OK();
   }
 
  private:
   MemFile* file_;
+  bool use_direct_io_;
+  bool use_mmap_read_;
   size_t pos_;
 };
 
-class MockRandomAccessFile : public RandomAccessFile {
+class MockRandomAccessFile : public FSRandomAccessFile {
  public:
-  explicit MockRandomAccessFile(MemFile* file) : file_(file) { file_->Ref(); }
+  explicit MockRandomAccessFile(MemFile* file, const FileOptions& opts)
+      : file_(file),
+        use_direct_io_(opts.use_direct_reads),
+        use_mmap_read_(opts.use_mmap_reads) {
+    file_->Ref();
+  }
 
   ~MockRandomAccessFile() override { file_->Unref(); }
 
-  Status Read(uint64_t offset, size_t n, Slice* result,
-              char* scratch) const override {
-    return file_->Read(offset, n, result, scratch);
+  bool use_direct_io() const override { return use_direct_io_; }
+
+  IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
+                    const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override {
+    return IOStatus::OK();
+  }
+
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override {
+    if (use_mmap_read_) {
+      return file_->Read(offset, n, options, result, nullptr, dbg);
+    } else {
+      return file_->Read(offset, n, options, result, scratch, dbg);
+    }
   }
 
  private:
   MemFile* file_;
+  bool use_direct_io_;
+  bool use_mmap_read_;
 };
 
-class MockRandomRWFile : public RandomRWFile {
+class MockRandomRWFile : public FSRandomRWFile {
  public:
   explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); }
 
   ~MockRandomRWFile() override { file_->Unref(); }
 
-  Status Write(uint64_t offset, const Slice& data) override {
-    return file_->Write(offset, data);
+  IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
+                 IODebugContext* dbg) override {
+    return file_->Write(offset, data, options, dbg);
   }
 
-  Status Read(uint64_t offset, size_t n, Slice* result,
-              char* scratch) const override {
-    return file_->Read(offset, n, result, scratch);
+  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
+                Slice* result, char* scratch,
+                IODebugContext* dbg) const override {
+    return file_->Read(offset, n, options, result, scratch, dbg);
   }
 
-  Status Close() override { return file_->Fsync(); }
+  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+    return file_->Fsync(options, dbg);
+  }
 
-  Status Flush() override { return Status::OK(); }
+  IOStatus Flush(const IOOptions& /*options*/,
+                 IODebugContext* /*dbg*/) override {
+    return IOStatus::OK();
+  }
 
-  Status Sync() override { return file_->Fsync(); }
+  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+    return file_->Fsync(options, dbg);
+  }
 
  private:
   MemFile* file_;
 };
 
-class MockWritableFile : public WritableFile {
+class MockWritableFile : public FSWritableFile {
  public:
-  MockWritableFile(MemFile* file, RateLimiter* rate_limiter)
-      : file_(file), rate_limiter_(rate_limiter) {
+  MockWritableFile(MemFile* file, const FileOptions& opts)
+      : file_(file),
+        use_direct_io_(opts.use_direct_writes),
+        rate_limiter_(opts.rate_limiter) {
     file_->Ref();
   }
 
   ~MockWritableFile() override { file_->Unref(); }
 
-  Status Append(const Slice& data) override {
+  bool use_direct_io() const override { return false && use_direct_io_; }
+
+  using FSWritableFile::Append;
+  IOStatus Append(const Slice& data, const IOOptions& options,
+                  IODebugContext* dbg) override {
     size_t bytes_written = 0;
     while (bytes_written < data.size()) {
       auto bytes = RequestToken(data.size() - bytes_written);
-      Status s = file_->Append(Slice(data.data() + bytes_written, bytes));
+      IOStatus s = file_->Append(Slice(data.data() + bytes_written, bytes),
+                                 options, dbg);
       if (!s.ok()) {
         return s;
       }
       bytes_written += bytes;
     }
-    return Status::OK();
+    return IOStatus::OK();
+  }
+
+  using FSWritableFile::PositionedAppend;
+  IOStatus PositionedAppend(const Slice& data, uint64_t /*offset*/,
+                            const IOOptions& options,
+                            IODebugContext* dbg) override {
+    assert(use_direct_io_);
+    return Append(data, options, dbg);
+  }
+
+  IOStatus Truncate(uint64_t size, const IOOptions& options,
+                    IODebugContext* dbg) override {
+    file_->Truncate(static_cast<size_t>(size), options, dbg);
+    return IOStatus::OK();
   }
-  Status Truncate(uint64_t size) override {
-    file_->Truncate(static_cast<size_t>(size));
-    return Status::OK();
+  IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+    return file_->Fsync(options, dbg);
   }
-  Status Close() override { return file_->Fsync(); }
 
-  Status Flush() override { return Status::OK(); }
+  IOStatus Flush(const IOOptions& /*options*/,
+                 IODebugContext* /*dbg*/) override {
+    return IOStatus::OK();
+  }
 
-  Status Sync() override { return file_->Fsync(); }
+  IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
+    return file_->Fsync(options, dbg);
+  }
 
-  uint64_t GetFileSize() override { return file_->Size(); }
+  uint64_t GetFileSize(const IOOptions& /*options*/,
+                       IODebugContext* /*dbg*/) override {
+    return file_->Size();
+  }
 
  private:
   inline size_t RequestToken(size_t bytes) {
@@ -293,12 +365,16 @@ class MockWritableFile : public WritableFile {
   }
 
   MemFile* file_;
+  bool use_direct_io_;
   RateLimiter* rate_limiter_;
 };
 
-class MockEnvDirectory : public Directory {
+class MockEnvDirectory : public FSDirectory {
  public:
-  Status Fsync() override { return Status::OK(); }
+  IOStatus Fsync(const IOOptions& /*options*/,
+                 IODebugContext* /*dbg*/) override {
+    return IOStatus::OK();
+  }
 };
 
 class MockEnvFileLock : public FileLock {
@@ -313,21 +389,26 @@ class MockEnvFileLock : public FileLock {
 
 class TestMemLogger : public Logger {
  private:
-  std::unique_ptr<WritableFile> file_;
+  std::unique_ptr<FSWritableFile> file_;
   std::atomic_size_t log_size_;
   static const uint64_t flush_every_seconds_ = 5;
   std::atomic_uint_fast64_t last_flush_micros_;
   Env* env_;
+  IOOptions options_;
+  IODebugContext* dbg_;
   std::atomic<bool> flush_pending_;
 
  public:
-  TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,
+  TestMemLogger(std::unique_ptr<FSWritableFile> f, Env* env,
+                const IOOptions& options, IODebugContext* dbg,
                 const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
       : Logger(log_level),
         file_(std::move(f)),
         log_size_(0),
         last_flush_micros_(0),
         env_(env),
+        options_(options),
+        dbg_(dbg),
         flush_pending_(false) {}
   ~TestMemLogger() override {}
 
@@ -393,9 +474,11 @@ class TestMemLogger : public Logger {
       assert(p <= limit);
       const size_t write_size = p - base;
 
-      file_->Append(Slice(base, write_size));
-      flush_pending_ = true;
-      log_size_ += write_size;
+      Status s = file_->Append(Slice(base, write_size), options_, dbg_);
+      if (s.ok()) {
+        flush_pending_ = true;
+        log_size_ += write_size;
+      }
       uint64_t now_micros =
           static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
       if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
@@ -411,151 +494,304 @@ class TestMemLogger : public Logger {
   size_t GetLogFileSize() const override { return log_size_; }
 };
 
-}  // Anonymous namespace
+class MockFileSystem : public FileSystem {
+ public:
+  explicit MockFileSystem(Env* env, bool supports_direct_io = true)
+      : env_(env), supports_direct_io_(supports_direct_io) {}
 
-MockEnv::MockEnv(Env* base_env) : EnvWrapper(base_env), fake_sleep_micros_(0) {}
+  ~MockFileSystem() override {
+    for (auto i = file_map_.begin(); i != file_map_.end(); ++i) {
+      i->second->Unref();
+    }
+  }
 
-MockEnv::~MockEnv() {
-  for (FileSystem::iterator i = file_map_.begin(); i != file_map_.end(); ++i) {
-    i->second->Unref();
+  const char* Name() const override { return "Memory"; }
+  IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
+                             std::unique_ptr<FSSequentialFile>* r,
+                             IODebugContext* dbg) override;
+  IOStatus NewRandomAccessFile(const std::string& f,
+                               const FileOptions& file_opts,
+                               std::unique_ptr<FSRandomAccessFile>* r,
+                               IODebugContext* dbg) override;
+
+  IOStatus NewRandomRWFile(const std::string& fname,
+                           const FileOptions& file_opts,
+                           std::unique_ptr<FSRandomRWFile>* result,
+                           IODebugContext* dbg) override;
+  IOStatus ReuseWritableFile(const std::string& fname,
+                             const std::string& old_fname,
+                             const FileOptions& file_opts,
+                             std::unique_ptr<FSWritableFile>* result,
+                             IODebugContext* dbg) override;
+  IOStatus NewWritableFile(const std::string& fname,
+                           const FileOptions& file_opts,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override;
+  IOStatus ReopenWritableFile(const std::string& fname,
+                              const FileOptions& options,
+                              std::unique_ptr<FSWritableFile>* result,
+                              IODebugContext* dbg) override;
+  IOStatus NewDirectory(const std::string& /*name*/, const IOOptions& io_opts,
+                        std::unique_ptr<FSDirectory>* result,
+                        IODebugContext* dbg) override;
+  IOStatus FileExists(const std::string& fname, const IOOptions& /*io_opts*/,
+                      IODebugContext* /*dbg*/) override;
+  IOStatus GetChildren(const std::string& dir, const IOOptions& options,
+                       std::vector<std::string>* result,
+                       IODebugContext* dbg) override;
+  IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
+                      IODebugContext* dbg) override;
+  IOStatus Truncate(const std::string& fname, size_t size,
+                    const IOOptions& options, IODebugContext* dbg) override;
+  IOStatus CreateDir(const std::string& dirname, const IOOptions& options,
+                     IODebugContext* dbg) override;
+  IOStatus CreateDirIfMissing(const std::string& dirname,
+                              const IOOptions& options,
+                              IODebugContext* dbg) override;
+  IOStatus DeleteDir(const std::string& dirname, const IOOptions& options,
+                     IODebugContext* dbg) override;
+
+  IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
+                       uint64_t* file_size, IODebugContext* dbg) override;
+
+  IOStatus GetFileModificationTime(const std::string& fname,
+                                   const IOOptions& options,
+                                   uint64_t* file_mtime,
+                                   IODebugContext* dbg) override;
+  IOStatus RenameFile(const std::string& src, const std::string& target,
+                      const IOOptions& options, IODebugContext* dbg) override;
+  IOStatus LinkFile(const std::string& /*src*/, const std::string& /*target*/,
+                    const IOOptions& /*options*/,
+                    IODebugContext* /*dbg*/) override;
+  IOStatus LockFile(const std::string& fname, const IOOptions& options,
+                    FileLock** lock, IODebugContext* dbg) override;
+  IOStatus UnlockFile(FileLock* lock, const IOOptions& options,
+                      IODebugContext* dbg) override;
+  IOStatus GetTestDirectory(const IOOptions& options, std::string* path,
+                            IODebugContext* dbg) override;
+  IOStatus NewLogger(const std::string& fname, const IOOptions& io_opts,
+                     std::shared_ptr<Logger>* result,
+                     IODebugContext* dbg) override;
+  // Get full directory name for this db.
+  IOStatus GetAbsolutePath(const std::string& db_path,
+                           const IOOptions& /*options*/,
+                           std::string* output_path,
+                           IODebugContext* /*dbg*/) override {
+    *output_path = NormalizeMockPath(db_path);
+    if (output_path->at(0) != '/') {
+      return IOStatus::NotSupported("GetAbsolutePath");
+    } else {
+      return IOStatus::OK();
+    }
   }
-}
+  IOStatus IsDirectory(const std::string& /*path*/,
+                       const IOOptions& /*options*/, bool* /*is_dir*/,
+                       IODebugContext* /*dgb*/) override {
+    return IOStatus::NotSupported("IsDirectory");
+  }
+
+  Status CorruptBuffer(const std::string& fname);
+
+ private:
+  bool RenameFileInternal(const std::string& src, const std::string& dest);
+  void DeleteFileInternal(const std::string& fname);
+  bool GetChildrenInternal(const std::string& fname,
+                           std::vector<std::string>* results);
+
+  std::string NormalizeMockPath(const std::string& path) {
+    std::string p = NormalizePath(path);
+    if (p.back() == kFilePathSeparator && p.size() > 1) {
+      p.pop_back();
+    }
+    return p;
+  }
+
+ private:
+  // Map from filenames to MemFile objects, representing a simple file system.
+  port::Mutex mutex_;
+  std::map<std::string, MemFile*> file_map_;  // Protected by mutex_.
+  Env* env_;
+  bool supports_direct_io_;
+};
 
+}  // Anonymous namespace
 // Partial implementation of the Env interface.
-Status MockEnv::NewSequentialFile(const std::string& fname,
-                                  std::unique_ptr<SequentialFile>* result,
-                                  const EnvOptions& /*soptions*/) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::NewSequentialFile(
+    const std::string& fname, const FileOptions& file_opts,
+    std::unique_ptr<FSSequentialFile>* result, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
+
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) == file_map_.end()) {
     *result = nullptr;
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
   auto* f = file_map_[fn];
   if (f->is_lock_file()) {
-    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
+  } else if (file_opts.use_direct_reads && !supports_direct_io_) {
+    return IOStatus::NotSupported("Direct I/O Not Supported");
+  } else {
+    result->reset(new MockSequentialFile(f, file_opts));
+    return IOStatus::OK();
   }
-  result->reset(new MockSequentialFile(f));
-  return Status::OK();
 }
 
-Status MockEnv::NewRandomAccessFile(const std::string& fname,
-                                    std::unique_ptr<RandomAccessFile>* result,
-                                    const EnvOptions& /*soptions*/) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::NewRandomAccessFile(
+    const std::string& fname, const FileOptions& file_opts,
+    std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) == file_map_.end()) {
     *result = nullptr;
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
   auto* f = file_map_[fn];
   if (f->is_lock_file()) {
-    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
+  } else if (file_opts.use_direct_reads && !supports_direct_io_) {
+    return IOStatus::NotSupported("Direct I/O Not Supported");
+  } else {
+    result->reset(new MockRandomAccessFile(f, file_opts));
+    return IOStatus::OK();
   }
-  result->reset(new MockRandomAccessFile(f));
-  return Status::OK();
 }
 
-Status MockEnv::NewRandomRWFile(const std::string& fname,
-                                std::unique_ptr<RandomRWFile>* result,
-                                const EnvOptions& /*soptions*/) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::NewRandomRWFile(
+    const std::string& fname, const FileOptions& /*file_opts*/,
+    std::unique_ptr<FSRandomRWFile>* result, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) == file_map_.end()) {
     *result = nullptr;
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
   auto* f = file_map_[fn];
   if (f->is_lock_file()) {
-    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+    return IOStatus::InvalidArgument(fn, "Cannot open a lock file.");
   }
   result->reset(new MockRandomRWFile(f));
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::ReuseWritableFile(const std::string& fname,
-                                  const std::string& old_fname,
-                                  std::unique_ptr<WritableFile>* result,
-                                  const EnvOptions& options) {
-  auto s = RenameFile(old_fname, fname);
+IOStatus MockFileSystem::ReuseWritableFile(
+    const std::string& fname, const std::string& old_fname,
+    const FileOptions& options, std::unique_ptr<FSWritableFile>* result,
+    IODebugContext* dbg) {
+  auto s = RenameFile(old_fname, fname, IOOptions(), dbg);
   if (!s.ok()) {
     return s;
+  } else {
+    result->reset();
+    return NewWritableFile(fname, options, result, dbg);
   }
-  result->reset();
-  return NewWritableFile(fname, result, options);
 }
 
-Status MockEnv::NewWritableFile(const std::string& fname,
-                                std::unique_ptr<WritableFile>* result,
-                                const EnvOptions& env_options) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::NewWritableFile(
+    const std::string& fname, const FileOptions& file_opts,
+    std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) != file_map_.end()) {
     DeleteFileInternal(fn);
   }
-  MemFile* file = new MemFile(this, fn, false);
+  MemFile* file = new MemFile(env_, fn, false);
   file->Ref();
   file_map_[fn] = file;
+  if (file_opts.use_direct_writes && !supports_direct_io_) {
+    return IOStatus::NotSupported("Direct I/O Not Supported");
+  } else {
+    result->reset(new MockWritableFile(file, file_opts));
+    return IOStatus::OK();
+  }
+}
 
-  result->reset(new MockWritableFile(file, env_options.rate_limiter));
-  return Status::OK();
+IOStatus MockFileSystem::ReopenWritableFile(
+    const std::string& fname, const FileOptions& file_opts,
+    std::unique_ptr<FSWritableFile>* result, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
+  MutexLock lock(&mutex_);
+  MemFile* file = nullptr;
+  if (file_map_.find(fn) == file_map_.end()) {
+    file = new MemFile(env_, fn, false);
+    file_map_[fn] = file;
+  } else {
+    file = file_map_[fn];
+  }
+  file->Ref();
+  if (file_opts.use_direct_writes && !supports_direct_io_) {
+    return IOStatus::NotSupported("Direct I/O Not Supported");
+  } else {
+    result->reset(new MockWritableFile(file, file_opts));
+    return IOStatus::OK();
+  }
 }
 
-Status MockEnv::NewDirectory(const std::string& /*name*/,
-                             std::unique_ptr<Directory>* result) {
+IOStatus MockFileSystem::NewDirectory(const std::string& /*name*/,
+                                      const IOOptions& /*io_opts*/,
+                                      std::unique_ptr<FSDirectory>* result,
+                                      IODebugContext* /*dbg*/) {
   result->reset(new MockEnvDirectory());
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::FileExists(const std::string& fname) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::FileExists(const std::string& fname,
+                                    const IOOptions& /*io_opts*/,
+                                    IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) != file_map_.end()) {
     // File exists
-    return Status::OK();
+    return IOStatus::OK();
   }
   // Now also check if fn exists as a dir
   for (const auto& iter : file_map_) {
     const std::string& filename = iter.first;
     if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' &&
         Slice(filename).starts_with(Slice(fn))) {
-      return Status::OK();
+      return IOStatus::OK();
     }
   }
-  return Status::NotFound();
+  return IOStatus::NotFound();
 }
 
-Status MockEnv::GetChildren(const std::string& dir,
-                            std::vector<std::string>* result) {
-  auto d = NormalizePath(dir);
+bool MockFileSystem::GetChildrenInternal(const std::string& dir,
+                                         std::vector<std::string>* result) {
+  auto d = NormalizeMockPath(dir);
   bool found_dir = false;
-  {
-    MutexLock lock(&mutex_);
-    result->clear();
-    for (const auto& iter : file_map_) {
-      const std::string& filename = iter.first;
-
-      if (filename == d) {
-        found_dir = true;
-      } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&
-                 Slice(filename).starts_with(Slice(d))) {
-        found_dir = true;
-        size_t next_slash = filename.find('/', d.size() + 1);
-        if (next_slash != std::string::npos) {
-          result->push_back(
-              filename.substr(d.size() + 1, next_slash - d.size() - 1));
-        } else {
-          result->push_back(filename.substr(d.size() + 1));
-        }
+  result->clear();
+  for (const auto& iter : file_map_) {
+    const std::string& filename = iter.first;
+
+    if (filename == d) {
+      found_dir = true;
+    } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' &&
+               Slice(filename).starts_with(Slice(d))) {
+      found_dir = true;
+      size_t next_slash = filename.find('/', d.size() + 1);
+      if (next_slash != std::string::npos) {
+        result->push_back(
+            filename.substr(d.size() + 1, next_slash - d.size() - 1));
+      } else {
+        result->push_back(filename.substr(d.size() + 1));
       }
     }
   }
   result->erase(std::unique(result->begin(), result->end()), result->end());
-  return found_dir ? Status::OK() : Status::NotFound();
+  return found_dir;
 }
 
-void MockEnv::DeleteFileInternal(const std::string& fname) {
-  assert(fname == NormalizePath(fname));
+IOStatus MockFileSystem::GetChildren(const std::string& dir,
+                                     const IOOptions& /*options*/,
+                                     std::vector<std::string>* result,
+                                     IODebugContext* /*dbg*/) {
+  MutexLock lock(&mutex_);
+  bool found_dir = GetChildrenInternal(dir, result);
+  return found_dir ? IOStatus::OK() : IOStatus::NotFound(dir);
+}
+
+void MockFileSystem::DeleteFileInternal(const std::string& fname) {
+  assert(fname == NormalizeMockPath(fname));
   const auto& pair = file_map_.find(fname);
   if (pair != file_map_.end()) {
     pair->second->Unref();
@@ -563,164 +799,237 @@ void MockEnv::DeleteFileInternal(const std::string& fname) {
   }
 }
 
-Status MockEnv::DeleteFile(const std::string& fname) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::DeleteFile(const std::string& fname,
+                                    const IOOptions& /*options*/,
+                                    IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   if (file_map_.find(fn) == file_map_.end()) {
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
 
   DeleteFileInternal(fn);
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::Truncate(const std::string& fname, size_t size) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::Truncate(const std::string& fname, size_t size,
+                                  const IOOptions& options,
+                                  IODebugContext* dbg) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   auto iter = file_map_.find(fn);
   if (iter == file_map_.end()) {
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
-  iter->second->Truncate(size);
-  return Status::OK();
+  iter->second->Truncate(size, options, dbg);
+  return IOStatus::OK();
 }
 
-Status MockEnv::CreateDir(const std::string& dirname) {
-  auto dn = NormalizePath(dirname);
+IOStatus MockFileSystem::CreateDir(const std::string& dirname,
+                                   const IOOptions& /*options*/,
+                                   IODebugContext* /*dbg*/) {
+  auto dn = NormalizeMockPath(dirname);
+  MutexLock lock(&mutex_);
   if (file_map_.find(dn) == file_map_.end()) {
-    MemFile* file = new MemFile(this, dn, false);
+    MemFile* file = new MemFile(env_, dn, false);
     file->Ref();
     file_map_[dn] = file;
   } else {
-    return Status::IOError();
+    return IOStatus::IOError();
   }
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::CreateDirIfMissing(const std::string& dirname) {
-  CreateDir(dirname);
-  return Status::OK();
+IOStatus MockFileSystem::CreateDirIfMissing(const std::string& dirname,
+                                            const IOOptions& options,
+                                            IODebugContext* dbg) {
+  CreateDir(dirname, options, dbg).PermitUncheckedError();
+  return IOStatus::OK();
 }
 
-Status MockEnv::DeleteDir(const std::string& dirname) {
-  return DeleteFile(dirname);
+IOStatus MockFileSystem::DeleteDir(const std::string& dirname,
+                                   const IOOptions& /*options*/,
+                                   IODebugContext* /*dbg*/) {
+  auto dir = NormalizeMockPath(dirname);
+  MutexLock lock(&mutex_);
+  if (file_map_.find(dir) == file_map_.end()) {
+    return IOStatus::PathNotFound(dir);
+  } else {
+    std::vector<std::string> children;
+    if (GetChildrenInternal(dir, &children)) {
+      for (const auto& child : children) {
+        DeleteFileInternal(child);
+      }
+    }
+    DeleteFileInternal(dir);
+    return IOStatus::OK();
+  }
 }
 
-Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::GetFileSize(const std::string& fname,
+                                     const IOOptions& /*options*/,
+                                     uint64_t* file_size,
+                                     IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   auto iter = file_map_.find(fn);
   if (iter == file_map_.end()) {
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
 
   *file_size = iter->second->Size();
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::GetFileModificationTime(const std::string& fname,
-                                        uint64_t* time) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::GetFileModificationTime(const std::string& fname,
+                                                 const IOOptions& /*options*/,
+                                                 uint64_t* time,
+                                                 IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   auto iter = file_map_.find(fn);
   if (iter == file_map_.end()) {
-    return Status::IOError(fn, "File not found");
+    return IOStatus::PathNotFound(fn);
   }
   *time = iter->second->ModifiedTime();
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::RenameFile(const std::string& src, const std::string& dest) {
-  auto s = NormalizePath(src);
-  auto t = NormalizePath(dest);
-  MutexLock lock(&mutex_);
-  if (file_map_.find(s) == file_map_.end()) {
-    return Status::IOError(s, "File not found");
+bool MockFileSystem::RenameFileInternal(const std::string& src,
+                                        const std::string& dest) {
+  if (file_map_.find(src) == file_map_.end()) {
+    return false;
+  } else {
+    std::vector<std::string> children;
+    if (GetChildrenInternal(src, &children)) {
+      for (const auto& child : children) {
+        RenameFileInternal(src + "/" + child, dest + "/" + child);
+      }
+    }
+    DeleteFileInternal(dest);
+    file_map_[dest] = file_map_[src];
+    file_map_.erase(src);
+    return true;
   }
+}
 
-  DeleteFileInternal(t);
-  file_map_[t] = file_map_[s];
-  file_map_.erase(s);
-  return Status::OK();
+IOStatus MockFileSystem::RenameFile(const std::string& src,
+                                    const std::string& dest,
+                                    const IOOptions& /*options*/,
+                                    IODebugContext* /*dbg*/) {
+  auto s = NormalizeMockPath(src);
+  auto t = NormalizeMockPath(dest);
+  MutexLock lock(&mutex_);
+  bool found = RenameFileInternal(s, t);
+  if (!found) {
+    return IOStatus::PathNotFound(s);
+  } else {
+    return IOStatus::OK();
+  }
 }
 
-Status MockEnv::LinkFile(const std::string& src, const std::string& dest) {
-  auto s = NormalizePath(src);
-  auto t = NormalizePath(dest);
+IOStatus MockFileSystem::LinkFile(const std::string& src,
+                                  const std::string& dest,
+                                  const IOOptions& /*options*/,
+                                  IODebugContext* /*dbg*/) {
+  auto s = NormalizeMockPath(src);
+  auto t = NormalizeMockPath(dest);
   MutexLock lock(&mutex_);
   if (file_map_.find(s) == file_map_.end()) {
-    return Status::IOError(s, "File not found");
+    return IOStatus::PathNotFound(s);
   }
 
   DeleteFileInternal(t);
   file_map_[t] = file_map_[s];
   file_map_[t]->Ref();  // Otherwise it might get deleted when noone uses s
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::NewLogger(const std::string& fname,
-                          std::shared_ptr<Logger>* result) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::NewLogger(const std::string& fname,
+                                   const IOOptions& io_opts,
+                                   std::shared_ptr<Logger>* result,
+                                   IODebugContext* dbg) {
+  auto fn = NormalizeMockPath(fname);
   MutexLock lock(&mutex_);
   auto iter = file_map_.find(fn);
   MemFile* file = nullptr;
   if (iter == file_map_.end()) {
-    file = new MemFile(this, fn, false);
+    file = new MemFile(env_, fn, false);
     file->Ref();
     file_map_[fn] = file;
   } else {
     file = iter->second;
   }
-  std::unique_ptr<WritableFile> f(new MockWritableFile(file, nullptr));
-  result->reset(new TestMemLogger(std::move(f), this));
-  return Status::OK();
+  std::unique_ptr<FSWritableFile> f(new MockWritableFile(file, FileOptions()));
+  result->reset(new TestMemLogger(std::move(f), env_, io_opts, dbg));
+  return IOStatus::OK();
 }
 
-Status MockEnv::LockFile(const std::string& fname, FileLock** flock) {
-  auto fn = NormalizePath(fname);
+IOStatus MockFileSystem::LockFile(const std::string& fname,
+                                  const IOOptions& /*options*/,
+                                  FileLock** flock, IODebugContext* /*dbg*/) {
+  auto fn = NormalizeMockPath(fname);
   {
     MutexLock lock(&mutex_);
     if (file_map_.find(fn) != file_map_.end()) {
       if (!file_map_[fn]->is_lock_file()) {
-        return Status::InvalidArgument(fname, "Not a lock file.");
+        return IOStatus::InvalidArgument(fname, "Not a lock file.");
       }
       if (!file_map_[fn]->Lock()) {
-        return Status::IOError(fn, "Lock is already held.");
+        return IOStatus::IOError(fn, "lock is already held.");
       }
     } else {
-      auto* file = new MemFile(this, fn, true);
+      auto* file = new MemFile(env_, fn, true);
       file->Ref();
       file->Lock();
       file_map_[fn] = file;
     }
   }
   *flock = new MockEnvFileLock(fn);
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::UnlockFile(FileLock* flock) {
-  std::string fn =
-      static_cast_with_check<MockEnvFileLock, FileLock>(flock)->FileName();
+IOStatus MockFileSystem::UnlockFile(FileLock* flock,
+                                    const IOOptions& /*options*/,
+                                    IODebugContext* /*dbg*/) {
+  std::string fn = static_cast_with_check<MockEnvFileLock>(flock)->FileName();
   {
     MutexLock lock(&mutex_);
     if (file_map_.find(fn) != file_map_.end()) {
       if (!file_map_[fn]->is_lock_file()) {
-        return Status::InvalidArgument(fn, "Not a lock file.");
+        return IOStatus::InvalidArgument(fn, "Not a lock file.");
       }
       file_map_[fn]->Unlock();
     }
   }
   delete flock;
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status MockEnv::GetTestDirectory(std::string* path) {
+IOStatus MockFileSystem::GetTestDirectory(const IOOptions& /*options*/,
+                                          std::string* path,
+                                          IODebugContext* /*dbg*/) {
   *path = "/test";
+  return IOStatus::OK();
+}
+
+Status MockFileSystem::CorruptBuffer(const std::string& fname) {
+  auto fn = NormalizeMockPath(fname);
+  MutexLock lock(&mutex_);
+  auto iter = file_map_.find(fn);
+  if (iter == file_map_.end()) {
+    return Status::IOError(fn, "File not found");
+  }
+  iter->second->CorruptBuffer();
   return Status::OK();
 }
 
+MockEnv::MockEnv(Env* base_env)
+    : CompositeEnvWrapper(base_env, std::make_shared<MockFileSystem>(this)),
+      fake_sleep_micros_(0) {}
+
 Status MockEnv::GetCurrentTime(int64_t* unix_time) {
-  auto s = EnvWrapper::GetCurrentTime(unix_time);
+  auto s = CompositeEnvWrapper::GetCurrentTime(unix_time);
   if (s.ok()) {
     *unix_time += fake_sleep_micros_.load() / (1000 * 1000);
   }
@@ -728,33 +1037,16 @@ Status MockEnv::GetCurrentTime(int64_t* unix_time) {
 }
 
 uint64_t MockEnv::NowMicros() {
-  return EnvWrapper::NowMicros() + fake_sleep_micros_.load();
+  return CompositeEnvWrapper::NowMicros() + fake_sleep_micros_.load();
 }
 
 uint64_t MockEnv::NowNanos() {
-  return EnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
+  return CompositeEnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
 }
 
 Status MockEnv::CorruptBuffer(const std::string& fname) {
-  auto fn = NormalizePath(fname);
-  MutexLock lock(&mutex_);
-  auto iter = file_map_.find(fn);
-  if (iter == file_map_.end()) {
-    return Status::IOError(fn, "File not found");
-  }
-  iter->second->CorruptBuffer();
-  return Status::OK();
-}
-
-std::string MockEnv::NormalizePath(const std::string path) {
-  std::string dst;
-  for (auto c : path) {
-    if (!dst.empty() && c == '/' && dst.back() == '/') {
-      continue;
-    }
-    dst.push_back(c);
-  }
-  return dst;
+  auto mock = static_cast_with_check<MockFileSystem>(GetFileSystem().get());
+  return mock->CorruptBuffer(fname);
 }
 
 void MockEnv::FakeSleepForMicroseconds(int64_t micros) {