]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/env/env_encryption.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / env / env_encryption.cc
index ca2542abbb169d4ffa2ee3485e5ab6211b603481..c6b0a257dbf19804d9d0622f0bb71ee194188c4c 100644 (file)
 #include <cctype>
 #include <iostream>
 
+#include "env/composite_env_wrapper.h"
 #include "env/env_encryption_ctr.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/convenience.h"
+#include "rocksdb/io_status.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/options_type.h"
 #include "util/aligned_buffer.h"
 #include "util/coding.h"
 #include "util/random.h"
 #include "util/string_util.h"
 
 #endif
-
 namespace ROCKSDB_NAMESPACE {
-
 #ifndef ROCKSDB_LITE
-static constexpr char kROT13CipherName[] = "ROT13";
-static constexpr char kCTRProviderName[] = "CTR";
-
-Status BlockCipher::CreateFromString(const ConfigOptions& /*config_options*/,
-                                     const std::string& value,
-                                     std::shared_ptr<BlockCipher>* result) {
-  std::string id = value;
-  size_t colon = value.find(':');
-  if (colon != std::string::npos) {
-    id = value.substr(0, colon);
-  }
-  if (id == kROT13CipherName) {
-    if (colon != std::string::npos) {
-      size_t block_size = ParseSizeT(value.substr(colon + 1));
-      result->reset(new ROT13BlockCipher(block_size));
-    } else {
-      result->reset(new ROT13BlockCipher(32));
-    }
-    return Status::OK();
-  } else {
-    return Status::NotSupported("Could not find cipher ", value);
-  }
-}
-
-Status EncryptionProvider::CreateFromString(
-    const ConfigOptions& /*config_options*/, const std::string& value,
-    std::shared_ptr<EncryptionProvider>* result) {
-  std::string id = value;
-  bool is_test = StartsWith(value, "test://");
-  Status status = Status::OK();
-  if (is_test) {
-    id = value.substr(strlen("test://"));
-  }
-  if (id == kCTRProviderName) {
-    result->reset(new CTREncryptionProvider());
-  } else if (is_test) {
-    result->reset(new CTREncryptionProvider());
-  } else {
-    return Status::NotSupported("Could not find provider ", value);
-  }
-  if (status.ok() && is_test) {
-    status = result->get()->TEST_Initialize();
-  }
-  return status;
-}
-
 std::shared_ptr<EncryptionProvider> EncryptionProvider::NewCTRProvider(
     const std::shared_ptr<BlockCipher>& cipher) {
   return std::make_shared<CTREncryptionProvider>(cipher);
 }
 
-  // Read up to "n" bytes from the file.  "scratch[0..n-1]" may be
-  // written by this routine.  Sets "*result" to the data that was
-  // read (including if fewer than "n" bytes were successfully read).
-  // May set "*result" to point at data in "scratch[0..n-1]", so
-  // "scratch[0..n-1]" must be live when "*result" is used.
-  // If an error was encountered, returns a non-OK status.
-  //
-  // REQUIRES: External synchronization
-Status EncryptedSequentialFile::Read(size_t n, Slice* result, char* scratch) {
+// Read up to "n" bytes from the file.  "scratch[0..n-1]" may be
+// written by this routine.  Sets "*result" to the data that was
+// read (including if fewer than "n" bytes were successfully read).
+// May set "*result" to point at data in "scratch[0..n-1]", so
+// "scratch[0..n-1]" must be live when "*result" is used.
+// If an error was encountered, returns a non-OK status.
+//
+// REQUIRES: External synchronization
+IOStatus EncryptedSequentialFile::Read(size_t n, const IOOptions& options,
+                                       Slice* result, char* scratch,
+                                       IODebugContext* dbg) {
   assert(scratch);
-  Status status = file_->Read(n, result, scratch);
-  if (!status.ok()) {
-    return status;
+  IOStatus io_s = file_->Read(n, options, result, scratch, dbg);
+  if (!io_s.ok()) {
+    return io_s;
   }
   {
     PERF_TIMER_GUARD(decrypt_data_nanos);
-    status = stream_->Decrypt(offset_, (char*)result->data(), result->size());
+    io_s = status_to_io_status(
+        stream_->Decrypt(offset_, (char*)result->data(), result->size()));
   }
-  offset_ += result->size();  // We've already ready data from disk, so update
-                              // offset_ even if decryption fails.
-  return status;
+  if (io_s.ok()) {
+    offset_ += result->size();  // We've already ready data from disk, so update
+                                // offset_ even if decryption fails.
+  }
+  return io_s;
 }
 
 // Skip "n" bytes from the file. This is guaranteed to be no
@@ -106,7 +68,7 @@ Status EncryptedSequentialFile::Read(size_t n, Slice* result, char* scratch) {
 // file, and Skip will return OK.
 //
 // REQUIRES: External synchronization
-Status EncryptedSequentialFile::Skip(uint64_t n) {
+IOStatus EncryptedSequentialFile::Skip(uint64_t n) {
   auto status = file_->Skip(n);
   if (!status.ok()) {
     return status;
@@ -127,77 +89,86 @@ size_t EncryptedSequentialFile::GetRequiredBufferAlignment() const {
   return file_->GetRequiredBufferAlignment();
 }
 
-  // Remove any kind of caching of data from the offset to offset+length
-  // of this file. If the length is 0, then it refers to the end of file.
-  // If the system is not caching the file contents, then this is a noop.
-Status EncryptedSequentialFile::InvalidateCache(size_t offset, size_t length) {
+// Remove any kind of caching of data from the offset to offset+length
+// of this file. If the length is 0, then it refers to the end of file.
+// If the system is not caching the file contents, then this is a noop.
+IOStatus EncryptedSequentialFile::InvalidateCache(size_t offset,
+                                                  size_t length) {
   return file_->InvalidateCache(offset + prefixLength_, length);
 }
 
-  // Positioned Read for direct I/O
-  // If Direct I/O enabled, offset, n, and scratch should be properly aligned
-Status EncryptedSequentialFile::PositionedRead(uint64_t offset, size_t n,
-                                               Slice* result, char* scratch) {
+// Positioned Read for direct I/O
+// If Direct I/O enabled, offset, n, and scratch should be properly aligned
+IOStatus EncryptedSequentialFile::PositionedRead(uint64_t offset, size_t n,
+                                                 const IOOptions& options,
+                                                 Slice* result, char* scratch,
+                                                 IODebugContext* dbg) {
   assert(scratch);
   offset += prefixLength_;  // Skip prefix
-  auto status = file_->PositionedRead(offset, n, result, scratch);
-  if (!status.ok()) {
-    return status;
+  auto io_s = file_->PositionedRead(offset, n, options, result, scratch, dbg);
+  if (!io_s.ok()) {
+    return io_s;
   }
   offset_ = offset + result->size();
   {
     PERF_TIMER_GUARD(decrypt_data_nanos);
-    status = stream_->Decrypt(offset, (char*)result->data(), result->size());
+    io_s = status_to_io_status(
+        stream_->Decrypt(offset, (char*)result->data(), result->size()));
   }
-  return status;
+  return io_s;
 }
 
-  // Read up to "n" bytes from the file starting at "offset".
-  // "scratch[0..n-1]" may be written by this routine.  Sets "*result"
-  // to the data that was read (including if fewer than "n" bytes were
-  // successfully read).  May set "*result" to point at data in
-  // "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
-  // "*result" is used.  If an error was encountered, returns a non-OK
-  // status.
-  //
-  // Safe for concurrent use by multiple threads.
-  // If Direct I/O enabled, offset, n, and scratch should be aligned properly.
-Status EncryptedRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
-                                       char* scratch) const {
+// Read up to "n" bytes from the file starting at "offset".
+// "scratch[0..n-1]" may be written by this routine.  Sets "*result"
+// to the data that was read (including if fewer than "n" bytes were
+// successfully read).  May set "*result" to point at data in
+// "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
+// "*result" is used.  If an error was encountered, returns a non-OK
+// status.
+//
+// Safe for concurrent use by multiple threads.
+// If Direct I/O enabled, offset, n, and scratch should be aligned properly.
+IOStatus EncryptedRandomAccessFile::Read(uint64_t offset, size_t n,
+                                         const IOOptions& options,
+                                         Slice* result, char* scratch,
+                                         IODebugContext* dbg) const {
   assert(scratch);
   offset += prefixLength_;
-  auto status = file_->Read(offset, n, result, scratch);
-  if (!status.ok()) {
-    return status;
+  auto io_s = file_->Read(offset, n, options, result, scratch, dbg);
+  if (!io_s.ok()) {
+    return io_s;
   }
   {
     PERF_TIMER_GUARD(decrypt_data_nanos);
-    status = stream_->Decrypt(offset, (char*)result->data(), result->size());
+    io_s = status_to_io_status(
+        stream_->Decrypt(offset, (char*)result->data(), result->size()));
   }
-  return status;
+  return io_s;
 }
 
-  // Readahead the file starting from offset by n bytes for caching.
-Status EncryptedRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
+// Readahead the file starting from offset by n bytes for caching.
+IOStatus EncryptedRandomAccessFile::Prefetch(uint64_t offset, size_t n,
+                                             const IOOptions& options,
+                                             IODebugContext* dbg) {
   // return Status::OK();
-  return file_->Prefetch(offset + prefixLength_, n);
+  return file_->Prefetch(offset + prefixLength_, n, options, dbg);
 }
 
-  // Tries to get an unique ID for this file that will be the same each time
-  // the file is opened (and will stay the same while the file is open).
-  // Furthermore, it tries to make this ID at most "max_size" bytes. If such an
-  // ID can be created this function returns the length of the ID and places it
-  // in "id"; otherwise, this function returns 0, in which case "id"
-  // may not have been modified.
-  //
-  // This function guarantees, for IDs from a given environment, two unique ids
-  // cannot be made equal to each other by adding arbitrary bytes to one of
-  // them. That is, no unique ID is the prefix of another.
-  //
-  // This function guarantees that the returned ID will not be interpretable as
-  // a single varint.
-  //
-  // Note: these IDs are only valid for the duration of the process.
+// Tries to get an unique ID for this file that will be the same each time
+// the file is opened (and will stay the same while the file is open).
+// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
+// ID can be created this function returns the length of the ID and places it
+// in "id"; otherwise, this function returns 0, in which case "id"
+// may not have been modified.
+//
+// This function guarantees, for IDs from a given environment, two unique ids
+// cannot be made equal to each other by adding arbitrary bytes to one of
+// them. That is, no unique ID is the prefix of another.
+//
+// This function guarantees that the returned ID will not be interpretable as
+// a single varint.
+//
+// Note: these IDs are only valid for the duration of the process.
 size_t EncryptedRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
   return file_->GetUniqueId(id, max_size);
 };
@@ -206,35 +177,36 @@ void EncryptedRandomAccessFile::Hint(AccessPattern pattern) {
   file_->Hint(pattern);
 }
 
-  // Indicates the upper layers if the current RandomAccessFile implementation
-  // uses direct IO.
+// Indicates the upper layers if the current RandomAccessFile implementation
+// uses direct IO.
 bool EncryptedRandomAccessFile::use_direct_io() const {
   return file_->use_direct_io();
 }
 
-  // Use the returned alignment value to allocate
-  // aligned buffer for Direct I/O
+// Use the returned alignment value to allocate
+// aligned buffer for Direct I/O
 size_t EncryptedRandomAccessFile::GetRequiredBufferAlignment() const {
   return file_->GetRequiredBufferAlignment();
 }
 
-  // Remove any kind of caching of data from the offset to offset+length
-  // of this file. If the length is 0, then it refers to the end of file.
-  // If the system is not caching the file contents, then this is a noop.
-Status EncryptedRandomAccessFile::InvalidateCache(size_t offset,
-                                                  size_t length) {
+// Remove any kind of caching of data from the offset to offset+length
+// of this file. If the length is 0, then it refers to the end of file.
+// If the system is not caching the file contents, then this is a noop.
+IOStatus EncryptedRandomAccessFile::InvalidateCache(size_t offset,
+                                                    size_t length) {
   return file_->InvalidateCache(offset + prefixLength_, length);
 }
 
 // A file abstraction for sequential writing.  The implementation
 // must provide buffering since callers may append small fragments
 // at a time to the file.
-Status EncryptedWritableFile::Append(const Slice& data) {
+IOStatus EncryptedWritableFile::Append(const Slice& data,
+                                       const IOOptions& options,
+                                       IODebugContext* dbg) {
   AlignedBuffer buf;
-  Status status;
   Slice dataToAppend(data);
   if (data.size() > 0) {
-    auto offset = file_->GetFileSize();  // size including prefix
+    auto offset = file_->GetFileSize(options, dbg);  // size including prefix
     // Encrypt in cloned buffer
     buf.Alignment(GetRequiredBufferAlignment());
     buf.AllocateNewBuffer(data.size());
@@ -242,26 +214,25 @@ Status EncryptedWritableFile::Append(const Slice& data) {
     // so that the next two lines can be replaced with buf.Append().
     memmove(buf.BufferStart(), data.data(), data.size());
     buf.Size(data.size());
+    IOStatus io_s;
     {
       PERF_TIMER_GUARD(encrypt_data_nanos);
-      status = stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize());
+      io_s = status_to_io_status(
+          stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize()));
     }
-    if (!status.ok()) {
-      return status;
+    if (!io_s.ok()) {
+      return io_s;
     }
     dataToAppend = Slice(buf.BufferStart(), buf.CurrentSize());
   }
-  status = file_->Append(dataToAppend);
-  if (!status.ok()) {
-    return status;
-  }
-  return status;
+  return file_->Append(dataToAppend, options, dbg);
 }
 
-Status EncryptedWritableFile::PositionedAppend(const Slice& data,
-                                               uint64_t offset) {
+IOStatus EncryptedWritableFile::PositionedAppend(const Slice& data,
+                                                 uint64_t offset,
+                                                 const IOOptions& options,
+                                                 IODebugContext* dbg) {
   AlignedBuffer buf;
-  Status status;
   Slice dataToAppend(data);
   offset += prefixLength_;
   if (data.size() > 0) {
@@ -270,30 +241,34 @@ Status EncryptedWritableFile::PositionedAppend(const Slice& data,
     buf.AllocateNewBuffer(data.size());
     memmove(buf.BufferStart(), data.data(), data.size());
     buf.Size(data.size());
+    IOStatus io_s;
     {
       PERF_TIMER_GUARD(encrypt_data_nanos);
-      status = stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize());
+      io_s = status_to_io_status(
+          stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize()));
     }
-    if (!status.ok()) {
-      return status;
+    if (!io_s.ok()) {
+      return io_s;
     }
     dataToAppend = Slice(buf.BufferStart(), buf.CurrentSize());
   }
-  status = file_->PositionedAppend(dataToAppend, offset);
-  if (!status.ok()) {
-    return status;
-  }
-  return status;
+  return file_->PositionedAppend(dataToAppend, offset, options, dbg);
 }
 
-  // Indicates the upper layers if the current WritableFile implementation
-  // uses direct IO.
+// Indicates the upper layers if the current WritableFile implementation
+// uses direct IO.
 bool EncryptedWritableFile::use_direct_io() const {
   return file_->use_direct_io();
 }
 
-  // Use the returned alignment value to allocate
-  // aligned buffer for Direct I/O
+// true if Sync() and Fsync() are safe to call concurrently with Append()
+// and Flush().
+bool EncryptedWritableFile::IsSyncThreadSafe() const {
+  return file_->IsSyncThreadSafe();
+}
+
+// Use the returned alignment value to allocate
+// aligned buffer for Direct I/O
 size_t EncryptedWritableFile::GetRequiredBufferAlignment() const {
   return file_->GetRequiredBufferAlignment();
 }
@@ -301,48 +276,83 @@ size_t EncryptedWritableFile::GetRequiredBufferAlignment() const {
 /*
  * Get the size of valid data in the file.
  */
-uint64_t EncryptedWritableFile::GetFileSize() {
-  return file_->GetFileSize() - prefixLength_;
+uint64_t EncryptedWritableFile::GetFileSize(const IOOptions& options,
+                                            IODebugContext* dbg) {
+  return file_->GetFileSize(options, dbg) - prefixLength_;
 }
 
-  // Truncate is necessary to trim the file to the correct size
-  // before closing. It is not always possible to keep track of the file
-  // size due to whole pages writes. The behavior is undefined if called
-  // with other writes to follow.
-Status EncryptedWritableFile::Truncate(uint64_t size) {
-  return file_->Truncate(size + prefixLength_);
+// Truncate is necessary to trim the file to the correct size
+// before closing. It is not always possible to keep track of the file
+// size due to whole pages writes. The behavior is undefined if called
+// with other writes to follow.
+IOStatus EncryptedWritableFile::Truncate(uint64_t size,
+                                         const IOOptions& options,
+                                         IODebugContext* dbg) {
+  return file_->Truncate(size + prefixLength_, options, dbg);
 }
 
-    // Remove any kind of caching of data from the offset to offset+length
-  // of this file. If the length is 0, then it refers to the end of file.
-  // If the system is not caching the file contents, then this is a noop.
-  // This call has no effect on dirty pages in the cache.
-Status EncryptedWritableFile::InvalidateCache(size_t offset, size_t length) {
+// Remove any kind of caching of data from the offset to offset+length
+// of this file. If the length is 0, then it refers to the end of file.
+// If the system is not caching the file contents, then this is a noop.
+// This call has no effect on dirty pages in the cache.
+IOStatus EncryptedWritableFile::InvalidateCache(size_t offset, size_t length) {
   return file_->InvalidateCache(offset + prefixLength_, length);
 }
 
-  // Sync a file range with disk.
-  // offset is the starting byte of the file range to be synchronized.
-  // nbytes specifies the length of the range to be synchronized.
-  // This asks the OS to initiate flushing the cached data to disk,
-  // without waiting for completion.
-  // Default implementation does nothing.
-Status EncryptedWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
-  return file_->RangeSync(offset + prefixLength_, nbytes);
+// Sync a file range with disk.
+// offset is the starting byte of the file range to be synchronized.
+// nbytes specifies the length of the range to be synchronized.
+// This asks the OS to initiate flushing the cached data to disk,
+// without waiting for completion.
+// Default implementation does nothing.
+IOStatus EncryptedWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
+                                          const IOOptions& options,
+                                          IODebugContext* dbg) {
+  return file_->RangeSync(offset + prefixLength_, nbytes, options, dbg);
+}
+
+// PrepareWrite performs any necessary preparation for a write
+// before the write actually occurs.  This allows for pre-allocation
+// of space on devices where it can result in less file
+// fragmentation and/or less waste from over-zealous filesystem
+// pre-allocation.
+void EncryptedWritableFile::PrepareWrite(size_t offset, size_t len,
+                                         const IOOptions& options,
+                                         IODebugContext* dbg) {
+  file_->PrepareWrite(offset + prefixLength_, len, options, dbg);
+}
+
+void EncryptedWritableFile::SetPreallocationBlockSize(size_t size) {
+  // the size here doesn't need to include prefixLength_, as it's a
+  // configuration will be use for `PrepareWrite()`.
+  file_->SetPreallocationBlockSize(size);
+}
+
+void EncryptedWritableFile::GetPreallocationStatus(
+    size_t* block_size, size_t* last_allocated_block) {
+  file_->GetPreallocationStatus(block_size, last_allocated_block);
+}
+
+// Pre-allocates space for a file.
+IOStatus EncryptedWritableFile::Allocate(uint64_t offset, uint64_t len,
+                                         const IOOptions& options,
+                                         IODebugContext* dbg) {
+  return file_->Allocate(offset + prefixLength_, len, options, dbg);
+}
+
+IOStatus EncryptedWritableFile::Flush(const IOOptions& options,
+                                      IODebugContext* dbg) {
+  return file_->Flush(options, dbg);
 }
 
-  // PrepareWrite performs any necessary preparation for a write
-  // before the write actually occurs.  This allows for pre-allocation
-  // of space on devices where it can result in less file
-  // fragmentation and/or less waste from over-zealous filesystem
-  // pre-allocation.
-void EncryptedWritableFile::PrepareWrite(size_t offset, size_t len) {
-  file_->PrepareWrite(offset + prefixLength_, len);
+IOStatus EncryptedWritableFile::Sync(const IOOptions& options,
+                                     IODebugContext* dbg) {
+  return file_->Sync(options, dbg);
 }
 
-  // Pre-allocates space for a file.
-Status EncryptedWritableFile::Allocate(uint64_t offset, uint64_t len) {
-  return file_->Allocate(offset + prefixLength_, len);
+IOStatus EncryptedWritableFile::Close(const IOOptions& options,
+                                      IODebugContext* dbg) {
+  return file_->Close(options, dbg);
 }
 
 // A file abstraction for random reading and writing.
@@ -353,17 +363,18 @@ bool EncryptedRandomRWFile::use_direct_io() const {
   return file_->use_direct_io();
 }
 
-  // Use the returned alignment value to allocate
-  // aligned buffer for Direct I/O
+// Use the returned alignment value to allocate
+// aligned buffer for Direct I/O
 size_t EncryptedRandomRWFile::GetRequiredBufferAlignment() const {
   return file_->GetRequiredBufferAlignment();
 }
 
-  // Write bytes in `data` at  offset `offset`, Returns Status::OK() on success.
-  // Pass aligned buffer when use_direct_io() returns true.
-Status EncryptedRandomRWFile::Write(uint64_t offset, const Slice& data) {
+// Write bytes in `data` at  offset `offset`, Returns Status::OK() on success.
+// Pass aligned buffer when use_direct_io() returns true.
+IOStatus EncryptedRandomRWFile::Write(uint64_t offset, const Slice& data,
+                                      const IOOptions& options,
+                                      IODebugContext* dbg) {
   AlignedBuffer buf;
-  Status status;
   Slice dataToWrite(data);
   offset += prefixLength_;
   if (data.size() > 0) {
@@ -372,71 +383,98 @@ Status EncryptedRandomRWFile::Write(uint64_t offset, const Slice& data) {
     buf.AllocateNewBuffer(data.size());
     memmove(buf.BufferStart(), data.data(), data.size());
     buf.Size(data.size());
+    IOStatus io_s;
     {
       PERF_TIMER_GUARD(encrypt_data_nanos);
-      status = stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize());
+      io_s = status_to_io_status(
+          stream_->Encrypt(offset, buf.BufferStart(), buf.CurrentSize()));
     }
-    if (!status.ok()) {
-      return status;
+    if (!io_s.ok()) {
+      return io_s;
     }
     dataToWrite = Slice(buf.BufferStart(), buf.CurrentSize());
   }
-  status = file_->Write(offset, dataToWrite);
-  return status;
+  return file_->Write(offset, dataToWrite, options, dbg);
 }
 
-  // Read up to `n` bytes starting from offset `offset` and store them in
-  // result, provided `scratch` size should be at least `n`.
-  // Returns Status::OK() on success.
-Status EncryptedRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
-                                   char* scratch) const {
+// Read up to `n` bytes starting from offset `offset` and store them in
+// result, provided `scratch` size should be at least `n`.
+// Returns Status::OK() on success.
+IOStatus EncryptedRandomRWFile::Read(uint64_t offset, size_t n,
+                                     const IOOptions& options, Slice* result,
+                                     char* scratch, IODebugContext* dbg) const {
   assert(scratch);
   offset += prefixLength_;
-  auto status = file_->Read(offset, n, result, scratch);
+  auto status = file_->Read(offset, n, options, result, scratch, dbg);
   if (!status.ok()) {
     return status;
   }
   {
     PERF_TIMER_GUARD(decrypt_data_nanos);
-    status = stream_->Decrypt(offset, (char*)result->data(), result->size());
+    status = status_to_io_status(
+        stream_->Decrypt(offset, (char*)result->data(), result->size()));
   }
   return status;
 }
 
-Status EncryptedRandomRWFile::Flush() { return file_->Flush(); }
+IOStatus EncryptedRandomRWFile::Flush(const IOOptions& options,
+                                      IODebugContext* dbg) {
+  return file_->Flush(options, dbg);
+}
 
-Status EncryptedRandomRWFile::Sync() { return file_->Sync(); }
+IOStatus EncryptedRandomRWFile::Sync(const IOOptions& options,
+                                     IODebugContext* dbg) {
+  return file_->Sync(options, dbg);
+}
 
-Status EncryptedRandomRWFile::Fsync() { return file_->Fsync(); }
+IOStatus EncryptedRandomRWFile::Fsync(const IOOptions& options,
+                                      IODebugContext* dbg) {
+  return file_->Fsync(options, dbg);
+}
 
-Status EncryptedRandomRWFile::Close() { return file_->Close(); }
+IOStatus EncryptedRandomRWFile::Close(const IOOptions& options,
+                                      IODebugContext* dbg) {
+  return file_->Close(options, dbg);
+}
 
-// EncryptedEnv implements an Env wrapper that adds encryption to files stored
-// on disk.
-class EncryptedEnvImpl : public EnvWrapper {
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo> encrypted_fs_type_info =
+    {
+        {"provider",
+         OptionTypeInfo::AsCustomSharedPtr<EncryptionProvider>(
+             0 /* No offset, whole struct*/, OptionVerificationType::kByName,
+             OptionTypeFlags::kNone)},
+};
+// EncryptedFileSystemImpl implements an FileSystemWrapper that adds encryption
+// to files stored on disk.
+class EncryptedFileSystemImpl : public EncryptedFileSystem {
+ public:
+  const char* Name() const override {
+    return EncryptedFileSystem::kClassName();
+  }
   // Returns the raw encryption provider that should be used to write the input
   // encrypted file.  If there is no such provider, NotFound is returned.
-  Status GetWritableProvider(const std::string& /*fname*/,
-                             EncryptionProvider** result) {
+  IOStatus GetWritableProvider(const std::string& /*fname*/,
+                               EncryptionProvider** result) {
     if (provider_) {
       *result = provider_.get();
-      return Status::OK();
+      return IOStatus::OK();
     } else {
       *result = nullptr;
-      return Status::NotFound("No WriteProvider specified");
+      return IOStatus::NotFound("No WriteProvider specified");
     }
   }
 
   // Returns the raw encryption provider that should be used to read the input
   // encrypted file.  If there is no such provider, NotFound is returned.
-  Status GetReadableProvider(const std::string& /*fname*/,
-                             EncryptionProvider** result) {
+  IOStatus GetReadableProvider(const std::string& /*fname*/,
+                               EncryptionProvider** result) {
     if (provider_) {
       *result = provider_.get();
-      return Status::OK();
+      return IOStatus::OK();
     } else {
       *result = nullptr;
-      return Status::NotFound("No Provider specified");
+      return IOStatus::NotFound("No Provider specified");
     }
   }
 
@@ -452,13 +490,13 @@ class EncryptedEnvImpl : public EnvWrapper {
   // should be encrypted
   // @return OK on success, non-OK on failure.
   template <class TypeFile>
-  Status CreateWritableCipherStream(
+  IOStatus CreateWritableCipherStream(
       const std::string& fname, const std::unique_ptr<TypeFile>& underlying,
-      const EnvOptions& options, size_t* prefix_length,
-      std::unique_ptr<BlockAccessCipherStream>* stream) {
+      const FileOptions& options, size_t* prefix_length,
+      std::unique_ptr<BlockAccessCipherStream>* stream, IODebugContext* dbg) {
     EncryptionProvider* provider = nullptr;
     *prefix_length = 0;
-    Status status = GetWritableProvider(fname, &provider);
+    IOStatus status = GetWritableProvider(fname, &provider);
     if (!status.ok()) {
       return status;
     } else if (provider != nullptr) {
@@ -470,34 +508,36 @@ class EncryptedEnvImpl : public EnvWrapper {
         // Initialize prefix
         buffer.Alignment(underlying->GetRequiredBufferAlignment());
         buffer.AllocateNewBuffer(*prefix_length);
-        status = provider->CreateNewPrefix(fname, buffer.BufferStart(),
-                                           *prefix_length);
+        status = status_to_io_status(provider->CreateNewPrefix(
+            fname, buffer.BufferStart(), *prefix_length));
         if (status.ok()) {
           buffer.Size(*prefix_length);
           prefix = Slice(buffer.BufferStart(), buffer.CurrentSize());
           // Write prefix
-          status = underlying->Append(prefix);
+          status = underlying->Append(prefix, options.io_options, dbg);
         }
         if (!status.ok()) {
           return status;
         }
       }
       // Create cipher stream
-      status = provider->CreateCipherStream(fname, options, prefix, stream);
+      status = status_to_io_status(
+          provider->CreateCipherStream(fname, options, prefix, stream));
     }
     return status;
   }
 
   template <class TypeFile>
-  Status CreateWritableEncryptedFile(const std::string& fname,
-                                     std::unique_ptr<TypeFile>& underlying,
-                                     const EnvOptions& options,
-                                     std::unique_ptr<TypeFile>* result) {
+  IOStatus CreateWritableEncryptedFile(const std::string& fname,
+                                       std::unique_ptr<TypeFile>& underlying,
+                                       const FileOptions& options,
+                                       std::unique_ptr<TypeFile>* result,
+                                       IODebugContext* dbg) {
     // Create cipher stream
     std::unique_ptr<BlockAccessCipherStream> stream;
     size_t prefix_length;
-    Status status = CreateWritableCipherStream(fname, underlying, options,
-                                               &prefix_length, &stream);
+    IOStatus status = CreateWritableCipherStream(fname, underlying, options,
+                                                 &prefix_length, &stream, dbg);
     if (status.ok()) {
       if (stream) {
         result->reset(new EncryptedWritableFile(
@@ -521,15 +561,15 @@ class EncryptedEnvImpl : public EnvWrapper {
   // should be encrypted
   // @return OK on success, non-OK on failure.
   template <class TypeFile>
-  Status CreateRandomWriteCipherStream(
+  IOStatus CreateRandomWriteCipherStream(
       const std::string& fname, const std::unique_ptr<TypeFile>& underlying,
-      const EnvOptions& options, size_t* prefix_length,
-      std::unique_ptr<BlockAccessCipherStream>* stream) {
+      const FileOptions& options, size_t* prefix_length,
+      std::unique_ptr<BlockAccessCipherStream>* stream, IODebugContext* dbg) {
     EncryptionProvider* provider = nullptr;
     *prefix_length = 0;
-    Status status = GetWritableProvider(fname, &provider);
-    if (!status.ok()) {
-      return status;
+    IOStatus io_s = GetWritableProvider(fname, &provider);
+    if (!io_s.ok()) {
+      return io_s;
     } else if (provider != nullptr) {
       // Initialize & write prefix (if needed)
       AlignedBuffer buffer;
@@ -539,22 +579,23 @@ class EncryptedEnvImpl : public EnvWrapper {
         // Initialize prefix
         buffer.Alignment(underlying->GetRequiredBufferAlignment());
         buffer.AllocateNewBuffer(*prefix_length);
-        status = provider->CreateNewPrefix(fname, buffer.BufferStart(),
-                                           *prefix_length);
-        if (status.ok()) {
+        io_s = status_to_io_status(provider->CreateNewPrefix(
+            fname, buffer.BufferStart(), *prefix_length));
+        if (io_s.ok()) {
           buffer.Size(*prefix_length);
           prefix = Slice(buffer.BufferStart(), buffer.CurrentSize());
           // Write prefix
-          status = underlying->Write(0, prefix);
+          io_s = underlying->Write(0, prefix, options.io_options, dbg);
         }
-        if (!status.ok()) {
-          return status;
+        if (!io_s.ok()) {
+          return io_s;
         }
       }
       // Create cipher stream
-      status = provider->CreateCipherStream(fname, options, prefix, stream);
+      io_s = status_to_io_status(
+          provider->CreateCipherStream(fname, options, prefix, stream));
     }
-    return status;
+    return io_s;
   }
 
   // Creates a CipherStream for the underlying file/name using the options
@@ -569,10 +610,10 @@ class EncryptedEnvImpl : public EnvWrapper {
   // is encrypted
   // @return OK on success, non-OK on failure.
   template <class TypeFile>
-  Status CreateSequentialCipherStream(
+  IOStatus CreateSequentialCipherStream(
       const std::string& fname, const std::unique_ptr<TypeFile>& underlying,
-      const EnvOptions& options, size_t* prefix_length,
-      std::unique_ptr<BlockAccessCipherStream>* stream) {
+      const FileOptions& options, size_t* prefix_length,
+      std::unique_ptr<BlockAccessCipherStream>* stream, IODebugContext* dbg) {
     // Read prefix (if needed)
     AlignedBuffer buffer;
     Slice prefix;
@@ -581,14 +622,15 @@ class EncryptedEnvImpl : public EnvWrapper {
       // Read prefix
       buffer.Alignment(underlying->GetRequiredBufferAlignment());
       buffer.AllocateNewBuffer(*prefix_length);
-      Status status =
-          underlying->Read(*prefix_length, &prefix, buffer.BufferStart());
+      IOStatus status = underlying->Read(*prefix_length, options.io_options,
+                                         &prefix, buffer.BufferStart(), dbg);
       if (!status.ok()) {
         return status;
       }
       buffer.Size(*prefix_length);
     }
-    return provider_->CreateCipherStream(fname, options, prefix, stream);
+    return status_to_io_status(
+        provider_->CreateCipherStream(fname, options, prefix, stream));
   }
 
   // Creates a CipherStream for the underlying file/name using the options
@@ -603,10 +645,10 @@ class EncryptedEnvImpl : public EnvWrapper {
   // is encrypted
   // @return OK on success, non-OK on failure.
   template <class TypeFile>
-  Status CreateRandomReadCipherStream(
+  IOStatus CreateRandomReadCipherStream(
       const std::string& fname, const std::unique_ptr<TypeFile>& underlying,
-      const EnvOptions& options, size_t* prefix_length,
-      std::unique_ptr<BlockAccessCipherStream>* stream) {
+      const FileOptions& options, size_t* prefix_length,
+      std::unique_ptr<BlockAccessCipherStream>* stream, IODebugContext* dbg) {
     // Read prefix (if needed)
     AlignedBuffer buffer;
     Slice prefix;
@@ -615,42 +657,61 @@ class EncryptedEnvImpl : public EnvWrapper {
       // Read prefix
       buffer.Alignment(underlying->GetRequiredBufferAlignment());
       buffer.AllocateNewBuffer(*prefix_length);
-      Status status =
-          underlying->Read(0, *prefix_length, &prefix, buffer.BufferStart());
+      IOStatus status = underlying->Read(0, *prefix_length, options.io_options,
+                                         &prefix, buffer.BufferStart(), dbg);
       if (!status.ok()) {
         return status;
       }
       buffer.Size(*prefix_length);
     }
-    return provider_->CreateCipherStream(fname, options, prefix, stream);
+    return status_to_io_status(
+        provider_->CreateCipherStream(fname, options, prefix, stream));
   }
 
  public:
-  EncryptedEnvImpl(Env* base_env,
-                   const std::shared_ptr<EncryptionProvider>& provider)
-      : EnvWrapper(base_env) {
+  EncryptedFileSystemImpl(const std::shared_ptr<FileSystem>& base,
+                          const std::shared_ptr<EncryptionProvider>& provider)
+      : EncryptedFileSystem(base) {
     provider_ = provider;
+    RegisterOptions("EncryptionProvider", &provider_, &encrypted_fs_type_info);
+  }
+
+  Status AddCipher(const std::string& descriptor, const char* cipher,
+                   size_t len, bool for_write) override {
+    return provider_->AddCipher(descriptor, cipher, len, for_write);
   }
 
   // NewSequentialFile opens a file for sequential reading.
-  virtual Status NewSequentialFile(const std::string& fname,
-                                   std::unique_ptr<SequentialFile>* result,
-                                   const EnvOptions& options) override {
+  IOStatus NewSequentialFile(const std::string& fname,
+                             const FileOptions& options,
+                             std::unique_ptr<FSSequentialFile>* result,
+                             IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_reads) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Open file using underlying Env implementation
-    std::unique_ptr<SequentialFile> underlying;
-    auto status = EnvWrapper::NewSequentialFile(fname, &underlying, options);
+    std::unique_ptr<FSSequentialFile> underlying;
+    auto status =
+        FileSystemWrapper::NewSequentialFile(fname, options, &underlying, dbg);
+    if (!status.ok()) {
+      return status;
+    }
+    uint64_t file_size;
+    status = FileSystemWrapper::GetFileSize(fname, options.io_options,
+                                            &file_size, dbg);
     if (!status.ok()) {
       return status;
     }
+    if (!file_size) {
+      *result = std::move(underlying);
+      return status;
+    }
     // Create cipher stream
     std::unique_ptr<BlockAccessCipherStream> stream;
     size_t prefix_length;
     status = CreateSequentialCipherStream(fname, underlying, options,
-                                          &prefix_length, &stream);
+                                          &prefix_length, &stream, dbg);
     if (status.ok()) {
       result->reset(new EncryptedSequentialFile(
           std::move(underlying), std::move(stream), prefix_length));
@@ -659,23 +720,25 @@ class EncryptedEnvImpl : public EnvWrapper {
   }
 
   // NewRandomAccessFile opens a file for random read access.
-  virtual Status NewRandomAccessFile(const std::string& fname,
-                                     std::unique_ptr<RandomAccessFile>* result,
-                                     const EnvOptions& options) override {
+  IOStatus NewRandomAccessFile(const std::string& fname,
+                               const FileOptions& options,
+                               std::unique_ptr<FSRandomAccessFile>* result,
+                               IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_reads) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Open file using underlying Env implementation
-    std::unique_ptr<RandomAccessFile> underlying;
-    auto status = EnvWrapper::NewRandomAccessFile(fname, &underlying, options);
+    std::unique_ptr<FSRandomAccessFile> underlying;
+    auto status = FileSystemWrapper::NewRandomAccessFile(fname, options,
+                                                         &underlying, dbg);
     if (!status.ok()) {
       return status;
     }
     std::unique_ptr<BlockAccessCipherStream> stream;
     size_t prefix_length;
     status = CreateRandomReadCipherStream(fname, underlying, options,
-                                          &prefix_length, &stream);
+                                          &prefix_length, &stream, dbg);
     if (status.ok()) {
       if (stream) {
         result->reset(new EncryptedRandomAccessFile(
@@ -688,20 +751,21 @@ class EncryptedEnvImpl : public EnvWrapper {
   }
 
   // NewWritableFile opens a file for sequential writing.
-  virtual Status NewWritableFile(const std::string& fname,
-                                 std::unique_ptr<WritableFile>* result,
-                                 const EnvOptions& options) override {
+  IOStatus NewWritableFile(const std::string& fname, const FileOptions& options,
+                           std::unique_ptr<FSWritableFile>* result,
+                           IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_writes) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Open file using underlying Env implementation
-    std::unique_ptr<WritableFile> underlying;
-    Status status = EnvWrapper::NewWritableFile(fname, &underlying, options);
+    std::unique_ptr<FSWritableFile> underlying;
+    IOStatus status =
+        FileSystemWrapper::NewWritableFile(fname, options, &underlying, dbg);
     if (!status.ok()) {
       return status;
     }
-    return CreateWritableEncryptedFile(fname, underlying, options, result);
+    return CreateWritableEncryptedFile(fname, underlying, options, result, dbg);
   }
 
   // Create an object that writes to a new file with the specified
@@ -711,39 +775,42 @@ class EncryptedEnvImpl : public EnvWrapper {
   // returns non-OK.
   //
   // The returned file will only be accessed by one thread at a time.
-  virtual Status ReopenWritableFile(const std::string& fname,
-                                    std::unique_ptr<WritableFile>* result,
-                                    const EnvOptions& options) override {
+  IOStatus ReopenWritableFile(const std::string& fname,
+                              const FileOptions& options,
+                              std::unique_ptr<FSWritableFile>* result,
+                              IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_writes) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Open file using underlying Env implementation
-    std::unique_ptr<WritableFile> underlying;
-    Status status = EnvWrapper::ReopenWritableFile(fname, &underlying, options);
+    std::unique_ptr<FSWritableFile> underlying;
+    IOStatus status =
+        FileSystemWrapper::ReopenWritableFile(fname, options, &underlying, dbg);
     if (!status.ok()) {
       return status;
     }
-    return CreateWritableEncryptedFile(fname, underlying, options, result);
+    return CreateWritableEncryptedFile(fname, underlying, options, result, dbg);
   }
 
   // Reuse an existing file by renaming it and opening it as writable.
-  virtual Status ReuseWritableFile(const std::string& fname,
-                                   const std::string& old_fname,
-                                   std::unique_ptr<WritableFile>* result,
-                                   const EnvOptions& options) override {
+  IOStatus ReuseWritableFile(const std::string& fname,
+                             const std::string& old_fname,
+                             const FileOptions& options,
+                             std::unique_ptr<FSWritableFile>* result,
+                             IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_writes) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Open file using underlying Env implementation
-    std::unique_ptr<WritableFile> underlying;
-    Status status =
-        EnvWrapper::ReuseWritableFile(fname, old_fname, &underlying, options);
+    std::unique_ptr<FSWritableFile> underlying;
+    auto status = FileSystemWrapper::ReuseWritableFile(
+        fname, old_fname, options, &underlying, dbg);
     if (!status.ok()) {
       return status;
     }
-    return CreateWritableEncryptedFile(fname, underlying, options, result);
+    return CreateWritableEncryptedFile(fname, underlying, options, result, dbg);
   }
 
   // Open `fname` for random read and write, if file doesn't exist the file
@@ -751,19 +818,20 @@ class EncryptedEnvImpl : public EnvWrapper {
   // *result and returns OK.  On failure returns non-OK.
   //
   // The returned file will only be accessed by one thread at a time.
-  virtual Status NewRandomRWFile(const std::string& fname,
-                                 std::unique_ptr<RandomRWFile>* result,
-                                 const EnvOptions& options) override {
+  IOStatus NewRandomRWFile(const std::string& fname, const FileOptions& options,
+                           std::unique_ptr<FSRandomRWFile>* result,
+                           IODebugContext* dbg) override {
     result->reset();
     if (options.use_mmap_reads || options.use_mmap_writes) {
-      return Status::InvalidArgument();
+      return IOStatus::InvalidArgument();
     }
     // Check file exists
-    bool isNewFile = !FileExists(fname).ok();
+    bool isNewFile = !FileExists(fname, options.io_options, dbg).ok();
 
     // Open file using underlying Env implementation
-    std::unique_ptr<RandomRWFile> underlying;
-    Status status = EnvWrapper::NewRandomRWFile(fname, &underlying, options);
+    std::unique_ptr<FSRandomRWFile> underlying;
+    auto status =
+        FileSystemWrapper::NewRandomRWFile(fname, options, &underlying, dbg);
     if (!status.ok()) {
       return status;
     }
@@ -773,10 +841,10 @@ class EncryptedEnvImpl : public EnvWrapper {
     if (!isNewFile) {
       // File already exists, read prefix
       status = CreateRandomReadCipherStream(fname, underlying, options,
-                                            &prefix_length, &stream);
+                                            &prefix_length, &stream, dbg);
     } else {
       status = CreateRandomWriteCipherStream(fname, underlying, options,
-                                             &prefix_length, &stream);
+                                             &prefix_length, &stream, dbg);
     }
     if (status.ok()) {
       if (stream) {
@@ -803,9 +871,12 @@ class EncryptedEnvImpl : public EnvWrapper {
   //         have
   //                  permission to access "dir", or if "dir" is invalid.
   //         IOError if an IO Error was encountered
-  virtual Status GetChildrenFileAttributes(
-      const std::string& dir, std::vector<FileAttributes>* result) override {
-    auto status = EnvWrapper::GetChildrenFileAttributes(dir, result);
+  IOStatus GetChildrenFileAttributes(const std::string& dir,
+                                     const IOOptions& options,
+                                     std::vector<FileAttributes>* result,
+                                     IODebugContext* dbg) override {
+    auto status =
+        FileSystemWrapper::GetChildrenFileAttributes(dir, options, result, dbg);
     if (!status.ok()) {
       return status;
     }
@@ -823,14 +894,15 @@ class EncryptedEnvImpl : public EnvWrapper {
         it->size_bytes -= provider->GetPrefixLength();
       }
     }
-    return Status::OK();
+    return IOStatus::OK();
   }
 
   // Store the size of fname in *file_size.
-  virtual Status GetFileSize(const std::string& fname,
-                             uint64_t* file_size) override {
-    auto status = EnvWrapper::GetFileSize(fname, file_size);
-    if (!status.ok()) {
+  IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
+                       uint64_t* file_size, IODebugContext* dbg) override {
+    auto status =
+        FileSystemWrapper::GetFileSize(fname, options, file_size, dbg);
+    if (!status.ok() || !(*file_size)) {
       return status;
     }
     EncryptionProvider* provider;
@@ -846,17 +918,43 @@ class EncryptedEnvImpl : public EnvWrapper {
  private:
   std::shared_ptr<EncryptionProvider> provider_;
 };
+}  // namespace
+
+Status NewEncryptedFileSystemImpl(
+    const std::shared_ptr<FileSystem>& base,
+    const std::shared_ptr<EncryptionProvider>& provider,
+    std::unique_ptr<FileSystem>* result) {
+  result->reset(new EncryptedFileSystemImpl(base, provider));
+  return Status::OK();
+}
 
+std::shared_ptr<FileSystem> NewEncryptedFS(
+    const std::shared_ptr<FileSystem>& base,
+    const std::shared_ptr<EncryptionProvider>& provider) {
+  std::unique_ptr<FileSystem> efs;
+  Status s = NewEncryptedFileSystemImpl(base, provider, &efs);
+  if (s.ok()) {
+    s = efs->PrepareOptions(ConfigOptions());
+  }
+  if (s.ok()) {
+    std::shared_ptr<FileSystem> result(efs.release());
+    return result;
+  } else {
+    return nullptr;
+  }
+}
 // Returns an Env that encrypts data when stored on disk and decrypts data when
 // read from disk.
 Env* NewEncryptedEnv(Env* base_env,
                      const std::shared_ptr<EncryptionProvider>& provider) {
-  return new EncryptedEnvImpl(base_env, provider);
+  return new CompositeEnvWrapper(
+      base_env, NewEncryptedFS(base_env->GetFileSystem(), provider));
 }
 
 // Encrypt one or more (partial) blocks of data at the file offset.
 // Length of data is given in dataSize.
-Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char *data, size_t dataSize) {
+Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char* data,
+                                        size_t dataSize) {
   // Calculate block index
   auto blockSize = BlockSize();
   uint64_t blockIndex = fileOffset / blockSize;
@@ -868,7 +966,7 @@ Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char *data, size_t
 
   // Encrypt individual blocks.
   while (1) {
-    char *block = data;
+    charblock = data;
     size_t n = std::min(dataSize, blockSize - blockOffset);
     if (n != blockSize) {
       // We're not encrypting a full block.
@@ -901,7 +999,8 @@ Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char *data, size_t
 
 // Decrypt one or more (partial) blocks of data at the file offset.
 // Length of data is given in dataSize.
-Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char *data, size_t dataSize) {
+Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char* data,
+                                        size_t dataSize) {
   // Calculate block index
   auto blockSize = BlockSize();
   uint64_t blockIndex = fileOffset / blockSize;
@@ -913,7 +1012,7 @@ Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char *data, size_t
 
   // Decrypt individual blocks.
   while (1) {
-    char *block = data;
+    charblock = data;
     size_t n = std::min(dataSize, blockSize - blockOffset);
     if (n != blockSize) {
       // We're not decrypting a full block.
@@ -952,20 +1051,53 @@ Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char *data, size_t
   }
 }
 
-const char* ROT13BlockCipher::Name() const { return kROT13CipherName; }
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo>
+    rot13_block_cipher_type_info = {
+        {"block_size",
+         {0 /* No offset, whole struct*/, OptionType::kInt,
+          OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
+};
+// Implements a BlockCipher using ROT13.
+//
+// Note: This is a sample implementation of BlockCipher,
+// it is NOT considered safe and should NOT be used in production.
+class ROT13BlockCipher : public BlockCipher {
+ private:
+  size_t blockSize_;
 
-// Encrypt a block of data.
-// Length of data is equal to BlockSize().
-Status ROT13BlockCipher::Encrypt(char* data) {
-  for (size_t i = 0; i < blockSize_; ++i) {
-    data[i] += 13;
+ public:
+  explicit ROT13BlockCipher(size_t blockSize) : blockSize_(blockSize) {
+    RegisterOptions("ROT13BlockCipherOptions", &blockSize_,
+                    &rot13_block_cipher_type_info);
   }
-  return Status::OK();
-}
 
-// Decrypt a block of data.
-// Length of data is equal to BlockSize().
-Status ROT13BlockCipher::Decrypt(char* data) { return Encrypt(data); }
+  static const char* kClassName() { return "ROT13"; }
+  const char* Name() const override { return kClassName(); }
+  // BlockSize returns the size of each block supported by this cipher stream.
+  size_t BlockSize() override { return blockSize_; }
+
+  // Encrypt a block of data.
+  // Length of data is equal to BlockSize().
+  Status Encrypt(char* data) override {
+    for (size_t i = 0; i < blockSize_; ++i) {
+      data[i] += 13;
+    }
+    return Status::OK();
+  }
+
+  // Decrypt a block of data.
+  // Length of data is equal to BlockSize().
+  Status Decrypt(char* data) override { return Encrypt(data); }
+};
+static const std::unordered_map<std::string, OptionTypeInfo>
+    ctr_encryption_provider_type_info = {
+        {"cipher",
+         OptionTypeInfo::AsCustomSharedPtr<BlockCipher>(
+             0 /* No offset, whole struct*/, OptionVerificationType::kByName,
+             OptionTypeFlags::kNone)},
+};
+}  // anonymous namespace
 
 // Allocate scratch space which is passed to EncryptBlock/DecryptBlock.
 void CTRCipherStream::AllocateScratch(std::string& scratch) {
@@ -1003,7 +1135,20 @@ Status CTRCipherStream::DecryptBlock(uint64_t blockIndex, char* data,
   return EncryptBlock(blockIndex, data, scratch);
 }
 
-const char* CTREncryptionProvider::Name() const { return kCTRProviderName; }
+CTREncryptionProvider::CTREncryptionProvider(
+    const std::shared_ptr<BlockCipher>& c)
+    : cipher_(c) {
+  RegisterOptions("Cipher", &cipher_, &ctr_encryption_provider_type_info);
+}
+
+bool CTREncryptionProvider::IsInstanceOf(const std::string& name) const {
+  // Special case for test purposes.
+  if (name == "1://test" && cipher_ != nullptr) {
+    return cipher_->IsInstanceOf(ROT13BlockCipher::kClassName());
+  } else {
+    return EncryptionProvider::IsInstanceOf(name);
+  }
+}
 
 // GetPrefixLength returns the length of the prefix that is added to every file
 // and used for storing encryption options.
@@ -1013,20 +1158,12 @@ size_t CTREncryptionProvider::GetPrefixLength() const {
   return defaultPrefixLength;
 }
 
-Status CTREncryptionProvider::TEST_Initialize() {
-  if (!cipher_) {
-    return BlockCipher::CreateFromString(
-        ConfigOptions(), std::string(kROT13CipherName) + ":32", &cipher_);
-  }
-  return Status::OK();
-}
-
 Status CTREncryptionProvider::AddCipher(const std::string& /*descriptor*/,
                                         const char* cipher, size_t len,
                                         bool /*for_write*/) {
   if (cipher_) {
     return Status::NotSupported("Cannot add keys to CTREncryptionProvider");
-  } else if (strcmp(kROT13CipherName, cipher) == 0) {
+  } else if (strcmp(ROT13BlockCipher::kClassName(), cipher) == 0) {
     cipher_.reset(new ROT13BlockCipher(len));
     return Status::OK();
   } else {
@@ -1054,7 +1191,7 @@ Status CTREncryptionProvider::CreateNewPrefix(const std::string& /*fname*/,
     return Status::InvalidArgument("Encryption Cipher is missing");
   }
   // Create & seed rnd.
-  Random rnd((uint32_t)Env::Default()->NowMicros());
+  Random rnd((uint32_t)SystemClock::Default()->NowMicros());
   // Fill entire prefix block with random values.
   for (size_t i = 0; i < prefixLength; i++) {
     prefix[i] = rnd.Uniform(256) & 0xFF;
@@ -1143,6 +1280,72 @@ Status CTREncryptionProvider::CreateCipherStreamFromPrefix(
   return Status::OK();
 }
 
-#endif // ROCKSDB_LITE
+namespace {
+static void RegisterEncryptionBuiltins() {
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    auto lib = ObjectRegistry::Default()->AddLibrary("encryption");
+    // Match "CTR" or "CTR://test"
+    lib->AddFactory<EncryptionProvider>(
+        ObjectLibrary::PatternEntry(CTREncryptionProvider::kClassName(), true)
+            .AddSuffix("://test"),
+        [](const std::string& uri, std::unique_ptr<EncryptionProvider>* guard,
+           std::string* /*errmsg*/) {
+          if (EndsWith(uri, "://test")) {
+            std::shared_ptr<BlockCipher> cipher =
+                std::make_shared<ROT13BlockCipher>(32);
+            guard->reset(new CTREncryptionProvider(cipher));
+          } else {
+            guard->reset(new CTREncryptionProvider());
+          }
+          return guard->get();
+        });
+
+    lib->AddFactory<EncryptionProvider>(
+        "1://test", [](const std::string& /*uri*/,
+                       std::unique_ptr<EncryptionProvider>* guard,
+                       std::string* /*errmsg*/) {
+          std::shared_ptr<BlockCipher> cipher =
+              std::make_shared<ROT13BlockCipher>(32);
+          guard->reset(new CTREncryptionProvider(cipher));
+          return guard->get();
+        });
+
+    // Match "ROT13" or "ROT13:[0-9]+"
+    lib->AddFactory<BlockCipher>(
+        ObjectLibrary::PatternEntry(ROT13BlockCipher::kClassName(), true)
+            .AddNumber(":"),
+        [](const std::string& uri, std::unique_ptr<BlockCipher>* guard,
+           std::string* /* errmsg */) {
+          size_t colon = uri.find(':');
+          if (colon != std::string::npos) {
+            size_t block_size = ParseSizeT(uri.substr(colon + 1));
+            guard->reset(new ROT13BlockCipher(block_size));
+          } else {
+            guard->reset(new ROT13BlockCipher(32));
+          }
+
+          return guard->get();
+        });
+  });
+}
+}  // namespace
+
+Status BlockCipher::CreateFromString(const ConfigOptions& config_options,
+                                     const std::string& value,
+                                     std::shared_ptr<BlockCipher>* result) {
+  RegisterEncryptionBuiltins();
+  return LoadSharedObject<BlockCipher>(config_options, value, nullptr, result);
+}
+
+Status EncryptionProvider::CreateFromString(
+    const ConfigOptions& config_options, const std::string& value,
+    std::shared_ptr<EncryptionProvider>* result) {
+  RegisterEncryptionBuiltins();
+  return LoadSharedObject<EncryptionProvider>(config_options, value, nullptr,
+                                              result);
+}
+
+#endif  // ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE