import quincy beta 17.1.0
ceph/src/rocksdb/table/block_based/block_based_table_builder.cc
index 2003008fe12dcdb003ca1ae58bfa5b4a96770697..8ab775c1d31124ef05c3aa25d7c83322096264a7 100644
@@ -11,6 +11,7 @@
 
 #include <assert.h>
 #include <stdio.h>
+#include <atomic>
 #include <list>
 #include <map>
 #include <memory>
@@ -46,6 +47,7 @@
 #include "util/crc32c.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
+#include "util/work_queue.h"
 #include "util/xxhash.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -53,7 +55,6 @@ namespace ROCKSDB_NAMESPACE {
 extern const std::string kHashIndexPrefixesBlock;
 extern const std::string kHashIndexPrefixesMetadataBlock;
 
-typedef BlockBasedTableOptions::IndexType IndexType;
 
 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
 namespace {
@@ -102,48 +103,6 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
   return compressed_size < raw_size - (raw_size / 8u);
 }
 
-bool CompressBlockInternal(const Slice& raw,
-                           const CompressionInfo& compression_info,
-                           uint32_t format_version,
-                           std::string* compressed_output) {
-  // Will return compressed block contents if (1) the compression method is
-  // supported in this platform and (2) the compression rate is "good enough".
-  switch (compression_info.type()) {
-    case kSnappyCompression:
-      return Snappy_Compress(compression_info, raw.data(), raw.size(),
-                             compressed_output);
-    case kZlibCompression:
-      return Zlib_Compress(
-          compression_info,
-          GetCompressFormatForVersion(kZlibCompression, format_version),
-          raw.data(), raw.size(), compressed_output);
-    case kBZip2Compression:
-      return BZip2_Compress(
-          compression_info,
-          GetCompressFormatForVersion(kBZip2Compression, format_version),
-          raw.data(), raw.size(), compressed_output);
-    case kLZ4Compression:
-      return LZ4_Compress(
-          compression_info,
-          GetCompressFormatForVersion(kLZ4Compression, format_version),
-          raw.data(), raw.size(), compressed_output);
-    case kLZ4HCCompression:
-      return LZ4HC_Compress(
-          compression_info,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
-          raw.data(), raw.size(), compressed_output);
-    case kXpressCompression:
-      return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
-    case kZSTD:
-    case kZSTDNotFinalCompression:
-      return ZSTD_Compress(compression_info, raw.data(), raw.size(),
-                           compressed_output);
-    default:
-      // Do not recognize this compression type
-      return false;
-  }
-}
-
 }  // namespace
 
 // format_version is the block format as defined in include/rocksdb/table.h
@@ -152,11 +111,9 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                     bool do_sample, std::string* compressed_output,
                     std::string* sampled_output_fast,
                     std::string* sampled_output_slow) {
-  *type = info.type();
-
-  if (info.type() == kNoCompression && !info.SampleForCompression()) {
-    return raw;
-  }
+  assert(type);
+  assert(compressed_output);
+  assert(compressed_output->empty());
 
   // If requested, we sample one in every N block with a
   // fast and slow compression algorithm and report the stats.
@@ -164,10 +121,10 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
   // enabling compression and they also get a hint about which
   // compression algorithm will be beneficial.
   if (do_sample && info.SampleForCompression() &&
-      Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
-      sampled_output_fast && sampled_output_slow) {
+      Random::GetTLSInstance()->OneIn(
+          static_cast<int>(info.SampleForCompression()))) {
     // Sampling with a fast compression algorithm
-    if (LZ4_Supported() || Snappy_Supported()) {
+    if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
       CompressionType c =
           LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
       CompressionContext context(c);
@@ -176,33 +133,46 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                                CompressionDict::GetEmptyDict(), c,
                                info.SampleForCompression());
 
-      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
+      CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+                   sampled_output_fast);
     }
 
     // Sampling with a slow but high-compression algorithm
-    if (ZSTD_Supported() || Zlib_Supported()) {
+    if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
       CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
       CompressionContext context(c);
       CompressionOptions options;
       CompressionInfo info_tmp(options, context,
                                CompressionDict::GetEmptyDict(), c,
                                info.SampleForCompression());
-      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
+
+      CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+                   sampled_output_slow);
     }
   }
 
-  // Actually compress the data
-  if (*type != kNoCompression) {
-    if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
-        GoodCompressionRatio(compressed_output->size(), raw.size())) {
-      return *compressed_output;
-    }
+  if (info.type() == kNoCompression) {
+    *type = kNoCompression;
+    return raw;
   }
 
-  // Compression method is not supported, or not good
-  // compression ratio, so just fall back to uncompressed form.
-  *type = kNoCompression;
-  return raw;
+  // Actually compress the data; if the compression method is not supported,
+  // or the compression fails etc., just fall back to uncompressed
+  if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
+                    compressed_output)) {
+    *type = kNoCompression;
+    return raw;
+  }
+
+  // Check the compression ratio; if it's not good enough, just fall back to
+  // uncompressed
+  if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
+    *type = kNoCompression;
+    return raw;
+  }
+
+  *type = info.type();
+  return *compressed_output;
 }
 
 // kBlockBasedTableMagicNumber was picked by running
@@ -281,8 +251,7 @@ struct BlockBasedTableBuilder::Rep {
   const BlockBasedTableOptions table_options;
   const InternalKeyComparator& internal_comparator;
   WritableFileWriter* file;
-  uint64_t offset = 0;
-  Status status;
+  std::atomic<uint64_t> offset;
   size_t alignment;
   BlockBuilder data_block;
   // Buffers uncompressed data blocks and keys to replay later. Needed when
@@ -299,12 +268,13 @@ struct BlockBasedTableBuilder::Rep {
   PartitionedIndexBuilder* p_index_builder_ = nullptr;
 
   std::string last_key;
+  const Slice* first_key_in_next_block = nullptr;
   CompressionType compression_type;
   uint64_t sample_for_compression;
   CompressionOptions compression_opts;
   std::unique_ptr<CompressionDict> compression_dict;
-  CompressionContext compression_ctx;
-  std::unique_ptr<UncompressionContext> verify_ctx;
+  std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
+  std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
   std::unique_ptr<UncompressionDict> verify_dict;
 
   size_t data_begin_offset = 0;
@@ -353,8 +323,78 @@ struct BlockBasedTableBuilder::Rep {
   const uint64_t target_file_size;
   uint64_t file_creation_time = 0;
 
+  // DB IDs
+  const std::string db_id;
+  const std::string db_session_id;
+  std::string db_host_id;
+
   std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
 
+  std::unique_ptr<ParallelCompressionRep> pc_rep;
+
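+  // The write offset is advanced by whichever thread calls WriteRawBlock
+  // (the foreground thread or, with parallel compression, the write thread)
+  // and read concurrently by the foreground thread, hence the relaxed
+  // atomic accessors below.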
+  uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
+  void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
+
+  bool IsParallelCompressionEnabled() const {
+    return compression_opts.parallel_threads > 1;
+  }
+
+  Status GetStatus() {
+    // We need to make modifications of status visible when status_ok is set
+    // to false, and this is ensured by status_mutex, so no special memory
+    // order for status_ok is required.
+    if (status_ok.load(std::memory_order_relaxed)) {
+      return Status::OK();
+    } else {
+      return CopyStatus();
+    }
+  }
+
+  Status CopyStatus() {
+    std::lock_guard<std::mutex> lock(status_mutex);
+    return status;
+  }
+
+  IOStatus GetIOStatus() {
+    // We need to make modifications of io_status visible when status_ok is set
+    // to false, and this is ensured by io_status_mutex, so no special memory
+    // order for io_status_ok is required.
+    if (io_status_ok.load(std::memory_order_relaxed)) {
+      return IOStatus::OK();
+    } else {
+      return CopyIOStatus();
+    }
+  }
+
+  IOStatus CopyIOStatus() {
+    std::lock_guard<std::mutex> lock(io_status_mutex);
+    return io_status;
+  }
+
+  // Never erase an existing status that is not OK.
+  void SetStatus(Status s) {
+    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill when compression_opts.parallel_threads == 1,
+      // but since it's unlikely that s is not OK, we accept this cost for
+      // the sake of simplicity.
+      std::lock_guard<std::mutex> lock(status_mutex);
+      status = s;
+      status_ok.store(false, std::memory_order_relaxed);
+    }
+  }
+
+  // Never erase an existing I/O status that is not OK.
+  void SetIOStatus(IOStatus ios) {
+    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill when compression_opts.parallel_threads == 1,
+      // but since it's unlikely that ios is not OK, we accept this cost for
+      // the sake of simplicity.
+      std::lock_guard<std::mutex> lock(io_status_mutex);
+      io_status = ios;
+      io_status_ok.store(false, std::memory_order_relaxed);
+    }
+  }
+
   Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
       const BlockBasedTableOptions& table_opt,
       const InternalKeyComparator& icomparator,
@@ -366,12 +406,14 @@ struct BlockBasedTableBuilder::Rep {
       const CompressionOptions& _compression_opts, const bool skip_filters,
       const int _level_at_creation, const std::string& _column_family_name,
       const uint64_t _creation_time, const uint64_t _oldest_key_time,
-      const uint64_t _target_file_size, const uint64_t _file_creation_time)
+      const uint64_t _target_file_size, const uint64_t _file_creation_time,
+      const std::string& _db_id, const std::string& _db_session_id)
       : ioptions(_ioptions),
         moptions(_moptions),
         table_options(table_opt),
         internal_comparator(icomparator),
         file(f),
+        offset(0),
         alignment(table_options.block_align
                       ? std::min(table_options.block_size, kDefaultPageSize)
                       : 0),
@@ -389,7 +431,8 @@ struct BlockBasedTableBuilder::Rep {
         sample_for_compression(_sample_for_compression),
         compression_opts(_compression_opts),
         compression_dict(),
-        compression_ctx(_compression_type),
+        compression_ctxs(_compression_opts.parallel_threads),
+        verify_ctxs(_compression_opts.parallel_threads),
         verify_dict(),
         state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                      : State::kUnbuffered),
@@ -405,7 +448,15 @@ struct BlockBasedTableBuilder::Rep {
         creation_time(_creation_time),
         oldest_key_time(_oldest_key_time),
         target_file_size(_target_file_size),
-        file_creation_time(_file_creation_time) {
+        file_creation_time(_file_creation_time),
+        db_id(_db_id),
+        db_session_id(_db_session_id),
+        db_host_id(ioptions.db_host_id),
+        status_ok(true),
+        io_status_ok(true) {
+    for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+      compression_ctxs[i].reset(new CompressionContext(compression_type));
+    }
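+    // One compression context per configured parallel thread; worker threads
+    // each use their own context, while the single-threaded WriteBlock()
+    // path uses compression_ctxs[0].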
     if (table_options.index_type ==
         BlockBasedTableOptions::kTwoLevelIndexSearch) {
       p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@@ -440,15 +491,336 @@ struct BlockBasedTableBuilder::Rep {
             table_options.index_type, table_options.whole_key_filtering,
             _moptions.prefix_extractor != nullptr));
     if (table_options.verify_compression) {
-      verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
-                                                compression_type));
+      for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+        verify_ctxs[i].reset(new UncompressionContext(compression_type));
+      }
+    }
+
+    if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) {
+      ROCKS_LOG_INFO(ioptions.info_log, "db_host_id property will not be set");
     }
   }
 
   Rep(const Rep&) = delete;
   Rep& operator=(const Rep&) = delete;
 
-  ~Rep() {}
+ private:
+  // Synchronize status & io_status accesses across threads from main thread,
+  // compression thread and write thread in parallel compression.
+  std::mutex status_mutex;
+  std::atomic<bool> status_ok;
+  Status status;
+  std::mutex io_status_mutex;
+  std::atomic<bool> io_status_ok;
+  IOStatus io_status;
+};
+
+struct BlockBasedTableBuilder::ParallelCompressionRep {
+  // Keys is a wrapper around a vector of strings that avoids releasing the
+  // string memory on Clear(), in order to save memory allocation overhead.
+  class Keys {
+   public:
+    Keys() : keys_(kKeysInitSize), size_(0) {}
+    void PushBack(const Slice& key) {
+      if (size_ == keys_.size()) {
+        keys_.emplace_back(key.data(), key.size());
+      } else {
+        keys_[size_].assign(key.data(), key.size());
+      }
+      size_++;
+    }
+    void SwapAssign(std::vector<std::string>& keys) {
+      size_ = keys.size();
+      std::swap(keys_, keys);
+    }
+    void Clear() { size_ = 0; }
+    size_t Size() { return size_; }
+    std::string& Back() { return keys_[size_ - 1]; }
+    std::string& operator[](size_t idx) {
+      assert(idx < size_);
+      return keys_[idx];
+    }
+
+   private:
+    const size_t kKeysInitSize = 32;
+    std::vector<std::string> keys_;
+    size_t size_;
+  };
+  std::unique_ptr<Keys> curr_block_keys;
+
+  class BlockRepSlot;
+
+  // BlockRep instances are fetched from and recycled to
+  // block_rep_pool during parallel compression.
+  struct BlockRep {
+    Slice contents;
+    Slice compressed_contents;
+    std::unique_ptr<std::string> data;
+    std::unique_ptr<std::string> compressed_data;
+    CompressionType compression_type;
+    std::unique_ptr<std::string> first_key_in_next_block;
+    std::unique_ptr<Keys> keys;
+    std::unique_ptr<BlockRepSlot> slot;
+    Status status;
+  };
+  // Use a vector of BlockRep as a buffer for a fixed number of BlockRep
+  // structures. All data referenced by pointers in BlockRep is freed when
+  // this vector is destroyed.
+  typedef std::vector<BlockRep> BlockRepBuffer;
+  BlockRepBuffer block_rep_buf;
+  // Use a thread-safe queue for concurrent access from block
+  // building thread and writer thread.
+  typedef WorkQueue<BlockRep*> BlockRepPool;
+  BlockRepPool block_rep_pool;
+
+  // Use BlockRepSlot to keep block order in the write thread.
+  // slot_ passes references to BlockRep.
+  class BlockRepSlot {
+   public:
+    BlockRepSlot() : slot_(1) {}
+    template <typename T>
+    void Fill(T&& rep) {
+      slot_.push(std::forward<T>(rep));
+    };
+    void Take(BlockRep*& rep) { slot_.pop(rep); }
+
+   private:
+    // slot_ passes references to BlockRep entries in block_rep_buf; those
+    // references remain valid until block_rep_buf is destroyed.
+    WorkQueue<BlockRep*> slot_;
+  };
+
+  // The compression queue passes references to BlockRep entries in
+  // block_rep_buf; those references remain valid until block_rep_buf is
+  // destroyed.
+  typedef WorkQueue<BlockRep*> CompressQueue;
+  CompressQueue compress_queue;
+  std::vector<port::Thread> compress_thread_pool;
+
+  // The write queue passes references to BlockRep::slot in block_rep_buf;
+  // those references remain valid until the corresponding BlockRep::slot is
+  // destroyed, which happens together with the destruction of block_rep_buf.
+  typedef WorkQueue<BlockRepSlot*> WriteQueue;
+  WriteQueue write_queue;
+  std::unique_ptr<port::Thread> write_thread;
+
+  // Estimate output file size when parallel compression is enabled. This is
+  // necessary because compression & flush are no longer synchronized,
+  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
+  // memory_order_relaxed suffices because accurate statistics are not required.
+  class FileSizeEstimator {
+   public:
+    explicit FileSizeEstimator()
+        : raw_bytes_compressed(0),
+          raw_bytes_curr_block(0),
+          raw_bytes_curr_block_set(false),
+          raw_bytes_inflight(0),
+          blocks_inflight(0),
+          curr_compression_ratio(0),
+          estimated_file_size(0) {}
+
+    // Estimate file size when a block is about to be emitted to
+    // compression thread
+    void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_add(raw_block_size,
+                                       std::memory_order_relaxed) +
+          raw_block_size;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
+
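+      // Estimated size = bytes already written (curr_file_size) + raw bytes
+      // still in flight scaled by the running compression ratio + one block
+      // trailer per in-flight block.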
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+    }
+
+    // Estimate file size when a block is already reaped from
+    // compression thread
+    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
+      assert(raw_bytes_curr_block_set);
+
+      uint64_t new_raw_bytes_compressed =
+          raw_bytes_compressed + raw_bytes_curr_block;
+      assert(new_raw_bytes_compressed > 0);
+
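+      // Fold this block into the running compression ratio, i.e. total
+      // compressed bytes divided by total raw bytes over all blocks reaped
+      // so far.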
+      curr_compression_ratio.store(
+          (curr_compression_ratio.load(std::memory_order_relaxed) *
+               raw_bytes_compressed +
+           compressed_block_size) /
+              static_cast<double>(new_raw_bytes_compressed),
+          std::memory_order_relaxed);
+      raw_bytes_compressed = new_raw_bytes_compressed;
+
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
+                                       std::memory_order_relaxed) -
+          raw_bytes_curr_block;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+
+      raw_bytes_curr_block_set = false;
+    }
+
+    void SetEstimatedFileSize(uint64_t size) {
+      estimated_file_size.store(size, std::memory_order_relaxed);
+    }
+
+    uint64_t GetEstimatedFileSize() {
+      return estimated_file_size.load(std::memory_order_relaxed);
+    }
+
+    void SetCurrBlockRawSize(uint64_t size) {
+      raw_bytes_curr_block = size;
+      raw_bytes_curr_block_set = true;
+    }
+
+   private:
+    // Raw bytes compressed so far.
+    uint64_t raw_bytes_compressed;
+    // Size of current block being appended.
+    uint64_t raw_bytes_curr_block;
+    // Whether raw_bytes_curr_block has been set for next
+    // ReapBlock call.
+    bool raw_bytes_curr_block_set;
+    // Raw bytes under compression and not appended yet.
+    std::atomic<uint64_t> raw_bytes_inflight;
+    // Number of blocks under compression and not appended yet.
+    std::atomic<uint64_t> blocks_inflight;
+    // Current compression ratio, maintained by BGWorkWriteRawBlock.
+    std::atomic<double> curr_compression_ratio;
+    // Estimated SST file size.
+    std::atomic<uint64_t> estimated_file_size;
+  };
+  FileSizeEstimator file_size_estimator;
+
+  // Facilities used to wait for the completion of the first block. We need
+  // the first block to be compressed and flushed before a non-zero
+  // compression ratio is available.
+  std::atomic<bool> first_block_processed;
+  std::condition_variable first_block_cond;
+  std::mutex first_block_mutex;
+
+  explicit ParallelCompressionRep(uint32_t parallel_threads)
+      : curr_block_keys(new Keys()),
+        block_rep_buf(parallel_threads),
+        block_rep_pool(parallel_threads),
+        compress_queue(parallel_threads),
+        write_queue(parallel_threads),
+        first_block_processed(false) {
+    for (uint32_t i = 0; i < parallel_threads; i++) {
+      block_rep_buf[i].contents = Slice();
+      block_rep_buf[i].compressed_contents = Slice();
+      block_rep_buf[i].data.reset(new std::string());
+      block_rep_buf[i].compressed_data.reset(new std::string());
+      block_rep_buf[i].compression_type = CompressionType();
+      block_rep_buf[i].first_key_in_next_block.reset(new std::string());
+      block_rep_buf[i].keys.reset(new Keys());
+      block_rep_buf[i].slot.reset(new BlockRepSlot());
+      block_rep_buf[i].status = Status::OK();
+      block_rep_pool.push(&block_rep_buf[i]);
+    }
+  }
+
+  ~ParallelCompressionRep() { block_rep_pool.finish(); }
+
+  // Prepare a block to be emitted to the compression thread.
+  // Used in non-buffered mode.
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         BlockBuilder* data_block) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    data_block->SwapAndReset(*(block_rep->data));
+    block_rep->contents = *(block_rep->data);
+    std::swap(block_rep->keys, curr_block_keys);
+    curr_block_keys->Clear();
+    return block_rep;
+  }
+
+  // Used in EnterUnbuffered
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         std::string* data_block,
+                         std::vector<std::string>* keys) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    std::swap(*(block_rep->data), *data_block);
+    block_rep->contents = *(block_rep->data);
+    block_rep->keys->SwapAssign(*keys);
+    return block_rep;
+  }
+
+  // Emit a block to compression thread
+  void EmitBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    assert(block_rep->status.ok());
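+    // Push the block's slot to the write queue before handing the block to
+    // the compression threads: the write thread consumes slots in FIFO order
+    // and blocks in Take() until the corresponding compression is done, so
+    // blocks are written in the order they were emitted.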
+    if (!write_queue.push(block_rep->slot.get())) {
+      return;
+    }
+    if (!compress_queue.push(block_rep)) {
+      return;
+    }
+
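+    // Wait until the very first block has been compressed and written so
+    // that the file size estimator starts from a meaningful compression
+    // ratio.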
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::unique_lock<std::mutex> lock(first_block_mutex);
+      first_block_cond.wait(lock, [this] {
+        return first_block_processed.load(std::memory_order_relaxed);
+      });
+    }
+  }
+
+  // Reap a block from compression thread
+  void ReapBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    block_rep->compressed_data->clear();
+    block_rep_pool.push(block_rep);
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::lock_guard<std::mutex> lock(first_block_mutex);
+      first_block_processed.store(true, std::memory_order_relaxed);
+      first_block_cond.notify_one();
+    }
+  }
+
+ private:
+  BlockRep* PrepareBlockInternal(CompressionType compression_type,
+                                 const Slice* first_key_in_next_block) {
+    BlockRep* block_rep = nullptr;
+    block_rep_pool.pop(block_rep);
+    assert(block_rep != nullptr);
+
+    assert(block_rep->data);
+
+    block_rep->compression_type = compression_type;
+
+    if (first_key_in_next_block == nullptr) {
+      block_rep->first_key_in_next_block.reset(nullptr);
+    } else {
+      block_rep->first_key_in_next_block->assign(
+          first_key_in_next_block->data(), first_key_in_next_block->size());
+    }
+
+    return block_rep;
+  }
 };
 
 BlockBasedTableBuilder::BlockBasedTableBuilder(
@@ -463,7 +835,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
     const CompressionOptions& compression_opts, const bool skip_filters,
     const std::string& column_family_name, const int level_at_creation,
     const uint64_t creation_time, const uint64_t oldest_key_time,
-    const uint64_t target_file_size, const uint64_t file_creation_time) {
+    const uint64_t target_file_size, const uint64_t file_creation_time,
+    const std::string& db_id, const std::string& db_session_id) {
   BlockBasedTableOptions sanitized_table_options(table_options);
   if (sanitized_table_options.format_version == 0 &&
       sanitized_table_options.checksum != kCRC32c) {
@@ -476,22 +849,26 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
     sanitized_table_options.format_version = 1;
   }
 
-  rep_ = new Rep(ioptions, moptions, sanitized_table_options,
-                 internal_comparator, int_tbl_prop_collector_factories,
-                 column_family_id, file, compression_type,
-                 sample_for_compression, compression_opts, skip_filters,
-                 level_at_creation, column_family_name, creation_time,
-                 oldest_key_time, target_file_size, file_creation_time);
+  rep_ = new Rep(
+      ioptions, moptions, sanitized_table_options, internal_comparator,
+      int_tbl_prop_collector_factories, column_family_id, file,
+      compression_type, sample_for_compression, compression_opts, skip_filters,
+      level_at_creation, column_family_name, creation_time, oldest_key_time,
+      target_file_size, file_creation_time, db_id, db_session_id);
 
   if (rep_->filter_builder != nullptr) {
     rep_->filter_builder->StartBlock(0);
   }
   if (table_options.block_cache_compressed.get() != nullptr) {
-    BlockBasedTable::GenerateCachePrefix(
+    BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
         table_options.block_cache_compressed.get(), file->writable_file(),
         &rep_->compressed_cache_key_prefix[0],
         &rep_->compressed_cache_key_prefix_size);
   }
+
+  if (rep_->IsParallelCompressionEnabled()) {
+    StartParallelCompression();
+  }
 }
 
 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
@@ -510,14 +887,15 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     if (r->props.num_entries > r->props.num_range_deletions) {
       assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
     }
-#endif  // NDEBUG
+#endif  // !NDEBUG
 
     auto should_flush = r->flush_block_policy->Update(key, value);
     if (should_flush) {
       assert(!r->data_block.empty());
+      r->first_key_in_next_block = &key;
       Flush();
 
-      if (r->state == Rep::State::kBuffered &&
+      if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
           r->data_begin_offset > r->target_file_size) {
         EnterUnbuffered();
       }
@@ -531,15 +909,27 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       // entries in the first block and < all entries in subsequent
       // blocks.
       if (ok() && r->state == Rep::State::kUnbuffered) {
-        r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
+        if (r->IsParallelCompressionEnabled()) {
+          r->pc_rep->curr_block_keys->Clear();
+        } else {
+          r->index_builder->AddIndexEntry(&r->last_key, &key,
+                                          r->pending_handle);
+        }
       }
     }
 
     // Note: PartitionedFilterBlockBuilder requires key being added to filter
     // builder after being added to index builder.
-    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
-      size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
-      r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+    if (r->state == Rep::State::kUnbuffered) {
+      if (r->IsParallelCompressionEnabled()) {
+        r->pc_rep->curr_block_keys->PushBack(key);
+      } else {
+        if (r->filter_builder != nullptr) {
+          size_t ts_sz =
+              r->internal_comparator.user_comparator()->timestamp_size();
+          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+        }
+      }
     }
 
     r->last_key.assign(key.data(), key.size());
@@ -552,15 +942,19 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       }
       r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
     } else {
-      r->index_builder->OnKeyAdded(key);
+      if (!r->IsParallelCompressionEnabled()) {
+        r->index_builder->OnKeyAdded(key);
+      }
     }
-    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
+    // TODO offset passed in is not accurate for parallel compression case
+    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                       r->table_properties_collectors,
                                       r->ioptions.info_log);
 
   } else if (value_type == kTypeRangeDeletion) {
     r->range_del_block.Add(key, value);
-    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
+    // TODO offset passed in is not accurate for parallel compression case
+    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                       r->table_properties_collectors,
                                       r->ioptions.info_log);
   } else {
@@ -585,7 +979,18 @@ void BlockBasedTableBuilder::Flush() {
   assert(rep_->state != Rep::State::kClosed);
   if (!ok()) return;
   if (r->data_block.empty()) return;
-  WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+  if (r->IsParallelCompressionEnabled() &&
+      r->state == Rep::State::kUnbuffered) {
+    r->data_block.Finish();
+    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+        r->compression_type, r->first_key_in_next_block, &(r->data_block));
+    assert(block_rep != nullptr);
+    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                             r->get_offset());
+    r->pc_rep->EmitBlock(block_rep);
+  } else {
+    WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+  }
 }
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
@@ -598,31 +1003,75 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                         BlockHandle* handle,
                                         bool is_data_block) {
+  Rep* r = rep_;
+  Slice block_contents;
+  CompressionType type;
+  if (r->state == Rep::State::kBuffered) {
+    assert(is_data_block);
+    assert(!r->data_block_and_keys_buffers.empty());
+    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
+    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
+    return;
+  }
+  Status compress_status;
+  CompressAndVerifyBlock(raw_block_contents, is_data_block,
+                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
+                         &(r->compressed_output), &(block_contents), &type,
+                         &compress_status);
+  r->SetStatus(compress_status);
+  if (!ok()) {
+    return;
+  }
+  WriteRawBlock(block_contents, type, handle, is_data_block);
+  r->compressed_output.clear();
+  if (is_data_block) {
+    if (r->filter_builder != nullptr) {
+      r->filter_builder->StartBlock(r->get_offset());
+    }
+    r->props.data_size = r->get_offset();
+    ++r->props.num_data_blocks;
+  }
+}
+
+void BlockBasedTableBuilder::BGWorkCompression(
+    const CompressionContext& compression_ctx,
+    UncompressionContext* verify_ctx) {
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
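+  // Drain the compression queue until it is finished. Each popped block is
+  // compressed (and optionally verified), and the result is placed into the
+  // block's slot for the write thread to pick up in order.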
+  while (rep_->pc_rep->compress_queue.pop(block_rep)) {
+    assert(block_rep != nullptr);
+    CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
+                           compression_ctx, verify_ctx,
+                           block_rep->compressed_data.get(),
+                           &block_rep->compressed_contents,
+                           &(block_rep->compression_type), &block_rep->status);
+    block_rep->slot->Fill(block_rep);
+  }
+}
+
+void BlockBasedTableBuilder::CompressAndVerifyBlock(
+    const Slice& raw_block_contents, bool is_data_block,
+    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
+    std::string* compressed_output, Slice* block_contents,
+    CompressionType* type, Status* out_status) {
   // File format contains a sequence of blocks where each block has:
   //    block_data: uint8[n]
   //    type: uint8
   //    crc: uint32
-  assert(ok());
   Rep* r = rep_;
+  bool is_status_ok = ok();
+  if (!r->IsParallelCompressionEnabled()) {
+    assert(is_status_ok);
+  }
 
-  auto type = r->compression_type;
+  *type = r->compression_type;
   uint64_t sample_for_compression = r->sample_for_compression;
-  Slice block_contents;
   bool abort_compression = false;
 
   StopWatchNano timer(
       r->ioptions.env,
       ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
 
-  if (r->state == Rep::State::kBuffered) {
-    assert(is_data_block);
-    assert(!r->data_block_and_keys_buffers.empty());
-    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
-    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
-    return;
-  }
-
-  if (raw_block_contents.size() < kCompressionSizeLimit) {
+  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
     const CompressionDict* compression_dict;
     if (!is_data_block || r->compression_dict == nullptr) {
       compression_dict = &CompressionDict::GetEmptyDict();
@@ -630,16 +1079,16 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
       compression_dict = r->compression_dict.get();
     }
     assert(compression_dict != nullptr);
-    CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
-                                     *compression_dict, type,
+    CompressionInfo compression_info(r->compression_opts, compression_ctx,
+                                     *compression_dict, *type,
                                      sample_for_compression);
 
     std::string sampled_output_fast;
     std::string sampled_output_slow;
-    block_contents = CompressBlock(
-        raw_block_contents, compression_info, &type,
+    *block_contents = CompressBlock(
+        raw_block_contents, compression_info, type,
         r->table_options.format_version, is_data_block /* do_sample */,
-        &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
+        compressed_output, &sampled_output_fast, &sampled_output_slow);
 
     // notify collectors on block add
     NotifyCollectTableCollectorsOnBlockAdd(
@@ -649,7 +1098,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     // Some of the compression algorithms are known to be unreliable. If
     // the verify_compression flag is set then try to de-compress the
     // compressed data and compare to the input.
-    if (type != kNoCompression && r->table_options.verify_compression) {
+    if (*type != kNoCompression && r->table_options.verify_compression) {
       // Retrieve the uncompressed contents into a new buffer
       const UncompressionDict* verify_dict;
       if (!is_data_block || r->verify_dict == nullptr) {
@@ -659,10 +1108,10 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
       }
       assert(verify_dict != nullptr);
       BlockContents contents;
-      UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
+      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                            r->compression_type);
       Status stat = UncompressBlockContentsForCompressionType(
-          uncompression_info, block_contents.data(), block_contents.size(),
+          uncompression_info, block_contents->data(), block_contents->size(),
           &contents, r->table_options.format_version, r->ioptions);
 
       if (stat.ok()) {
@@ -672,12 +1121,13 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
           abort_compression = true;
           ROCKS_LOG_ERROR(r->ioptions.info_log,
                           "Decompressed block did not match raw block");
-          r->status =
+          *out_status =
               Status::Corruption("Decompressed block did not match raw block");
         }
       } else {
         // Decompression reported an error. abort.
-        r->status = Status::Corruption("Could not decompress");
+        *out_status = Status::Corruption(std::string("Could not decompress: ") +
+                                         stat.getState());
         abort_compression = true;
       }
     }
@@ -690,9 +1140,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
   // verification.
   if (abort_compression) {
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
-    type = kNoCompression;
-    block_contents = raw_block_contents;
-  } else if (type != kNoCompression) {
+    *type = kNoCompression;
+    *block_contents = raw_block_contents;
+  } else if (*type != kNoCompression) {
     if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
       RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
                             timer.ElapsedNanos());
@@ -700,19 +1150,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
                       raw_block_contents.size());
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
-  } else if (type != r->compression_type) {
+  } else if (*type != r->compression_type) {
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
   }
-
-  WriteRawBlock(block_contents, type, handle, is_data_block);
-  r->compressed_output.clear();
-  if (is_data_block) {
-    if (r->filter_builder != nullptr) {
-      r->filter_builder->StartBlock(r->offset);
-    }
-    r->props.data_size = r->offset;
-    ++r->props.num_data_blocks;
-  }
 }
 
 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@@ -720,75 +1160,184 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                            BlockHandle* handle,
                                            bool is_data_block) {
   Rep* r = rep_;
+  Status s = Status::OK();
+  IOStatus io_s = IOStatus::OK();
   StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
-  handle->set_offset(r->offset);
+  handle->set_offset(r->get_offset());
   handle->set_size(block_contents.size());
-  assert(r->status.ok());
-  r->status = r->file->Append(block_contents);
-  if (r->status.ok()) {
+  assert(status().ok());
+  assert(io_status().ok());
+  io_s = r->file->Append(block_contents);
+  if (io_s.ok()) {
     char trailer[kBlockTrailerSize];
     trailer[0] = type;
-    char* trailer_without_type = trailer + 1;
+    uint32_t checksum = 0;
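+    // The block trailer is kBlockTrailerSize (5) bytes: the compression type
+    // byte followed by a 32-bit checksum (zero when checksumming is disabled)
+    // computed over the block contents and the type byte.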
     switch (r->table_options.checksum) {
       case kNoChecksum:
-        EncodeFixed32(trailer_without_type, 0);
         break;
       case kCRC32c: {
-        auto crc = crc32c::Value(block_contents.data(), block_contents.size());
-        crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover block type
-        EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
+        uint32_t crc =
+            crc32c::Value(block_contents.data(), block_contents.size());
+        // Extend to cover compression type
+        crc = crc32c::Extend(crc, trailer, 1);
+        checksum = crc32c::Mask(crc);
         break;
       }
       case kxxHash: {
         XXH32_state_t* const state = XXH32_createState();
         XXH32_reset(state, 0);
-        XXH32_update(state, block_contents.data(),
-                     static_cast<uint32_t>(block_contents.size()));
-        XXH32_update(state, trailer, 1);  // Extend  to cover block type
-        EncodeFixed32(trailer_without_type, XXH32_digest(state));
+        XXH32_update(state, block_contents.data(), block_contents.size());
+        // Extend to cover compression type
+        XXH32_update(state, trailer, 1);
+        checksum = XXH32_digest(state);
         XXH32_freeState(state);
         break;
       }
       case kxxHash64: {
         XXH64_state_t* const state = XXH64_createState();
         XXH64_reset(state, 0);
-        XXH64_update(state, block_contents.data(),
-                     static_cast<uint32_t>(block_contents.size()));
-        XXH64_update(state, trailer, 1);  // Extend  to cover block type
-        EncodeFixed32(
-            trailer_without_type,
-            static_cast<uint32_t>(XXH64_digest(state) &  // lower 32 bits
-                                  uint64_t{0xffffffff}));
+        XXH64_update(state, block_contents.data(), block_contents.size());
+        // Extend to cover compression type
+        XXH64_update(state, trailer, 1);
+        checksum = Lower32of64(XXH64_digest(state));
         XXH64_freeState(state);
         break;
       }
+      default:
+        assert(false);
+        break;
     }
-
-    assert(r->status.ok());
+    EncodeFixed32(trailer + 1, checksum);
+    assert(io_s.ok());
     TEST_SYNC_POINT_CALLBACK(
         "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
         static_cast<char*>(trailer));
-    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
-    if (r->status.ok()) {
-      r->status = InsertBlockInCache(block_contents, type, handle);
+    io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
+    if (io_s.ok()) {
+      assert(s.ok());
+      s = InsertBlockInCache(block_contents, type, handle);
+      if (!s.ok()) {
+        r->SetStatus(s);
+      }
+    } else {
+      r->SetIOStatus(io_s);
     }
-    if (r->status.ok()) {
-      r->offset += block_contents.size() + kBlockTrailerSize;
+    if (s.ok() && io_s.ok()) {
+      r->set_offset(r->get_offset() + block_contents.size() +
+                    kBlockTrailerSize);
       if (r->table_options.block_align && is_data_block) {
         size_t pad_bytes =
             (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
                              (r->alignment - 1))) &
             (r->alignment - 1);
-        r->status = r->file->Pad(pad_bytes);
-        if (r->status.ok()) {
-          r->offset += pad_bytes;
+        io_s = r->file->Pad(pad_bytes);
+        if (io_s.ok()) {
+          r->set_offset(r->get_offset() + pad_bytes);
+        } else {
+          r->SetIOStatus(io_s);
+        }
+      }
+      if (r->IsParallelCompressionEnabled()) {
+        if (is_data_block) {
+          r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
+                                                   r->get_offset());
+        } else {
+          r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
         }
       }
     }
+  } else {
+    r->SetIOStatus(io_s);
+  }
+  if (!io_s.ok() && s.ok()) {
+    r->SetStatus(io_s);
   }
 }
 
-Status BlockBasedTableBuilder::status() const { return rep_->status; }
+void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
+  Rep* r = rep_;
+  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
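+  // Consume slots in emission order. For each block, replay the per-key
+  // filter and index updates that the foreground thread deferred, write the
+  // compressed block, add its index entry, and recycle the BlockRep. While
+  // parallel compression is running, this is the only thread that updates
+  // the filter and index builders.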
+  while (r->pc_rep->write_queue.pop(slot)) {
+    assert(slot != nullptr);
+    slot->Take(block_rep);
+    assert(block_rep != nullptr);
+    if (!block_rep->status.ok()) {
+      r->SetStatus(block_rep->status);
+      // Reap the block so that a blocked Flush(), if there is one, can
+      // finish; Flush() will notice !ok() the next time around.
+      block_rep->status = Status::OK();
+      r->pc_rep->ReapBlock(block_rep);
+      continue;
+    }
+
+    for (size_t i = 0; i < block_rep->keys->Size(); i++) {
+      auto& key = (*block_rep->keys)[i];
+      if (r->filter_builder != nullptr) {
+        size_t ts_sz =
+            r->internal_comparator.user_comparator()->timestamp_size();
+        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+      }
+      r->index_builder->OnKeyAdded(key);
+    }
+
+    r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
+    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
+                  &r->pending_handle, true /* is_data_block*/);
+    if (!ok()) {
+      break;
+    }
+
+    if (r->filter_builder != nullptr) {
+      r->filter_builder->StartBlock(r->get_offset());
+    }
+    r->props.data_size = r->get_offset();
+    ++r->props.num_data_blocks;
+
+    if (block_rep->first_key_in_next_block == nullptr) {
+      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
+                                      r->pending_handle);
+    } else {
+      Slice first_key_in_next_block =
+          Slice(*block_rep->first_key_in_next_block);
+      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
+                                      &first_key_in_next_block,
+                                      r->pending_handle);
+    }
+
+    r->pc_rep->ReapBlock(block_rep);
+  }
+}
+
+void BlockBasedTableBuilder::StartParallelCompression() {
+  rep_->pc_rep.reset(
+      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
+  rep_->pc_rep->compress_thread_pool.reserve(
+      rep_->compression_opts.parallel_threads);
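+  // Each worker thread uses the compression / verification context that was
+  // pre-allocated for it in Rep, so contexts are never shared across
+  // threads.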
+  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
+    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
+      BGWorkCompression(*(rep_->compression_ctxs[i]),
+                        rep_->verify_ctxs[i].get());
+    });
+  }
+  rep_->pc_rep->write_thread.reset(
+      new port::Thread([this] { BGWorkWriteRawBlock(); }));
+}
+
+void BlockBasedTableBuilder::StopParallelCompression() {
+  rep_->pc_rep->compress_queue.finish();
+  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
+    thread.join();
+  }
+  rep_->pc_rep->write_queue.finish();
+  rep_->pc_rep->write_thread->join();
+}
+
+Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
+
+IOStatus BlockBasedTableBuilder::io_status() const {
+  return rep_->GetIOStatus();
+}
 
 static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
   BlockContents* bc = reinterpret_cast<BlockContents*>(value);
@@ -826,13 +1375,16 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
               static_cast<size_t>(end - r->compressed_cache_key_prefix));
 
     // Insert into compressed block cache.
-    block_cache_compressed->Insert(
-        key, block_contents_to_cache,
-        block_contents_to_cache->ApproximateMemoryUsage(),
-        &DeleteCachedBlockContents);
+    // How should we deal with the compressed block cache being full?
+    block_cache_compressed
+        ->Insert(key, block_contents_to_cache,
+                 block_contents_to_cache->ApproximateMemoryUsage(),
+                 &DeleteCachedBlockContents)
+        .PermitUncheckedError();
 
     // Invalidate OS cache.
-    r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
+    r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
+        .PermitUncheckedError();
   }
   return Status::OK();
 }
@@ -878,7 +1430,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
     // HashIndexBuilder which is not multi-partition.
     assert(index_blocks.meta_blocks.empty());
   } else if (ok() && !index_builder_status.ok()) {
-    rep_->status = index_builder_status;
+    rep_->SetStatus(index_builder_status);
   }
   if (ok()) {
     for (const auto& item : index_blocks.meta_blocks) {
@@ -903,7 +1455,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
   while (ok() && s.IsIncomplete()) {
     s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
     if (!s.ok() && !s.IsIncomplete()) {
-      rep_->status = s;
+      rep_->SetStatus(s);
       return;
     }
     if (rep_->table_options.enable_index_compression) {
@@ -970,6 +1522,9 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
     rep_->props.creation_time = rep_->creation_time;
     rep_->props.oldest_key_time = rep_->oldest_key_time;
     rep_->props.file_creation_time = rep_->file_creation_time;
+    rep_->props.db_id = rep_->db_id;
+    rep_->props.db_session_id = rep_->db_session_id;
+    rep_->props.db_host_id = rep_->db_host_id;
 
     // Add basic properties
     property_block_builder.AddTableProperty(rep_->props);
@@ -1053,10 +1608,13 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
   footer.set_checksum(r->table_options.checksum);
   std::string footer_encoding;
   footer.EncodeTo(&footer_encoding);
-  assert(r->status.ok());
-  r->status = r->file->Append(footer_encoding);
-  if (r->status.ok()) {
-    r->offset += footer_encoding.size();
+  assert(ok());
+  IOStatus ios = r->file->Append(footer_encoding);
+  if (ios.ok()) {
+    r->set_offset(r->get_offset() + footer_encoding.size());
+  } else {
+    r->SetIOStatus(ios);
+    r->SetStatus(ios);
   }
 }
 
@@ -1101,26 +1659,45 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
                 r->compression_type == kZSTDNotFinalCompression));
 
   for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
-    const auto& data_block = r->data_block_and_keys_buffers[i].first;
+    auto& data_block = r->data_block_and_keys_buffers[i].first;
     auto& keys = r->data_block_and_keys_buffers[i].second;
     assert(!data_block.empty());
     assert(!keys.empty());
 
-    for (const auto& key : keys) {
-      if (r->filter_builder != nullptr) {
-        size_t ts_sz =
-            r->internal_comparator.user_comparator()->timestamp_size();
-        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+    if (r->IsParallelCompressionEnabled()) {
+      Slice first_key_in_next_block;
+      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+      if (i + 1 < r->data_block_and_keys_buffers.size()) {
+        first_key_in_next_block =
+            r->data_block_and_keys_buffers[i + 1].second.front();
+      } else {
+        first_key_in_next_block_ptr = r->first_key_in_next_block;
+      }
+
+      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+      assert(block_rep != nullptr);
+      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                               r->get_offset());
+      r->pc_rep->EmitBlock(block_rep);
+    } else {
+      for (const auto& key : keys) {
+        if (r->filter_builder != nullptr) {
+          size_t ts_sz =
+              r->internal_comparator.user_comparator()->timestamp_size();
+          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+        }
+        r->index_builder->OnKeyAdded(key);
+      }
+      WriteBlock(Slice(data_block), &r->pending_handle,
+                 true /* is_data_block */);
+      if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
+        Slice first_key_in_next_block =
+            r->data_block_and_keys_buffers[i + 1].second.front();
+        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+        r->index_builder->AddIndexEntry(
+            &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
       }
-      r->index_builder->OnKeyAdded(key);
-    }
-    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
-    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
-      Slice first_key_in_next_block =
-          r->data_block_and_keys_buffers[i + 1].second.front();
-      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
-      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
-                                      r->pending_handle);
     }
   }
   r->data_block_and_keys_buffers.clear();
@@ -1130,15 +1707,25 @@ Status BlockBasedTableBuilder::Finish() {
   Rep* r = rep_;
   assert(r->state != Rep::State::kClosed);
   bool empty_data_block = r->data_block.empty();
+  r->first_key_in_next_block = nullptr;
   Flush();
   if (r->state == Rep::State::kBuffered) {
     EnterUnbuffered();
   }
-  // To make sure properties block is able to keep the accurate size of index
-  // block, we will finish writing all index entries first.
-  if (ok() && !empty_data_block) {
-    r->index_builder->AddIndexEntry(
-        &r->last_key, nullptr /* no next data block */, r->pending_handle);
+  if (r->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
+#ifndef NDEBUG
+    for (const auto& br : r->pc_rep->block_rep_buf) {
+      assert(br.status.ok());
+    }
+#endif  // !NDEBUG
+  } else {
+    // To make sure properties block is able to keep the accurate size of index
+    // block, we will finish writing all index entries first.
+    if (ok() && !empty_data_block) {
+      r->index_builder->AddIndexEntry(
+          &r->last_key, nullptr /* no next data block */, r->pending_handle);
+    }
   }
 
   // Write meta blocks, metaindex block and footer in the following order.
@@ -1164,24 +1751,43 @@ Status BlockBasedTableBuilder::Finish() {
   if (ok()) {
     WriteFooter(metaindex_block_handle, index_block_handle);
   }
-  if (r->file != nullptr) {
-    file_checksum_ = r->file->GetFileChecksum();
-  }
   r->state = Rep::State::kClosed;
-  return r->status;
+  r->SetStatus(r->CopyIOStatus());
+  Status ret_status = r->CopyStatus();
+  assert(!ret_status.ok() || io_status().ok());
+  return ret_status;
 }
 
 void BlockBasedTableBuilder::Abandon() {
   assert(rep_->state != Rep::State::kClosed);
+  if (rep_->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
+  }
   rep_->state = Rep::State::kClosed;
+  rep_->CopyStatus().PermitUncheckedError();
+  rep_->CopyIOStatus().PermitUncheckedError();
 }
 
 uint64_t BlockBasedTableBuilder::NumEntries() const {
   return rep_->props.num_entries;
 }
 
+bool BlockBasedTableBuilder::IsEmpty() const {
+  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
+}
+
 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
 
+uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
+  if (rep_->IsParallelCompressionEnabled()) {
+    // Use compression ratio so far and inflight raw bytes to estimate
+    // final SST size.
+    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
+  } else {
+    return FileSize();
+  }
+}
+
 bool BlockBasedTableBuilder::NeedCompact() const {
   for (const auto& collector : rep_->table_properties_collectors) {
     if (collector->NeedCompact()) {
@@ -1197,16 +1803,24 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {
     for (const auto& prop : collector->GetReadableProperties()) {
       ret.readable_properties.insert(prop);
     }
-    collector->Finish(&ret.user_collected_properties);
+    collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
   }
   return ret;
 }
 
+std::string BlockBasedTableBuilder::GetFileChecksum() const {
+  if (rep_->file != nullptr) {
+    return rep_->file->GetFileChecksum();
+  } else {
+    return kUnknownFileChecksum;
+  }
+}
+
 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
   if (rep_->file != nullptr) {
     return rep_->file->GetFileChecksumFuncName();
   } else {
-    return kUnknownFileChecksumFuncName.c_str();
+    return kUnknownFileChecksumFuncName;
   }
 }