]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/block_based_table_builder.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / table / block_based_table_builder.cc
index 59c385d65aeec7855bb662ad8ceb6b3a881d1f13..479311f5b05b26e08d9e3c6e5f6a0c1e6b763383 100644 (file)
@@ -42,6 +42,7 @@
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
+#include "util/memory_allocator.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
 #include "util/xxhash.h"
@@ -79,9 +80,11 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
       // until index builder actully cuts the partition, we take the lower bound
       // as partition size.
       assert(table_opt.block_size_deviation <= 100);
-      auto partition_size = static_cast<uint32_t>(
-          ((table_opt.metadata_block_size *
-          (100 - table_opt.block_size_deviation)) + 99) / 100);
+      auto partition_size =
+          static_cast<uint32_t>(((table_opt.metadata_block_size *
+                                  (100 - table_opt.block_size_deviation)) +
+                                 99) /
+                                100);
       partition_size = std::max(partition_size, static_cast<uint32_t>(1));
       return new PartitionedFilterBlockBuilder(
           mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
@@ -100,83 +103,105 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
   return compressed_size < raw_size - (raw_size / 8u);
 }
 
-}  // namespace
-
-// format_version is the block format as defined in include/rocksdb/table.h
-Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
-                    CompressionType* type, uint32_t format_version,
-                    std::string* compressed_output) {
-  *type = compression_ctx.type();
-  if (compression_ctx.type() == kNoCompression) {
-    return raw;
-  }
-
+bool CompressBlockInternal(const Slice& raw,
+                           const CompressionInfo& compression_info,
+                           uint32_t format_version,
+                           std::string* compressed_output) {
   // Will return compressed block contents if (1) the compression method is
   // supported in this platform and (2) the compression rate is "good enough".
-  switch (compression_ctx.type()) {
+  switch (compression_info.type()) {
     case kSnappyCompression:
-      if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
-                          compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;  // fall back to no compression.
+      return Snappy_Compress(compression_info, raw.data(), raw.size(),
+                             compressed_output);
     case kZlibCompression:
-      if (Zlib_Compress(
-              compression_ctx,
-              GetCompressFormatForVersion(kZlibCompression, format_version),
-              raw.data(), raw.size(), compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;  // fall back to no compression.
+      return Zlib_Compress(
+          compression_info,
+          GetCompressFormatForVersion(kZlibCompression, format_version),
+          raw.data(), raw.size(), compressed_output);
     case kBZip2Compression:
-      if (BZip2_Compress(
-              compression_ctx,
-              GetCompressFormatForVersion(kBZip2Compression, format_version),
-              raw.data(), raw.size(), compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;  // fall back to no compression.
+      return BZip2_Compress(
+          compression_info,
+          GetCompressFormatForVersion(kBZip2Compression, format_version),
+          raw.data(), raw.size(), compressed_output);
     case kLZ4Compression:
-      if (LZ4_Compress(
-              compression_ctx,
-              GetCompressFormatForVersion(kLZ4Compression, format_version),
-              raw.data(), raw.size(), compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;  // fall back to no compression.
+      return LZ4_Compress(
+          compression_info,
+          GetCompressFormatForVersion(kLZ4Compression, format_version),
+          raw.data(), raw.size(), compressed_output);
     case kLZ4HCCompression:
-      if (LZ4HC_Compress(
-              compression_ctx,
-              GetCompressFormatForVersion(kLZ4HCCompression, format_version),
-              raw.data(), raw.size(), compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;     // fall back to no compression.
+      return LZ4HC_Compress(
+          compression_info,
+          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+          raw.data(), raw.size(), compressed_output);
     case kXpressCompression:
-      if (XPRESS_Compress(raw.data(), raw.size(),
-          compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;
+      return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
     case kZSTD:
     case kZSTDNotFinalCompression:
-      if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
-                        compressed_output) &&
-          GoodCompressionRatio(compressed_output->size(), raw.size())) {
-        return *compressed_output;
-      }
-      break;     // fall back to no compression.
-    default: {}  // Do not recognize this compression type
+      return ZSTD_Compress(compression_info, raw.data(), raw.size(),
+                           compressed_output);
+    default:
+      // Do not recognize this compression type
+      return false;
+  }
+}
+
+}  // namespace
+
+// format_version is the block format as defined in include/rocksdb/table.h
+Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
+                    CompressionType* type, uint32_t format_version,
+                    bool do_sample, std::string* compressed_output,
+                    std::string* sampled_output_fast,
+                    std::string* sampled_output_slow) {
+  *type = info.type();
+
+  if (info.type() == kNoCompression && !info.SampleForCompression()) {
+    return raw;
   }
 
-  // Compression method is not supported, or not good compression ratio, so just
-  // fall back to uncompressed form.
+  // If requested, we sample one in every N block with a
+  // fast and slow compression algorithm and report the stats.
+  // The users can use these stats to decide if it is worthwhile
+  // enabling compression and they also get a hint about which
+  // compression algorithm wil be beneficial.
+  if (do_sample && info.SampleForCompression() &&
+      Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
+      sampled_output_fast && sampled_output_slow) {
+    // Sampling with a fast compression algorithm
+    if (LZ4_Supported() || Snappy_Supported()) {
+      CompressionType c =
+          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
+      CompressionContext context(c);
+      CompressionOptions options;
+      CompressionInfo info_tmp(options, context,
+                               CompressionDict::GetEmptyDict(), c,
+                               info.SampleForCompression());
+
+      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
+    }
+
+    // Sampling with a slow but high-compression algorithm
+    if (ZSTD_Supported() || Zlib_Supported()) {
+      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
+      CompressionContext context(c);
+      CompressionOptions options;
+      CompressionInfo info_tmp(options, context,
+                               CompressionDict::GetEmptyDict(), c,
+                               info.SampleForCompression());
+      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
+    }
+  }
+
+  // Actually compress the data
+  if (*type != kNoCompression) {
+    if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
+        GoodCompressionRatio(compressed_output->size(), raw.size())) {
+      return *compressed_output;
+    }
+  }
+
+  // Compression method is not supported, or not good
+  // compression ratio, so just fall back to uncompressed form.
   *type = kNoCompression;
   return raw;
 }
@@ -209,14 +234,22 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
         whole_key_filtering_(whole_key_filtering),
         prefix_filtering_(prefix_filtering) {}
 
-  virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
-                             uint64_t /*file_size*/) override {
+  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
+                     uint64_t /*file_size*/) override {
     // Intentionally left blank. Have no interest in collecting stats for
     // individual key/value pairs.
     return Status::OK();
   }
 
-  virtual Status Finish(UserCollectedProperties* properties) override {
+  virtual void BlockAdd(uint64_t /* blockRawBytes */,
+                        uint64_t /* blockCompressedBytesFast */,
+                        uint64_t /* blockCompressedBytesSlow */) override {
+    // Intentionally left blank. No interest in collecting stats for
+    // blocks.
+    return;
+  }
+
+  Status Finish(UserCollectedProperties* properties) override {
     std::string val;
     PutFixed32(&val, static_cast<uint32_t>(index_type_));
     properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
@@ -228,11 +261,11 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
   }
 
   // The name of the properties collector can be used for debugging purpose.
-  virtual const char* Name() const override {
+  const char* Name() const override {
     return "BlockBasedTablePropertiesCollector";
   }
 
-  virtual UserCollectedProperties GetReadableProperties() const override {
+  UserCollectedProperties GetReadableProperties() const override {
     // Intentionally left blank.
     return UserCollectedProperties();
   }
@@ -253,6 +286,13 @@ struct BlockBasedTableBuilder::Rep {
   Status status;
   size_t alignment;
   BlockBuilder data_block;
+  // Buffers uncompressed data blocks and keys to replay later. Needed when
+  // compression dictionary is enabled so we can finalize the dictionary before
+  // compressing any data blocks.
+  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
+  // blocks as it's redundant, but it's easier to implement for now.
+  std::vector<std::pair<std::string, std::vector<std::string>>>
+      data_block_and_keys_buffers;
   BlockBuilder range_del_block;
 
   InternalKeySliceTransform internal_prefix_transform;
@@ -260,13 +300,43 @@ struct BlockBasedTableBuilder::Rep {
   PartitionedIndexBuilder* p_index_builder_ = nullptr;
 
   std::string last_key;
-  // Compression dictionary or nullptr
-  const std::string* compression_dict;
+  CompressionType compression_type;
+  uint64_t sample_for_compression;
+  CompressionOptions compression_opts;
+  std::unique_ptr<CompressionDict> compression_dict;
   CompressionContext compression_ctx;
   std::unique_ptr<UncompressionContext> verify_ctx;
+  std::unique_ptr<UncompressionDict> verify_dict;
+
+  size_t data_begin_offset = 0;
+
   TableProperties props;
 
-  bool closed = false;  // Either Finish() or Abandon() has been called.
+  // States of the builder.
+  //
+  // - `kBuffered`: This is the initial state where zero or more data blocks are
+  //   accumulated uncompressed in-memory. From this state, call
+  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
+  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
+  //   state.
+  //
+  // - `kUnbuffered`: This is the state when compression dictionary is finalized
+  //   either because it wasn't enabled in the first place or it's been created
+  //   from sampling previously buffered data. In this state, blocks are simply
+  //   compressed/written out as they fill up. From this state, call `Finish()`
+  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
+  //   the partially created file.
+  //
+  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
+  //   called, so the table builder is no longer usable. We must be in this
+  //   state by the time the destructor runs.
+  enum class State {
+    kBuffered,
+    kUnbuffered,
+    kClosed,
+  };
+  State state;
+
   const bool use_delta_encoding_for_index_values;
   std::unique_ptr<FilterBlockBuilder> filter_builder;
   char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@@ -280,6 +350,7 @@ struct BlockBasedTableBuilder::Rep {
   const std::string& column_family_name;
   uint64_t creation_time = 0;
   uint64_t oldest_key_time = 0;
+  const uint64_t target_file_size;
 
   std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
 
@@ -290,10 +361,10 @@ struct BlockBasedTableBuilder::Rep {
           int_tbl_prop_collector_factories,
       uint32_t _column_family_id, WritableFileWriter* f,
       const CompressionType _compression_type,
-      const CompressionOptions& _compression_opts,
-      const std::string* _compression_dict, const bool skip_filters,
+      const uint64_t _sample_for_compression,
+      const CompressionOptions& _compression_opts, const bool skip_filters,
       const std::string& _column_family_name, const uint64_t _creation_time,
-      const uint64_t _oldest_key_time)
+      const uint64_t _oldest_key_time, const uint64_t _target_file_size)
       : ioptions(_ioptions),
         moptions(_moptions),
         table_options(table_opt),
@@ -312,8 +383,14 @@ struct BlockBasedTableBuilder::Rep {
                    table_options.data_block_hash_table_util_ratio),
         range_del_block(1 /* block_restart_interval */),
         internal_prefix_transform(_moptions.prefix_extractor.get()),
-        compression_dict(_compression_dict),
-        compression_ctx(_compression_type, _compression_opts),
+        compression_type(_compression_type),
+        sample_for_compression(_sample_for_compression),
+        compression_opts(_compression_opts),
+        compression_dict(),
+        compression_ctx(_compression_type),
+        verify_dict(),
+        state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
+                                                     : State::kUnbuffered),
         use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                             !table_opt.block_align),
         compressed_cache_key_prefix_size(0),
@@ -323,7 +400,8 @@ struct BlockBasedTableBuilder::Rep {
         column_family_id(_column_family_id),
         column_family_name(_column_family_name),
         creation_time(_creation_time),
-        oldest_key_time(_oldest_key_time) {
+        oldest_key_time(_oldest_key_time),
+        target_file_size(_target_file_size) {
     if (table_options.index_type ==
         BlockBasedTableOptions::kTwoLevelIndexSearch) {
       p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@@ -354,7 +432,7 @@ struct BlockBasedTableBuilder::Rep {
             _moptions.prefix_extractor != nullptr));
     if (table_options.verify_compression) {
       verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
-                                                compression_ctx.type()));
+                                                compression_type));
     }
   }
 
@@ -372,10 +450,10 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
         int_tbl_prop_collector_factories,
     uint32_t column_family_id, WritableFileWriter* file,
     const CompressionType compression_type,
-    const CompressionOptions& compression_opts,
-    const std::string* compression_dict, const bool skip_filters,
+    const uint64_t sample_for_compression,
+    const CompressionOptions& compression_opts, const bool skip_filters,
     const std::string& column_family_name, const uint64_t creation_time,
-    const uint64_t oldest_key_time) {
+    const uint64_t oldest_key_time, const uint64_t target_file_size) {
   BlockBasedTableOptions sanitized_table_options(table_options);
   if (sanitized_table_options.format_version == 0 &&
       sanitized_table_options.checksum != kCRC32c) {
@@ -388,11 +466,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
     sanitized_table_options.format_version = 1;
   }
 
-  rep_ =
-      new Rep(ioptions, moptions, sanitized_table_options, internal_comparator,
-              int_tbl_prop_collector_factories, column_family_id, file,
-              compression_type, compression_opts, compression_dict,
-              skip_filters, column_family_name, creation_time, oldest_key_time);
+  rep_ = new Rep(
+      ioptions, moptions, sanitized_table_options, internal_comparator,
+      int_tbl_prop_collector_factories, column_family_id, file,
+      compression_type, sample_for_compression, compression_opts, skip_filters,
+      column_family_name, creation_time, oldest_key_time, target_file_size);
 
   if (rep_->filter_builder != nullptr) {
     rep_->filter_builder->StartBlock(0);
@@ -406,25 +484,33 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
 }
 
 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
-  assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
+  // Catch errors where caller forgot to call Finish()
+  assert(rep_->state == Rep::State::kClosed);
   delete rep_;
 }
 
 void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
   Rep* r = rep_;
-  assert(!r->closed);
+  assert(rep_->state != Rep::State::kClosed);
   if (!ok()) return;
   ValueType value_type = ExtractValueType(key);
   if (IsValueType(value_type)) {
-    if (r->props.num_entries > 0) {
+#ifndef NDEBUG
+    if (r->props.num_entries > r->props.num_range_deletions) {
       assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
     }
+#endif  // NDEBUG
 
     auto should_flush = r->flush_block_policy->Update(key, value);
     if (should_flush) {
       assert(!r->data_block.empty());
       Flush();
 
+      if (r->state == Rep::State::kBuffered &&
+          r->data_begin_offset > r->target_file_size) {
+        EnterUnbuffered();
+      }
+
       // Add item to index block.
       // We do not emit the index entry for a block until we have seen the
       // first key for the next data block.  This allows us to use shorter
@@ -433,52 +519,61 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       // "the r" as the key for the index block entry since it is >= all
       // entries in the first block and < all entries in subsequent
       // blocks.
-      if (ok()) {
+      if (ok() && r->state == Rep::State::kUnbuffered) {
         r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
       }
     }
 
     // Note: PartitionedFilterBlockBuilder requires key being added to filter
     // builder after being added to index builder.
-    if (r->filter_builder != nullptr) {
+    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
       r->filter_builder->Add(ExtractUserKey(key));
     }
 
     r->last_key.assign(key.data(), key.size());
     r->data_block.Add(key, value);
-    r->props.num_entries++;
-    r->props.raw_key_size += key.size();
-    r->props.raw_value_size += value.size();
-
-    r->index_builder->OnKeyAdded(key);
+    if (r->state == Rep::State::kBuffered) {
+      // Buffer keys to be replayed during `Finish()` once compression
+      // dictionary has been finalized.
+      if (r->data_block_and_keys_buffers.empty() || should_flush) {
+        r->data_block_and_keys_buffers.emplace_back();
+      }
+      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
+    } else {
+      r->index_builder->OnKeyAdded(key);
+    }
     NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                       r->table_properties_collectors,
                                       r->ioptions.info_log);
 
   } else if (value_type == kTypeRangeDeletion) {
     r->range_del_block.Add(key, value);
-    ++r->props.num_range_deletions;
-    r->props.raw_key_size += key.size();
-    r->props.raw_value_size += value.size();
     NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                       r->table_properties_collectors,
                                       r->ioptions.info_log);
   } else {
     assert(false);
   }
+
+  r->props.num_entries++;
+  r->props.raw_key_size += key.size();
+  r->props.raw_value_size += value.size();
+  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
+    r->props.num_deletions++;
+  } else if (value_type == kTypeRangeDeletion) {
+    r->props.num_deletions++;
+    r->props.num_range_deletions++;
+  } else if (value_type == kTypeMerge) {
+    r->props.num_merge_operands++;
+  }
 }
 
 void BlockBasedTableBuilder::Flush() {
   Rep* r = rep_;
-  assert(!r->closed);
+  assert(rep_->state != Rep::State::kClosed);
   if (!ok()) return;
   if (r->data_block.empty()) return;
   WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
-  if (r->filter_builder != nullptr) {
-    r->filter_builder->StartBlock(r->offset);
-  }
-  r->props.data_size = r->offset;
-  ++r->props.num_data_blocks;
 }
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
@@ -498,42 +593,64 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
   assert(ok());
   Rep* r = rep_;
 
-  auto type = r->compression_ctx.type();
+  auto type = r->compression_type;
+  uint64_t sample_for_compression = r->sample_for_compression;
   Slice block_contents;
   bool abort_compression = false;
 
-  StopWatchNano timer(r->ioptions.env,
-    ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
+  StopWatchNano timer(
+      r->ioptions.env,
+      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
+
+  if (r->state == Rep::State::kBuffered) {
+    assert(is_data_block);
+    assert(!r->data_block_and_keys_buffers.empty());
+    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
+    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
+    return;
+  }
 
   if (raw_block_contents.size() < kCompressionSizeLimit) {
-    Slice compression_dict;
-    if (is_data_block && r->compression_dict && r->compression_dict->size()) {
-      r->compression_ctx.dict() = *r->compression_dict;
-      if (r->table_options.verify_compression) {
-        assert(r->verify_ctx != nullptr);
-        r->verify_ctx->dict() = *r->compression_dict;
-      }
+    const CompressionDict* compression_dict;
+    if (!is_data_block || r->compression_dict == nullptr) {
+      compression_dict = &CompressionDict::GetEmptyDict();
     } else {
-      // Clear dictionary
-      r->compression_ctx.dict() = Slice();
-      if (r->table_options.verify_compression) {
-        assert(r->verify_ctx != nullptr);
-        r->verify_ctx->dict() = Slice();
-      }
+      compression_dict = r->compression_dict.get();
     }
-
-    block_contents =
-        CompressBlock(raw_block_contents, r->compression_ctx, &type,
-                      r->table_options.format_version, &r->compressed_output);
+    assert(compression_dict != nullptr);
+    CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
+                                     *compression_dict, type,
+                                     sample_for_compression);
+
+    std::string sampled_output_fast;
+    std::string sampled_output_slow;
+    block_contents = CompressBlock(
+        raw_block_contents, compression_info, &type,
+        r->table_options.format_version, is_data_block /* do_sample */,
+        &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
+
+    // notify collectors on block add
+    NotifyCollectTableCollectorsOnBlockAdd(
+        r->table_properties_collectors, raw_block_contents.size(),
+        sampled_output_fast.size(), sampled_output_slow.size());
 
     // Some of the compression algorithms are known to be unreliable. If
     // the verify_compression flag is set then try to de-compress the
     // compressed data and compare to the input.
     if (type != kNoCompression && r->table_options.verify_compression) {
       // Retrieve the uncompressed contents into a new buffer
+      const UncompressionDict* verify_dict;
+      if (!is_data_block || r->verify_dict == nullptr) {
+        verify_dict = &UncompressionDict::GetEmptyDict();
+      } else {
+        verify_dict = r->verify_dict.get();
+      }
+      assert(verify_dict != nullptr);
       BlockContents contents;
+      UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
+                                           r->compression_type);
       Status stat = UncompressBlockContentsForCompressionType(
-          *r->verify_ctx, block_contents.data(), block_contents.size(),
+          uncompression_info, block_contents.data(), block_contents.size(),
           &contents, r->table_options.format_version, r->ioptions);
 
       if (stat.ok()) {
@@ -565,16 +682,25 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     block_contents = raw_block_contents;
   } else if (type != kNoCompression) {
     if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
-      MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
-                  timer.ElapsedNanos());
+      RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
+                            timer.ElapsedNanos());
     }
-    MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
-                raw_block_contents.size());
+    RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
+                      raw_block_contents.size());
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
+  } else if (type != r->compression_type) {
+    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
   }
 
   WriteRawBlock(block_contents, type, handle, is_data_block);
   r->compressed_output.clear();
+  if (is_data_block) {
+    if (r->filter_builder != nullptr) {
+      r->filter_builder->StartBlock(r->offset);
+    }
+    r->props.data_size = r->offset;
+    ++r->props.num_data_blocks;
+  }
 }
 
 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@@ -609,9 +735,25 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
         EncodeFixed32(trailer_without_type, XXH32_digest(xxh));
         break;
       }
+      case kxxHash64: {
+        XXH64_state_t* const state = XXH64_createState();
+        XXH64_reset(state, 0);
+        XXH64_update(state, block_contents.data(),
+                     static_cast<uint32_t>(block_contents.size()));
+        XXH64_update(state, trailer, 1);  // Extend  to cover block type
+        EncodeFixed32(
+            trailer_without_type,
+            static_cast<uint32_t>(XXH64_digest(state) &  // lower 32 bits
+                                  uint64_t{0xffffffff}));
+        XXH64_freeState(state);
+        break;
+      }
     }
 
     assert(r->status.ok());
+    TEST_SYNC_POINT_CALLBACK(
+        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
+        static_cast<char*>(trailer));
     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
     if (r->status.ok()) {
       r->status = InsertBlockInCache(block_contents, type, handle);
@@ -632,13 +774,11 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
   }
 }
 
-Status BlockBasedTableBuilder::status() const {
-  return rep_->status;
-}
+Status BlockBasedTableBuilder::status() const { return rep_->status; }
 
-static void DeleteCachedBlock(const Slice& /*key*/, void* value) {
-  Block* block = reinterpret_cast<Block*>(value);
-  delete block;
+static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
+  BlockContents* bc = reinterpret_cast<BlockContents*>(value);
+  delete bc;
 }
 
 //
@@ -651,28 +791,31 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
   Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
 
   if (type != kNoCompression && block_cache_compressed != nullptr) {
-
     size_t size = block_contents.size();
 
-    std::unique_ptr<char[]> ubuf(new char[size + 1]);
+    auto ubuf =
+        AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
     memcpy(ubuf.get(), block_contents.data(), size);
     ubuf[size] = type;
 
-    BlockContents results(std::move(ubuf), size, true, type);
-
-    Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber);
+    BlockContents* block_contents_to_cache =
+        new BlockContents(std::move(ubuf), size);
+#ifndef NDEBUG
+    block_contents_to_cache->is_raw_block = true;
+#endif  // NDEBUG
 
     // make cache key by appending the file offset to the cache prefix id
     char* end = EncodeVarint64(
-                  r->compressed_cache_key_prefix +
-                  r->compressed_cache_key_prefix_size,
-                  handle->offset());
-    Slice key(r->compressed_cache_key_prefix, static_cast<size_t>
-              (end - r->compressed_cache_key_prefix));
+        r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
+        handle->offset());
+    Slice key(r->compressed_cache_key_prefix,
+              static_cast<size_t>(end - r->compressed_cache_key_prefix));
 
     // Insert into compressed block cache.
-    block_cache_compressed->Insert(key, block, block->ApproximateMemoryUsage(),
-                                   &DeleteCachedBlock);
+    block_cache_compressed->Insert(
+        key, block_contents_to_cache,
+        block_contents_to_cache->ApproximateMemoryUsage(),
+        &DeleteCachedBlockContents);
 
     // Invalidate OS cache.
     r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
@@ -780,7 +923,9 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
             ? rep_->ioptions.merge_operator->Name()
             : "nullptr";
     rep_->props.compression_name =
-        CompressionTypeToString(rep_->compression_ctx.type());
+        CompressionTypeToString(rep_->compression_type);
+    rep_->props.compression_options =
+        CompressionOptionsToString(rep_->compression_opts);
     rep_->props.prefix_extractor_name =
         rep_->moptions.prefix_extractor != nullptr
             ? rep_->moptions.prefix_extractor->Name()
@@ -823,17 +968,36 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
                   &properties_block_handle);
   }
   if (ok()) {
+#ifndef NDEBUG
+    {
+      uint64_t props_block_offset = properties_block_handle.offset();
+      uint64_t props_block_size = properties_block_handle.size();
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
+          &props_block_offset);
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
+          &props_block_size);
+    }
+#endif  // !NDEBUG
     meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
   }
 }
 
 void BlockBasedTableBuilder::WriteCompressionDictBlock(
     MetaIndexBuilder* meta_index_builder) {
-  if (rep_->compression_dict && rep_->compression_dict->size()) {
+  if (rep_->compression_dict != nullptr &&
+      rep_->compression_dict->GetRawDict().size()) {
     BlockHandle compression_dict_block_handle;
     if (ok()) {
-      WriteRawBlock(*rep_->compression_dict, kNoCompression,
+      WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
                     &compression_dict_block_handle);
+#ifndef NDEBUG
+      Slice compression_dict = rep_->compression_dict->GetRawDict();
+      TEST_SYNC_POINT_CALLBACK(
+          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
+          &compression_dict);
+#endif  // NDEBUG
     }
     if (ok()) {
       meta_index_builder->Add(kCompressionDictBlock,
@@ -852,13 +1016,106 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
   }
 }
 
+void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
+                                         BlockHandle& index_block_handle) {
+  Rep* r = rep_;
+  // No need to write out new footer if we're using default checksum.
+  // We're writing legacy magic number because we want old versions of RocksDB
+  // be able to read files generated with new release (just in case if
+  // somebody wants to roll back after an upgrade)
+  // TODO(icanadi) at some point in the future, when we're absolutely sure
+  // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
+  // number and always write new table files with new magic number
+  bool legacy = (r->table_options.format_version == 0);
+  // this is guaranteed by BlockBasedTableBuilder's constructor
+  assert(r->table_options.checksum == kCRC32c ||
+         r->table_options.format_version != 0);
+  Footer footer(
+      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
+      r->table_options.format_version);
+  footer.set_metaindex_handle(metaindex_block_handle);
+  footer.set_index_handle(index_block_handle);
+  footer.set_checksum(r->table_options.checksum);
+  std::string footer_encoding;
+  footer.EncodeTo(&footer_encoding);
+  assert(r->status.ok());
+  r->status = r->file->Append(footer_encoding);
+  if (r->status.ok()) {
+    r->offset += footer_encoding.size();
+  }
+}
+
+void BlockBasedTableBuilder::EnterUnbuffered() {
+  Rep* r = rep_;
+  assert(r->state == Rep::State::kBuffered);
+  r->state = Rep::State::kUnbuffered;
+  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
+                                  ? r->compression_opts.zstd_max_train_bytes
+                                  : r->compression_opts.max_dict_bytes;
+  Random64 generator{r->creation_time};
+  std::string compression_dict_samples;
+  std::vector<size_t> compression_dict_sample_lens;
+  if (!r->data_block_and_keys_buffers.empty()) {
+    while (compression_dict_samples.size() < kSampleBytes) {
+      size_t rand_idx =
+          generator.Uniform(r->data_block_and_keys_buffers.size());
+      size_t copy_len =
+          std::min(kSampleBytes - compression_dict_samples.size(),
+                   r->data_block_and_keys_buffers[rand_idx].first.size());
+      compression_dict_samples.append(
+          r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
+      compression_dict_sample_lens.emplace_back(copy_len);
+    }
+  }
+
+  // final data block flushed, now we can generate dictionary from the samples.
+  // OK if compression_dict_samples is empty, we'll just get empty dictionary.
+  std::string dict;
+  if (r->compression_opts.zstd_max_train_bytes > 0) {
+    dict = ZSTD_TrainDictionary(compression_dict_samples,
+                                compression_dict_sample_lens,
+                                r->compression_opts.max_dict_bytes);
+  } else {
+    dict = std::move(compression_dict_samples);
+  }
+  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
+                                                r->compression_opts.level));
+  r->verify_dict.reset(new UncompressionDict(
+      dict, r->compression_type == kZSTD ||
+                r->compression_type == kZSTDNotFinalCompression));
+
+  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
+    const auto& data_block = r->data_block_and_keys_buffers[i].first;
+    auto& keys = r->data_block_and_keys_buffers[i].second;
+    assert(!data_block.empty());
+    assert(!keys.empty());
+
+    for (const auto& key : keys) {
+      if (r->filter_builder != nullptr) {
+        r->filter_builder->Add(ExtractUserKey(key));
+      }
+      r->index_builder->OnKeyAdded(key);
+    }
+    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
+    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
+      Slice first_key_in_next_block =
+          r->data_block_and_keys_buffers[i + 1].second.front();
+      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
+                                      r->pending_handle);
+    }
+  }
+  r->data_block_and_keys_buffers.clear();
+}
+
 Status BlockBasedTableBuilder::Finish() {
   Rep* r = rep_;
+  assert(r->state != Rep::State::kClosed);
   bool empty_data_block = r->data_block.empty();
   Flush();
-  assert(!r->closed);
-  r->closed = true;
-
+  if (r->state == Rep::State::kBuffered) {
+    EnterUnbuffered();
+  }
   // To make sure properties block is able to keep the accurate size of index
   // block, we will finish writing all index entries first.
   if (ok() && !empty_data_block) {
@@ -866,13 +1123,14 @@ Status BlockBasedTableBuilder::Finish() {
         &r->last_key, nullptr /* no next data block */, r->pending_handle);
   }
 
-  // Write meta blocks and metaindex block with the following order.
+  // Write meta blocks, metaindex block and footer in the following order.
   //    1. [meta block: filter]
   //    2. [meta block: index]
   //    3. [meta block: compression dictionary]
   //    4. [meta block: range deletion tombstone]
   //    5. [meta block: properties]
   //    6. [metaindex block]
+  //    7. Footer
   BlockHandle metaindex_block_handle, index_block_handle;
   MetaIndexBuilder meta_index_builder;
   WriteFilterBlock(&meta_index_builder);
@@ -885,51 +1143,23 @@ Status BlockBasedTableBuilder::Finish() {
     WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                   &metaindex_block_handle);
   }
-
-  // Write footer
   if (ok()) {
-    // No need to write out new footer if we're using default checksum.
-    // We're writing legacy magic number because we want old versions of RocksDB
-    // be able to read files generated with new release (just in case if
-    // somebody wants to roll back after an upgrade)
-    // TODO(icanadi) at some point in the future, when we're absolutely sure
-    // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
-    // number and always write new table files with new magic number
-    bool legacy = (r->table_options.format_version == 0);
-    // this is guaranteed by BlockBasedTableBuilder's constructor
-    assert(r->table_options.checksum == kCRC32c ||
-           r->table_options.format_version != 0);
-    Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber
-                         : kBlockBasedTableMagicNumber,
-                  r->table_options.format_version);
-    footer.set_metaindex_handle(metaindex_block_handle);
-    footer.set_index_handle(index_block_handle);
-    footer.set_checksum(r->table_options.checksum);
-    std::string footer_encoding;
-    footer.EncodeTo(&footer_encoding);
-    assert(r->status.ok());
-    r->status = r->file->Append(footer_encoding);
-    if (r->status.ok()) {
-      r->offset += footer_encoding.size();
-    }
+    WriteFooter(metaindex_block_handle, index_block_handle);
   }
-
+  r->state = Rep::State::kClosed;
   return r->status;
 }
 
 void BlockBasedTableBuilder::Abandon() {
-  Rep* r = rep_;
-  assert(!r->closed);
-  r->closed = true;
+  assert(rep_->state != Rep::State::kClosed);
+  rep_->state = Rep::State::kClosed;
 }
 
 uint64_t BlockBasedTableBuilder::NumEntries() const {
   return rep_->props.num_entries;
 }
 
-uint64_t BlockBasedTableBuilder::FileSize() const {
-  return rep_->offset;
-}
+uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
 
 bool BlockBasedTableBuilder::NeedCompact() const {
   for (const auto& collector : rep_->table_properties_collectors) {