#if defined(ZSTD)
#include <zstd.h>
-#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
+// v1.1.3+
+#if ZSTD_VERSION_NUMBER >= 10103
#include <zdict.h>
#endif // ZSTD_VERSION_NUMBER >= 10103
+// v1.4.0+
+#if ZSTD_VERSION_NUMBER >= 10400
+#define ZSTD_STREAMING
+#endif // ZSTD_VERSION_NUMBER >= 10400
namespace ROCKSDB_NAMESPACE {
// Need this for the context allocation override
// On windows we need to do this explicitly
// Init from cache
ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
- ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT
+ ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
: ZSTDUncompressCachedData() {
*this = std::move(o);
}
- ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o)
- ROCKSDB_NOEXCEPT {
+ ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
assert(zstd_ctx_ == nullptr);
std::swap(zstd_ctx_, o.zstd_ctx_);
std::swap(cache_idx_, o.cache_idx_);
ZSTDUncompressCachedData() {}
ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
- ZSTDUncompressCachedData(ZSTDUncompressCachedData&&)
- ROCKSDB_NOEXCEPT = default;
- ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&)
- ROCKSDB_NOEXCEPT = default;
+ ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) noexcept = default;
+ ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) noexcept =
+ default;
ZSTDNativeContext Get() const { return nullptr; }
int64_t GetCacheIndex() const { return -1; }
void CreateIfNeeded() {}
void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}
+
private:
void ignore_padding__() { padding = nullptr; }
};
#endif
}
+inline bool ZSTD_Streaming_Supported() {
+#if defined(ZSTD) && defined(ZSTD_STREAMING)
+ return true;
+#else
+ return false;
+#endif
+}
+
+inline bool StreamingCompressionTypeSupported(
+ CompressionType compression_type) {
+ switch (compression_type) {
+ case kNoCompression:
+ return true;
+ case kZSTD:
+ return ZSTD_Streaming_Supported();
+ default:
+ return false;
+ }
+}
+
inline bool CompressionTypeSupported(CompressionType compression_type) {
switch (compression_type) {
case kNoCompression:
std::string result;
result.reserve(512);
result.append("window_bits=")
- .append(ToString(compression_options.window_bits))
+ .append(std::to_string(compression_options.window_bits))
.append("; ");
result.append("level=")
- .append(ToString(compression_options.level))
+ .append(std::to_string(compression_options.level))
.append("; ");
result.append("strategy=")
- .append(ToString(compression_options.strategy))
+ .append(std::to_string(compression_options.strategy))
.append("; ");
result.append("max_dict_bytes=")
- .append(ToString(compression_options.max_dict_bytes))
+ .append(std::to_string(compression_options.max_dict_bytes))
.append("; ");
result.append("zstd_max_train_bytes=")
- .append(ToString(compression_options.zstd_max_train_bytes))
+ .append(std::to_string(compression_options.zstd_max_train_bytes))
.append("; ");
result.append("enabled=")
- .append(ToString(compression_options.enabled))
+ .append(std::to_string(compression_options.enabled))
+ .append("; ");
+ result.append("max_dict_buffer_bytes=")
+ .append(std::to_string(compression_options.max_dict_buffer_bytes))
+ .append("; ");
+ result.append("use_zstd_dict_trainer=")
+ .append(std::to_string(compression_options.use_zstd_dict_trainer))
.append("; ");
return result;
}
output_header_len = compression::PutDecompressedSizeInfo(
output, static_cast<uint32_t>(length));
}
- // Resize output to be the plain data length.
- // This may not be big enough if the compression actually expands data.
- output->resize(output_header_len + length);
// The memLevel parameter specifies how much memory should be allocated for
// the internal compression state.
}
}
+ // Get an upper bound on the compressed size.
+ size_t upper_bound =
+ deflateBound(&_stream, static_cast<unsigned long>(length));
+ output->resize(output_header_len + upper_bound);
+
// Compress the input, and put compressed data in output.
_stream.next_in = (Bytef*)input;
_stream.avail_in = static_cast<unsigned int>(length);
// Initialize the output size.
- _stream.avail_out = static_cast<unsigned int>(length);
+ _stream.avail_out = static_cast<unsigned int>(upper_bound);
_stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);
bool compressed = false;
if (input_length < 8) {
return nullptr;
}
- memcpy(&output_len, input_data, sizeof(output_len));
+ if (port::kLittleEndian) {
+ memcpy(&output_len, input_data, sizeof(output_len));
+ } else {
+ memcpy(&output_len, input_data + 4, sizeof(output_len));
+ }
input_length -= 8;
input_data += 8;
}
const char* compression_dict_data =
compression_dict.size() > 0 ? compression_dict.data() : nullptr;
size_t compression_dict_size = compression_dict.size();
- LZ4_loadDictHC(stream, compression_dict_data,
- static_cast<int>(compression_dict_size));
+ if (compression_dict_data != nullptr) {
+ LZ4_loadDictHC(stream, compression_dict_data,
+ static_cast<int>(compression_dict_size));
+ }
#if LZ4_VERSION_NUMBER >= 10700 // r129+
outlen =
#endif // ZSTD_VERSION_NUMBER >= 10103
}
+inline bool ZSTD_FinalizeDictionarySupported() {
+#ifdef ZSTD
+ // ZDICT_finalizeDictionary API is stable since v1.4.5
+ return (ZSTD_versionNumber() >= 10405);
+#else
+ return false;
+#endif
+}
+
+inline std::string ZSTD_FinalizeDictionary(
+ const std::string& samples, const std::vector<size_t>& sample_lens,
+ size_t max_dict_bytes, int level) {
+ // ZDICT_finalizeDictionary is stable since version v1.4.5
+#if ZSTD_VERSION_NUMBER >= 10405 // v1.4.5+
+ assert(samples.empty() == sample_lens.empty());
+ if (samples.empty()) {
+ return "";
+ }
+ if (level == CompressionOptions::kDefaultCompressionLevel) {
+ // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
+ // https://github.com/facebook/zstd/issues/1148
+ level = 3;
+ }
+ std::string dict_data(max_dict_bytes, '\0');
+ size_t dict_len = ZDICT_finalizeDictionary(
+ dict_data.data(), max_dict_bytes, samples.data(),
+ std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
+ samples.data(), sample_lens.data(),
+ static_cast<unsigned>(sample_lens.size()),
+ {level, 0 /* notificationLevel */, 0 /* dictID */});
+ if (ZDICT_isError(dict_len)) {
+ return "";
+ } else {
+ assert(dict_len <= max_dict_bytes);
+ dict_data.resize(dict_len);
+ return dict_data;
+ }
+#else // up to v1.4.4
+ (void)samples;
+ (void)sample_lens;
+ (void)max_dict_bytes;
+ (void)level;
+ return "";
+#endif // ZSTD_VERSION_NUMBER >= 10405
+}
+
inline bool CompressData(const Slice& raw,
const CompressionInfo& compression_info,
uint32_t compress_format_version,
}
}
+// Records the compression type for subsequent WAL records.
+class CompressionTypeRecord {
+ public:
+ explicit CompressionTypeRecord(CompressionType compression_type)
+ : compression_type_(compression_type) {}
+
+ CompressionType GetCompressionType() const { return compression_type_; }
+
+ inline void EncodeTo(std::string* dst) const {
+ assert(dst != nullptr);
+ PutFixed32(dst, compression_type_);
+ }
+
+ inline Status DecodeFrom(Slice* src) {
+ constexpr char class_name[] = "CompressionTypeRecord";
+
+ uint32_t val;
+ if (!GetFixed32(src, &val)) {
+ return Status::Corruption(class_name,
+ "Error decoding WAL compression type");
+ }
+ CompressionType compression_type = static_cast<CompressionType>(val);
+ if (!StreamingCompressionTypeSupported(compression_type)) {
+ return Status::Corruption(class_name,
+ "WAL compression type not supported");
+ }
+ compression_type_ = compression_type;
+ return Status::OK();
+ }
+
+ inline std::string DebugString() const {
+ return "compression_type: " + CompressionTypeToString(compression_type_);
+ }
+
+ private:
+ CompressionType compression_type_;
+};
+
+// Base class to implement compression for a stream of buffers.
+// Instantiate an implementation of the class using Create() with the
+// compression type and use Compress() repeatedly.
+// The output buffer needs to be at least max_output_len.
+// Call Reset() in between frame boundaries or in case of an error.
+// NOTE: This class is not thread safe.
+class StreamingCompress {
+ public:
+ StreamingCompress(CompressionType compression_type,
+ const CompressionOptions& opts,
+ uint32_t compress_format_version, size_t max_output_len)
+ : compression_type_(compression_type),
+ opts_(opts),
+ compress_format_version_(compress_format_version),
+ max_output_len_(max_output_len) {}
+ virtual ~StreamingCompress() = default;
+ // compress should be called repeatedly with the same input till the method
+ // returns 0
+ // Parameters:
+ // input - buffer to compress
+ // input_size - size of input buffer
+ // output - compressed buffer allocated by caller, should be at least
+ // max_output_len
+ // output_size - size of the output buffer
+ // Returns -1 for errors, the remaining size of the input buffer that needs to
+ // be compressed
+ virtual int Compress(const char* input, size_t input_size, char* output,
+ size_t* output_pos) = 0;
+ // static method to create object of a class inherited from StreamingCompress
+ // based on the actual compression type.
+ static StreamingCompress* Create(CompressionType compression_type,
+ const CompressionOptions& opts,
+ uint32_t compress_format_version,
+ size_t max_output_len);
+ virtual void Reset() = 0;
+
+ protected:
+ const CompressionType compression_type_;
+ const CompressionOptions opts_;
+ const uint32_t compress_format_version_;
+ const size_t max_output_len_;
+};
+
+// Base class to uncompress a stream of compressed buffers.
+// Instantiate an implementation of the class using Create() with the
+// compression type and use Uncompress() repeatedly.
+// The output buffer needs to be at least max_output_len.
+// Call Reset() in between frame boundaries or in case of an error.
+// NOTE: This class is not thread safe.
+class StreamingUncompress {
+ public:
+ StreamingUncompress(CompressionType compression_type,
+ uint32_t compress_format_version, size_t max_output_len)
+ : compression_type_(compression_type),
+ compress_format_version_(compress_format_version),
+ max_output_len_(max_output_len) {}
+ virtual ~StreamingUncompress() = default;
+ // uncompress should be called again with the same input if output_size is
+ // equal to max_output_len or with the next input fragment.
+ // Parameters:
+ // input - buffer to uncompress
+ // input_size - size of input buffer
+ // output - uncompressed buffer allocated by caller, should be at least
+ // max_output_len
+ // output_size - size of the output buffer
+ // Returns -1 for errors, remaining input to be processed otherwise.
+ virtual int Uncompress(const char* input, size_t input_size, char* output,
+ size_t* output_pos) = 0;
+ static StreamingUncompress* Create(CompressionType compression_type,
+ uint32_t compress_format_version,
+ size_t max_output_len);
+ virtual void Reset() = 0;
+
+ protected:
+ CompressionType compression_type_;
+ uint32_t compress_format_version_;
+ size_t max_output_len_;
+};
+
+class ZSTDStreamingCompress final : public StreamingCompress {
+ public:
+ explicit ZSTDStreamingCompress(const CompressionOptions& opts,
+ uint32_t compress_format_version,
+ size_t max_output_len)
+ : StreamingCompress(kZSTD, opts, compress_format_version,
+ max_output_len) {
+#ifdef ZSTD_STREAMING
+ cctx_ = ZSTD_createCCtx();
+ // Each compressed frame will have a checksum
+ ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
+ assert(cctx_ != nullptr);
+ input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
+#endif
+ }
+ ~ZSTDStreamingCompress() override {
+#ifdef ZSTD_STREAMING
+ ZSTD_freeCCtx(cctx_);
+#endif
+ }
+ int Compress(const char* input, size_t input_size, char* output,
+ size_t* output_pos) override;
+ void Reset() override;
+#ifdef ZSTD_STREAMING
+ ZSTD_CCtx* cctx_;
+ ZSTD_inBuffer input_buffer_;
+#endif
+};
+
+class ZSTDStreamingUncompress final : public StreamingUncompress {
+ public:
+ explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
+ size_t max_output_len)
+ : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
+#ifdef ZSTD_STREAMING
+ dctx_ = ZSTD_createDCtx();
+ assert(dctx_ != nullptr);
+ input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
+#endif
+ }
+ ~ZSTDStreamingUncompress() override {
+#ifdef ZSTD_STREAMING
+ ZSTD_freeDCtx(dctx_);
+#endif
+ }
+ int Uncompress(const char* input, size_t input_size, char* output,
+ size_t* output_size) override;
+ void Reset() override;
+
+ private:
+#ifdef ZSTD_STREAMING
+ ZSTD_DCtx* dctx_;
+ ZSTD_inBuffer input_buffer_;
+#endif
+};
+
} // namespace ROCKSDB_NAMESPACE