]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/util/compression.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / compression.h
index 53e977c88b51e67f2670f0828f9e5990724cf677..0d4febcfb2cfb08a4e2c3f98538457702469f48a 100644 (file)
 
 #if defined(ZSTD)
 #include <zstd.h>
-#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
+// v1.1.3+
+#if ZSTD_VERSION_NUMBER >= 10103
 #include <zdict.h>
 #endif  // ZSTD_VERSION_NUMBER >= 10103
+// v1.4.0+
+#if ZSTD_VERSION_NUMBER >= 10400
+#define ZSTD_STREAMING
+#endif  // ZSTD_VERSION_NUMBER >= 10400
 namespace ROCKSDB_NAMESPACE {
 // Need this for the context allocation override
 // On windows we need to do this explicitly
@@ -83,12 +88,11 @@ class ZSTDUncompressCachedData {
   // Init from cache
   ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
   ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
-  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT
+  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
       : ZSTDUncompressCachedData() {
     *this = std::move(o);
   }
-  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o)
-      ROCKSDB_NOEXCEPT {
+  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
     assert(zstd_ctx_ == nullptr);
     std::swap(zstd_ctx_, o.zstd_ctx_);
     std::swap(cache_idx_, o.cache_idx_);
@@ -134,14 +138,14 @@ class ZSTDUncompressCachedData {
   ZSTDUncompressCachedData() {}
   ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
   ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
-  ZSTDUncompressCachedData(ZSTDUncompressCachedData&&)
-      ROCKSDB_NOEXCEPT = default;
-  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&)
-      ROCKSDB_NOEXCEPT = default;
+  ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) noexcept = default;
+  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) noexcept =
+      default;
   ZSTDNativeContext Get() const { return nullptr; }
   int64_t GetCacheIndex() const { return -1; }
   void CreateIfNeeded() {}
   void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}
+
  private:
   void ignore_padding__() { padding = nullptr; }
 };
@@ -514,6 +518,26 @@ inline bool ZSTDNotFinal_Supported() {
 #endif
 }
 
+inline bool ZSTD_Streaming_Supported() {
+#if defined(ZSTD) && defined(ZSTD_STREAMING)
+  return true;
+#else
+  return false;
+#endif
+}
+
+inline bool StreamingCompressionTypeSupported(
+    CompressionType compression_type) {
+  switch (compression_type) {
+    case kNoCompression:
+      return true;
+    case kZSTD:
+      return ZSTD_Streaming_Supported();
+    default:
+      return false;
+  }
+}
+
 inline bool CompressionTypeSupported(CompressionType compression_type) {
   switch (compression_type) {
     case kNoCompression:
@@ -610,22 +634,28 @@ inline std::string CompressionOptionsToString(
   std::string result;
   result.reserve(512);
   result.append("window_bits=")
-      .append(ToString(compression_options.window_bits))
+      .append(std::to_string(compression_options.window_bits))
       .append("; ");
   result.append("level=")
-      .append(ToString(compression_options.level))
+      .append(std::to_string(compression_options.level))
       .append("; ");
   result.append("strategy=")
-      .append(ToString(compression_options.strategy))
+      .append(std::to_string(compression_options.strategy))
       .append("; ");
   result.append("max_dict_bytes=")
-      .append(ToString(compression_options.max_dict_bytes))
+      .append(std::to_string(compression_options.max_dict_bytes))
       .append("; ");
   result.append("zstd_max_train_bytes=")
-      .append(ToString(compression_options.zstd_max_train_bytes))
+      .append(std::to_string(compression_options.zstd_max_train_bytes))
       .append("; ");
   result.append("enabled=")
-      .append(ToString(compression_options.enabled))
+      .append(std::to_string(compression_options.enabled))
+      .append("; ");
+  result.append("max_dict_buffer_bytes=")
+      .append(std::to_string(compression_options.max_dict_buffer_bytes))
+      .append("; ");
+  result.append("use_zstd_dict_trainer=")
+      .append(std::to_string(compression_options.use_zstd_dict_trainer))
       .append("; ");
   return result;
 }
@@ -721,9 +751,6 @@ inline bool Zlib_Compress(const CompressionInfo& info,
     output_header_len = compression::PutDecompressedSizeInfo(
         output, static_cast<uint32_t>(length));
   }
-  // Resize output to be the plain data length.
-  // This may not be big enough if the compression actually expands data.
-  output->resize(output_header_len + length);
 
   // The memLevel parameter specifies how much memory should be allocated for
   // the internal compression state.
@@ -757,12 +784,17 @@ inline bool Zlib_Compress(const CompressionInfo& info,
     }
   }
 
+  // Get an upper bound on the compressed size.
+  size_t upper_bound =
+      deflateBound(&_stream, static_cast<unsigned long>(length));
+  output->resize(output_header_len + upper_bound);
+
   // Compress the input, and put compressed data in output.
   _stream.next_in = (Bytef*)input;
   _stream.avail_in = static_cast<unsigned int>(length);
 
   // Initialize the output size.
-  _stream.avail_out = static_cast<unsigned int>(length);
+  _stream.avail_out = static_cast<unsigned int>(upper_bound);
   _stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);
 
   bool compressed = false;
@@ -1133,7 +1165,11 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
     if (input_length < 8) {
       return nullptr;
     }
-    memcpy(&output_len, input_data, sizeof(output_len));
+    if (port::kLittleEndian) {
+      memcpy(&output_len, input_data, sizeof(output_len));
+    } else {
+      memcpy(&output_len, input_data + 4, sizeof(output_len));
+    }
     input_length -= 8;
     input_data += 8;
   }
@@ -1221,8 +1257,10 @@ inline bool LZ4HC_Compress(const CompressionInfo& info,
   const char* compression_dict_data =
       compression_dict.size() > 0 ? compression_dict.data() : nullptr;
   size_t compression_dict_size = compression_dict.size();
-  LZ4_loadDictHC(stream, compression_dict_data,
-                 static_cast<int>(compression_dict_size));
+  if (compression_dict_data != nullptr) {
+    LZ4_loadDictHC(stream, compression_dict_data,
+                   static_cast<int>(compression_dict_size));
+  }
 
 #if LZ4_VERSION_NUMBER >= 10700  // r129+
   outlen =
@@ -1448,6 +1486,52 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples,
 #endif  // ZSTD_VERSION_NUMBER >= 10103
 }
 
+inline bool ZSTD_FinalizeDictionarySupported() {
+#ifdef ZSTD
+  // ZDICT_finalizeDictionary API is stable since v1.4.5
+  return (ZSTD_versionNumber() >= 10405);
+#else
+  return false;
+#endif
+}
+
+inline std::string ZSTD_FinalizeDictionary(
+    const std::string& samples, const std::vector<size_t>& sample_lens,
+    size_t max_dict_bytes, int level) {
+  // ZDICT_finalizeDictionary is stable since version v1.4.5
+#if ZSTD_VERSION_NUMBER >= 10405  // v1.4.5+
+  assert(samples.empty() == sample_lens.empty());
+  if (samples.empty()) {
+    return "";
+  }
+  if (level == CompressionOptions::kDefaultCompressionLevel) {
+    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
+    // https://github.com/facebook/zstd/issues/1148
+    level = 3;
+  }
+  std::string dict_data(max_dict_bytes, '\0');
+  size_t dict_len = ZDICT_finalizeDictionary(
+      dict_data.data(), max_dict_bytes, samples.data(),
+      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
+      samples.data(), sample_lens.data(),
+      static_cast<unsigned>(sample_lens.size()),
+      {level, 0 /* notificationLevel */, 0 /* dictID */});
+  if (ZDICT_isError(dict_len)) {
+    return "";
+  } else {
+    assert(dict_len <= max_dict_bytes);
+    dict_data.resize(dict_len);
+    return dict_data;
+  }
+#else   // up to v1.4.4
+  (void)samples;
+  (void)sample_lens;
+  (void)max_dict_bytes;
+  (void)level;
+  return "";
+#endif  // ZSTD_VERSION_NUMBER >= 10405
+}
+
 inline bool CompressData(const Slice& raw,
                          const CompressionInfo& compression_info,
                          uint32_t compress_format_version,
@@ -1526,4 +1610,177 @@ inline CacheAllocationPtr UncompressData(
   }
 }
 
+// Records the compression type for subsequent WAL records.
+class CompressionTypeRecord {
+ public:
+  explicit CompressionTypeRecord(CompressionType compression_type)
+      : compression_type_(compression_type) {}
+
+  CompressionType GetCompressionType() const { return compression_type_; }
+
+  inline void EncodeTo(std::string* dst) const {
+    assert(dst != nullptr);
+    PutFixed32(dst, compression_type_);
+  }
+
+  inline Status DecodeFrom(Slice* src) {
+    constexpr char class_name[] = "CompressionTypeRecord";
+
+    uint32_t val;
+    if (!GetFixed32(src, &val)) {
+      return Status::Corruption(class_name,
+                                "Error decoding WAL compression type");
+    }
+    CompressionType compression_type = static_cast<CompressionType>(val);
+    if (!StreamingCompressionTypeSupported(compression_type)) {
+      return Status::Corruption(class_name,
+                                "WAL compression type not supported");
+    }
+    compression_type_ = compression_type;
+    return Status::OK();
+  }
+
+  inline std::string DebugString() const {
+    return "compression_type: " + CompressionTypeToString(compression_type_);
+  }
+
+ private:
+  CompressionType compression_type_;
+};
+
+// Base class to implement compression for a stream of buffers.
+// Instantiate an implementation of the class using Create() with the
+// compression type and use Compress() repeatedly.
+// The output buffer needs to be at least max_output_len.
+// Call Reset() in between frame boundaries or in case of an error.
+// NOTE: This class is not thread safe.
+class StreamingCompress {
+ public:
+  StreamingCompress(CompressionType compression_type,
+                    const CompressionOptions& opts,
+                    uint32_t compress_format_version, size_t max_output_len)
+      : compression_type_(compression_type),
+        opts_(opts),
+        compress_format_version_(compress_format_version),
+        max_output_len_(max_output_len) {}
+  virtual ~StreamingCompress() = default;
+  // compress should be called repeatedly with the same input till the method
+  // returns 0
+  // Parameters:
+  // input - buffer to compress
+  // input_size - size of input buffer
+  // output - compressed buffer allocated by caller, should be at least
+  // max_output_len
+  // output_size - size of the output buffer
+  // Returns -1 for errors, the remaining size of the input buffer that needs to
+  // be compressed
+  virtual int Compress(const char* input, size_t input_size, char* output,
+                       size_t* output_pos) = 0;
+  // static method to create object of a class inherited from StreamingCompress
+  // based on the actual compression type.
+  static StreamingCompress* Create(CompressionType compression_type,
+                                   const CompressionOptions& opts,
+                                   uint32_t compress_format_version,
+                                   size_t max_output_len);
+  virtual void Reset() = 0;
+
+ protected:
+  const CompressionType compression_type_;
+  const CompressionOptions opts_;
+  const uint32_t compress_format_version_;
+  const size_t max_output_len_;
+};
+
+// Base class to uncompress a stream of compressed buffers.
+// Instantiate an implementation of the class using Create() with the
+// compression type and use Uncompress() repeatedly.
+// The output buffer needs to be at least max_output_len.
+// Call Reset() in between frame boundaries or in case of an error.
+// NOTE: This class is not thread safe.
+class StreamingUncompress {
+ public:
+  StreamingUncompress(CompressionType compression_type,
+                      uint32_t compress_format_version, size_t max_output_len)
+      : compression_type_(compression_type),
+        compress_format_version_(compress_format_version),
+        max_output_len_(max_output_len) {}
+  virtual ~StreamingUncompress() = default;
+  // uncompress should be called again with the same input if output_size is
+  // equal to max_output_len or with the next input fragment.
+  // Parameters:
+  // input - buffer to uncompress
+  // input_size - size of input buffer
+  // output - uncompressed buffer allocated by caller, should be at least
+  // max_output_len
+  // output_size - size of the output buffer
+  // Returns -1 for errors, remaining input to be processed otherwise.
+  virtual int Uncompress(const char* input, size_t input_size, char* output,
+                         size_t* output_pos) = 0;
+  static StreamingUncompress* Create(CompressionType compression_type,
+                                     uint32_t compress_format_version,
+                                     size_t max_output_len);
+  virtual void Reset() = 0;
+
+ protected:
+  CompressionType compression_type_;
+  uint32_t compress_format_version_;
+  size_t max_output_len_;
+};
+
+class ZSTDStreamingCompress final : public StreamingCompress {
+ public:
+  explicit ZSTDStreamingCompress(const CompressionOptions& opts,
+                                 uint32_t compress_format_version,
+                                 size_t max_output_len)
+      : StreamingCompress(kZSTD, opts, compress_format_version,
+                          max_output_len) {
+#ifdef ZSTD_STREAMING
+    cctx_ = ZSTD_createCCtx();
+    // Each compressed frame will have a checksum
+    ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
+    assert(cctx_ != nullptr);
+    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
+#endif
+  }
+  ~ZSTDStreamingCompress() override {
+#ifdef ZSTD_STREAMING
+    ZSTD_freeCCtx(cctx_);
+#endif
+  }
+  int Compress(const char* input, size_t input_size, char* output,
+               size_t* output_pos) override;
+  void Reset() override;
+#ifdef ZSTD_STREAMING
+  ZSTD_CCtx* cctx_;
+  ZSTD_inBuffer input_buffer_;
+#endif
+};
+
+class ZSTDStreamingUncompress final : public StreamingUncompress {
+ public:
+  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
+                                   size_t max_output_len)
+      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
+#ifdef ZSTD_STREAMING
+    dctx_ = ZSTD_createDCtx();
+    assert(dctx_ != nullptr);
+    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
+#endif
+  }
+  ~ZSTDStreamingUncompress() override {
+#ifdef ZSTD_STREAMING
+    ZSTD_freeDCtx(dctx_);
+#endif
+  }
+  int Uncompress(const char* input, size_t input_size, char* output,
+                 size_t* output_size) override;
+  void Reset() override;
+
+ private:
+#ifdef ZSTD_STREAMING
+  ZSTD_DCtx* dctx_;
+  ZSTD_inBuffer input_buffer_;
+#endif
+};
+
 }  // namespace ROCKSDB_NAMESPACE