1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
14 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
16 #include <malloc_np.h>
20 #endif // ROCKSDB_MALLOC_USABLE_SIZE
23 #include "memory/memory_allocator.h"
24 #include "rocksdb/options.h"
25 #include "rocksdb/table.h"
26 #include "test_util/sync_point.h"
27 #include "util/coding.h"
28 #include "util/compression_context_cache.h"
29 #include "util/string_util.h"
51 #if ZSTD_VERSION_NUMBER >= 10103
53 #endif // ZSTD_VERSION_NUMBER >= 10103
55 #if ZSTD_VERSION_NUMBER >= 10400
56 #define ZSTD_STREAMING
57 #endif // ZSTD_VERSION_NUMBER >= 10400
58 namespace ROCKSDB_NAMESPACE
{
59 // Need this for the context allocation override
60 // On windows we need to do this explicitly
61 #if (ZSTD_VERSION_NUMBER >= 500)
62 #if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
63 defined(ZSTD_STATIC_LINKING_ONLY)
64 #define ROCKSDB_ZSTD_CUSTOM_MEM
66 ZSTD_customMem
GetJeZstdAllocationOverrides();
68 #endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
69 // defined(ZSTD_STATIC_LINKING_ONLY)
71 // We require `ZSTD_sizeof_DDict` and `ZSTD_createDDict_byReference` to use
72 // `ZSTD_DDict`. The former was introduced in v1.0.0 and the latter was
73 // introduced in v1.1.3. But an important bug fix for `ZSTD_sizeof_DDict` came
74 // in v1.1.4, so that is the version we require. As of today's latest version
75 // (v1.3.8), they are both still in the experimental API, which means they are
76 // only exported when the compiler flag `ZSTD_STATIC_LINKING_ONLY` is set.
77 #if defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
78 #define ROCKSDB_ZSTD_DDICT
79 #endif // defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
81 // Cached data represents a portion that can be re-used
82 // If, in the future we have more than one native context to
83 // cache we can arrange this as a tuple
84 class ZSTDUncompressCachedData
{
86 using ZSTDNativeContext
= ZSTD_DCtx
*;
87 ZSTDUncompressCachedData() {}
89 ZSTDUncompressCachedData(const ZSTDUncompressCachedData
& o
) = delete;
90 ZSTDUncompressCachedData
& operator=(const ZSTDUncompressCachedData
&) = delete;
91 ZSTDUncompressCachedData(ZSTDUncompressCachedData
&& o
) noexcept
92 : ZSTDUncompressCachedData() {
95 ZSTDUncompressCachedData
& operator=(ZSTDUncompressCachedData
&& o
) noexcept
{
96 assert(zstd_ctx_
== nullptr);
97 std::swap(zstd_ctx_
, o
.zstd_ctx_
);
98 std::swap(cache_idx_
, o
.cache_idx_
);
101 ZSTDNativeContext
Get() const { return zstd_ctx_
; }
102 int64_t GetCacheIndex() const { return cache_idx_
; }
103 void CreateIfNeeded() {
104 if (zstd_ctx_
== nullptr) {
105 #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
107 ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
108 #else // ROCKSDB_ZSTD_CUSTOM_MEM
109 zstd_ctx_
= ZSTD_createDCtx();
110 #endif // ROCKSDB_ZSTD_CUSTOM_MEM
114 void InitFromCache(const ZSTDUncompressCachedData
& o
, int64_t idx
) {
115 zstd_ctx_
= o
.zstd_ctx_
;
118 ~ZSTDUncompressCachedData() {
119 if (zstd_ctx_
!= nullptr && cache_idx_
== -1) {
120 ZSTD_freeDCtx(zstd_ctx_
);
125 ZSTDNativeContext zstd_ctx_
= nullptr;
126 int64_t cache_idx_
= -1; // -1 means this instance owns the context
128 #endif // (ZSTD_VERSION_NUMBER >= 500)
129 } // namespace ROCKSDB_NAMESPACE
132 #if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500)
133 namespace ROCKSDB_NAMESPACE
{
134 class ZSTDUncompressCachedData
{
135 void* padding
; // unused
137 using ZSTDNativeContext
= void*;
138 ZSTDUncompressCachedData() {}
139 ZSTDUncompressCachedData(const ZSTDUncompressCachedData
&) {}
140 ZSTDUncompressCachedData
& operator=(const ZSTDUncompressCachedData
&) = delete;
141 ZSTDUncompressCachedData(ZSTDUncompressCachedData
&&) noexcept
= default;
142 ZSTDUncompressCachedData
& operator=(ZSTDUncompressCachedData
&&) noexcept
=
144 ZSTDNativeContext
Get() const { return nullptr; }
145 int64_t GetCacheIndex() const { return -1; }
146 void CreateIfNeeded() {}
147 void InitFromCache(const ZSTDUncompressCachedData
&, int64_t) {}
150 void ignore_padding__() { padding
= nullptr; }
152 } // namespace ROCKSDB_NAMESPACE
156 #include "port/xpress.h"
159 namespace ROCKSDB_NAMESPACE
{
161 // Holds dictionary and related data, like ZSTD's digested compression
163 struct CompressionDict
{
164 #if ZSTD_VERSION_NUMBER >= 700
165 ZSTD_CDict
* zstd_cdict_
= nullptr;
166 #endif // ZSTD_VERSION_NUMBER >= 700
170 #if ZSTD_VERSION_NUMBER >= 700
171 CompressionDict(std::string dict
, CompressionType type
, int level
) {
172 #else // ZSTD_VERSION_NUMBER >= 700
173 CompressionDict(std::string dict
, CompressionType
/*type*/, int /*level*/) {
174 #endif // ZSTD_VERSION_NUMBER >= 700
175 dict_
= std::move(dict
);
176 #if ZSTD_VERSION_NUMBER >= 700
177 zstd_cdict_
= nullptr;
178 if (!dict_
.empty() && (type
== kZSTD
|| type
== kZSTDNotFinalCompression
)) {
179 if (level
== CompressionOptions::kDefaultCompressionLevel
) {
180 // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
181 // https://github.com/facebook/zstd/issues/1148
184 // Should be safe (but slower) if below call fails as we'll use the
185 // raw dictionary to compress.
186 zstd_cdict_
= ZSTD_createCDict(dict_
.data(), dict_
.size(), level
);
187 assert(zstd_cdict_
!= nullptr);
189 #endif // ZSTD_VERSION_NUMBER >= 700
193 #if ZSTD_VERSION_NUMBER >= 700
195 if (zstd_cdict_
!= nullptr) {
196 res
= ZSTD_freeCDict(zstd_cdict_
);
198 assert(res
== 0); // Last I checked they can't fail
199 (void)res
; // prevent unused var warning
200 #endif // ZSTD_VERSION_NUMBER >= 700
203 #if ZSTD_VERSION_NUMBER >= 700
204 const ZSTD_CDict
* GetDigestedZstdCDict() const { return zstd_cdict_
; }
205 #endif // ZSTD_VERSION_NUMBER >= 700
207 Slice
GetRawDict() const { return dict_
; }
209 static const CompressionDict
& GetEmptyDict() {
210 static CompressionDict empty_dict
{};
214 CompressionDict() = default;
216 CompressionDict(const CompressionDict
&) = delete;
217 CompressionDict
& operator=(const CompressionDict
&) = delete;
218 CompressionDict(CompressionDict
&&) = delete;
219 CompressionDict
& operator=(CompressionDict
&&) = delete;
222 // Holds dictionary and related data, like ZSTD's digested uncompression
224 struct UncompressionDict
{
225 // Block containing the data for the compression dictionary in case the
226 // constructor that takes a string parameter is used.
229 // Block containing the data for the compression dictionary in case the
230 // constructor that takes a Slice parameter is used and the passed in
231 // CacheAllocationPtr is not nullptr.
232 CacheAllocationPtr allocation_
;
234 // Slice pointing to the compression dictionary data. Can point to
235 // dict_, allocation_, or some other memory location, depending on how
236 // the object was constructed.
239 #ifdef ROCKSDB_ZSTD_DDICT
240 // Processed version of the contents of slice_ for ZSTD compression.
241 ZSTD_DDict
* zstd_ddict_
= nullptr;
242 #endif // ROCKSDB_ZSTD_DDICT
244 #ifdef ROCKSDB_ZSTD_DDICT
245 UncompressionDict(std::string dict
, bool using_zstd
)
246 #else // ROCKSDB_ZSTD_DDICT
247 UncompressionDict(std::string dict
, bool /* using_zstd */)
248 #endif // ROCKSDB_ZSTD_DDICT
249 : dict_(std::move(dict
)), slice_(dict_
) {
250 #ifdef ROCKSDB_ZSTD_DDICT
251 if (!slice_
.empty() && using_zstd
) {
252 zstd_ddict_
= ZSTD_createDDict_byReference(slice_
.data(), slice_
.size());
253 assert(zstd_ddict_
!= nullptr);
255 #endif // ROCKSDB_ZSTD_DDICT
258 #ifdef ROCKSDB_ZSTD_DDICT
259 UncompressionDict(Slice slice
, CacheAllocationPtr
&& allocation
,
261 #else // ROCKSDB_ZSTD_DDICT
262 UncompressionDict(Slice slice
, CacheAllocationPtr
&& allocation
,
263 bool /* using_zstd */)
264 #endif // ROCKSDB_ZSTD_DDICT
265 : allocation_(std::move(allocation
)), slice_(std::move(slice
)) {
266 #ifdef ROCKSDB_ZSTD_DDICT
267 if (!slice_
.empty() && using_zstd
) {
268 zstd_ddict_
= ZSTD_createDDict_byReference(slice_
.data(), slice_
.size());
269 assert(zstd_ddict_
!= nullptr);
271 #endif // ROCKSDB_ZSTD_DDICT
274 UncompressionDict(UncompressionDict
&& rhs
)
275 : dict_(std::move(rhs
.dict_
)),
276 allocation_(std::move(rhs
.allocation_
)),
277 slice_(std::move(rhs
.slice_
))
278 #ifdef ROCKSDB_ZSTD_DDICT
280 zstd_ddict_(rhs
.zstd_ddict_
)
283 #ifdef ROCKSDB_ZSTD_DDICT
284 rhs
.zstd_ddict_
= nullptr;
288 ~UncompressionDict() {
289 #ifdef ROCKSDB_ZSTD_DDICT
291 if (zstd_ddict_
!= nullptr) {
292 res
= ZSTD_freeDDict(zstd_ddict_
);
294 assert(res
== 0); // Last I checked they can't fail
295 (void)res
; // prevent unused var warning
296 #endif // ROCKSDB_ZSTD_DDICT
299 UncompressionDict
& operator=(UncompressionDict
&& rhs
) {
304 dict_
= std::move(rhs
.dict_
);
305 allocation_
= std::move(rhs
.allocation_
);
306 slice_
= std::move(rhs
.slice_
);
308 #ifdef ROCKSDB_ZSTD_DDICT
309 zstd_ddict_
= rhs
.zstd_ddict_
;
310 rhs
.zstd_ddict_
= nullptr;
316 // The object is self-contained if the string constructor is used, or the
317 // Slice constructor is invoked with a non-null allocation. Otherwise, it
318 // is the caller's responsibility to ensure that the underlying storage
319 // outlives this object.
320 bool own_bytes() const { return !dict_
.empty() || allocation_
; }
322 const Slice
& GetRawDict() const { return slice_
; }
324 #ifdef ROCKSDB_ZSTD_DDICT
325 const ZSTD_DDict
* GetDigestedZstdDDict() const { return zstd_ddict_
; }
326 #endif // ROCKSDB_ZSTD_DDICT
328 static const UncompressionDict
& GetEmptyDict() {
329 static UncompressionDict empty_dict
{};
333 size_t ApproximateMemoryUsage() const {
334 size_t usage
= sizeof(struct UncompressionDict
);
335 usage
+= dict_
.size();
337 auto allocator
= allocation_
.get_deleter().allocator
;
339 usage
+= allocator
->UsableSize(allocation_
.get(), slice_
.size());
341 usage
+= slice_
.size();
344 #ifdef ROCKSDB_ZSTD_DDICT
345 usage
+= ZSTD_sizeof_DDict(zstd_ddict_
);
346 #endif // ROCKSDB_ZSTD_DDICT
350 UncompressionDict() = default;
352 UncompressionDict(const CompressionDict
&) = delete;
353 UncompressionDict
& operator=(const CompressionDict
&) = delete;
356 class CompressionContext
{
358 #if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
359 ZSTD_CCtx
* zstd_ctx_
= nullptr;
360 void CreateNativeContext(CompressionType type
) {
361 if (type
== kZSTD
|| type
== kZSTDNotFinalCompression
) {
362 #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
364 ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
365 #else // ROCKSDB_ZSTD_CUSTOM_MEM
366 zstd_ctx_
= ZSTD_createCCtx();
367 #endif // ROCKSDB_ZSTD_CUSTOM_MEM
370 void DestroyNativeContext() {
371 if (zstd_ctx_
!= nullptr) {
372 ZSTD_freeCCtx(zstd_ctx_
);
377 // callable inside ZSTD_Compress
378 ZSTD_CCtx
* ZSTDPreallocCtx() const {
379 assert(zstd_ctx_
!= nullptr);
383 #else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
385 void CreateNativeContext(CompressionType
/* type */) {}
386 void DestroyNativeContext() {}
387 #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
389 explicit CompressionContext(CompressionType type
) {
390 CreateNativeContext(type
);
392 ~CompressionContext() { DestroyNativeContext(); }
393 CompressionContext(const CompressionContext
&) = delete;
394 CompressionContext
& operator=(const CompressionContext
&) = delete;
397 class CompressionInfo
{
398 const CompressionOptions
& opts_
;
399 const CompressionContext
& context_
;
400 const CompressionDict
& dict_
;
401 const CompressionType type_
;
402 const uint64_t sample_for_compression_
;
405 CompressionInfo(const CompressionOptions
& _opts
,
406 const CompressionContext
& _context
,
407 const CompressionDict
& _dict
, CompressionType _type
,
408 uint64_t _sample_for_compression
)
413 sample_for_compression_(_sample_for_compression
) {}
415 const CompressionOptions
& options() const { return opts_
; }
416 const CompressionContext
& context() const { return context_
; }
417 const CompressionDict
& dict() const { return dict_
; }
418 CompressionType
type() const { return type_
; }
419 uint64_t SampleForCompression() const { return sample_for_compression_
; }
422 class UncompressionContext
{
424 CompressionContextCache
* ctx_cache_
= nullptr;
425 ZSTDUncompressCachedData uncomp_cached_data_
;
428 explicit UncompressionContext(CompressionType type
) {
429 if (type
== kZSTD
|| type
== kZSTDNotFinalCompression
) {
430 ctx_cache_
= CompressionContextCache::Instance();
431 uncomp_cached_data_
= ctx_cache_
->GetCachedZSTDUncompressData();
434 ~UncompressionContext() {
435 if (uncomp_cached_data_
.GetCacheIndex() != -1) {
436 assert(ctx_cache_
!= nullptr);
437 ctx_cache_
->ReturnCachedZSTDUncompressData(
438 uncomp_cached_data_
.GetCacheIndex());
441 UncompressionContext(const UncompressionContext
&) = delete;
442 UncompressionContext
& operator=(const UncompressionContext
&) = delete;
444 ZSTDUncompressCachedData::ZSTDNativeContext
GetZSTDContext() const {
445 return uncomp_cached_data_
.Get();
449 class UncompressionInfo
{
450 const UncompressionContext
& context_
;
451 const UncompressionDict
& dict_
;
452 const CompressionType type_
;
455 UncompressionInfo(const UncompressionContext
& _context
,
456 const UncompressionDict
& _dict
, CompressionType _type
)
457 : context_(_context
), dict_(_dict
), type_(_type
) {}
459 const UncompressionContext
& context() const { return context_
; }
460 const UncompressionDict
& dict() const { return dict_
; }
461 CompressionType
type() const { return type_
; }
464 inline bool Snappy_Supported() {
472 inline bool Zlib_Supported() {
480 inline bool BZip2_Supported() {
488 inline bool LZ4_Supported() {
496 inline bool XPRESS_Supported() {
// True iff this binary was built with ZSTD and the linked library speaks the
// finalized on-wire format.
inline bool ZSTD_Supported() {
#ifdef ZSTD
  // ZSTD format is finalized since version 0.8.0.
  return (ZSTD_versionNumber() >= 800);
#else
  return false;
#endif
}
513 inline bool ZSTDNotFinal_Supported() {
521 inline bool ZSTD_Streaming_Supported() {
522 #if defined(ZSTD) && defined(ZSTD_STREAMING)
529 inline bool StreamingCompressionTypeSupported(
530 CompressionType compression_type
) {
531 switch (compression_type
) {
535 return ZSTD_Streaming_Supported();
541 inline bool CompressionTypeSupported(CompressionType compression_type
) {
542 switch (compression_type
) {
545 case kSnappyCompression
:
546 return Snappy_Supported();
547 case kZlibCompression
:
548 return Zlib_Supported();
549 case kBZip2Compression
:
550 return BZip2_Supported();
551 case kLZ4Compression
:
552 return LZ4_Supported();
553 case kLZ4HCCompression
:
554 return LZ4_Supported();
555 case kXpressCompression
:
556 return XPRESS_Supported();
557 case kZSTDNotFinalCompression
:
558 return ZSTDNotFinal_Supported();
560 return ZSTD_Supported();
567 inline bool DictCompressionTypeSupported(CompressionType compression_type
) {
568 switch (compression_type
) {
571 case kSnappyCompression
:
573 case kZlibCompression
:
574 return Zlib_Supported();
575 case kBZip2Compression
:
577 case kLZ4Compression
:
578 case kLZ4HCCompression
:
579 #if LZ4_VERSION_NUMBER >= 10400 // r124+
580 return LZ4_Supported();
584 case kXpressCompression
:
586 case kZSTDNotFinalCompression
:
587 #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
588 return ZSTDNotFinal_Supported();
593 #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
594 return ZSTD_Supported();
604 inline std::string
CompressionTypeToString(CompressionType compression_type
) {
605 switch (compression_type
) {
607 return "NoCompression";
608 case kSnappyCompression
:
610 case kZlibCompression
:
612 case kBZip2Compression
:
614 case kLZ4Compression
:
616 case kLZ4HCCompression
:
618 case kXpressCompression
:
622 case kZSTDNotFinalCompression
:
623 return "ZSTDNotFinal";
624 case kDisableCompressionOption
:
625 return "DisableOption";
632 inline std::string
CompressionOptionsToString(
633 CompressionOptions
& compression_options
) {
636 result
.append("window_bits=")
637 .append(std::to_string(compression_options
.window_bits
))
639 result
.append("level=")
640 .append(std::to_string(compression_options
.level
))
642 result
.append("strategy=")
643 .append(std::to_string(compression_options
.strategy
))
645 result
.append("max_dict_bytes=")
646 .append(std::to_string(compression_options
.max_dict_bytes
))
648 result
.append("zstd_max_train_bytes=")
649 .append(std::to_string(compression_options
.zstd_max_train_bytes
))
651 result
.append("enabled=")
652 .append(std::to_string(compression_options
.enabled
))
654 result
.append("max_dict_buffer_bytes=")
655 .append(std::to_string(compression_options
.max_dict_buffer_bytes
))
657 result
.append("use_zstd_dict_trainer=")
658 .append(std::to_string(compression_options
.use_zstd_dict_trainer
))
663 // compress_format_version can have two values:
664 // 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
665 // block. Also, decompressed sizes for LZ4 are encoded in platform-dependent
667 // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
668 // start of compressed block. Snappy format is the same as version 1.
670 inline bool Snappy_Compress(const CompressionInfo
& /*info*/, const char* input
,
671 size_t length
, ::std::string
* output
) {
673 output
->resize(snappy::MaxCompressedLength(length
));
675 snappy::RawCompress(input
, length
, &(*output
)[0], &outlen
);
676 output
->resize(outlen
);
686 inline CacheAllocationPtr
Snappy_Uncompress(
687 const char* input
, size_t length
, size_t* uncompressed_size
,
688 MemoryAllocator
* allocator
= nullptr) {
690 size_t uncompressed_length
= 0;
691 if (!snappy::GetUncompressedLength(input
, length
, &uncompressed_length
)) {
695 CacheAllocationPtr output
= AllocateBlock(uncompressed_length
, allocator
);
697 if (!snappy::RawUncompress(input
, length
, output
.get())) {
701 *uncompressed_size
= uncompressed_length
;
707 (void)uncompressed_size
;
713 namespace compression
{
715 inline size_t PutDecompressedSizeInfo(std::string
* output
, uint32_t length
) {
716 PutVarint32(output
, length
);
717 return output
->size();
720 inline bool GetDecompressedSizeInfo(const char** input_data
,
721 size_t* input_length
,
722 uint32_t* output_len
) {
723 auto new_input_data
=
724 GetVarint32Ptr(*input_data
, *input_data
+ *input_length
, output_len
);
725 if (new_input_data
== nullptr) {
728 *input_length
-= (new_input_data
- *input_data
);
729 *input_data
= new_input_data
;
732 } // namespace compression
734 // compress_format_version == 1 -- decompressed size is not included in the
736 // compress_format_version == 2 -- decompressed size is included in the block
737 // header in varint32 format
738 // @param compression_dict Data for presetting the compression library's
740 inline bool Zlib_Compress(const CompressionInfo
& info
,
741 uint32_t compress_format_version
, const char* input
,
742 size_t length
, ::std::string
* output
) {
744 if (length
> std::numeric_limits
<uint32_t>::max()) {
745 // Can't compress more than 4GB
749 size_t output_header_len
= 0;
750 if (compress_format_version
== 2) {
751 output_header_len
= compression::PutDecompressedSizeInfo(
752 output
, static_cast<uint32_t>(length
));
755 // The memLevel parameter specifies how much memory should be allocated for
756 // the internal compression state.
757 // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
758 // memLevel=9 uses maximum memory for optimal speed.
759 // The default value is 8. See zconf.h for more details.
760 static const int memLevel
= 8;
762 if (info
.options().level
== CompressionOptions::kDefaultCompressionLevel
) {
763 level
= Z_DEFAULT_COMPRESSION
;
765 level
= info
.options().level
;
768 memset(&_stream
, 0, sizeof(z_stream
));
769 int st
= deflateInit2(&_stream
, level
, Z_DEFLATED
, info
.options().window_bits
,
770 memLevel
, info
.options().strategy
);
775 Slice compression_dict
= info
.dict().GetRawDict();
776 if (compression_dict
.size()) {
777 // Initialize the compression library's dictionary
778 st
= deflateSetDictionary(
779 &_stream
, reinterpret_cast<const Bytef
*>(compression_dict
.data()),
780 static_cast<unsigned int>(compression_dict
.size()));
782 deflateEnd(&_stream
);
787 // Get an upper bound on the compressed size.
789 deflateBound(&_stream
, static_cast<unsigned long>(length
));
790 output
->resize(output_header_len
+ upper_bound
);
792 // Compress the input, and put compressed data in output.
793 _stream
.next_in
= (Bytef
*)input
;
794 _stream
.avail_in
= static_cast<unsigned int>(length
);
796 // Initialize the output size.
797 _stream
.avail_out
= static_cast<unsigned int>(upper_bound
);
798 _stream
.next_out
= reinterpret_cast<Bytef
*>(&(*output
)[output_header_len
]);
800 bool compressed
= false;
801 st
= deflate(&_stream
, Z_FINISH
);
802 if (st
== Z_STREAM_END
) {
804 output
->resize(output
->size() - _stream
.avail_out
);
806 // The only return value we really care about is Z_STREAM_END.
807 // Z_OK means insufficient output space. This means the compression is
808 // bigger than decompressed size. Just fail the compression in that case.
810 deflateEnd(&_stream
);
814 (void)compress_format_version
;
822 // compress_format_version == 1 -- decompressed size is not included in the
824 // compress_format_version == 2 -- decompressed size is included in the block
825 // header in varint32 format
826 // @param compression_dict Data for presetting the compression library's
828 inline CacheAllocationPtr
Zlib_Uncompress(
829 const UncompressionInfo
& info
, const char* input_data
, size_t input_length
,
830 size_t* uncompressed_size
, uint32_t compress_format_version
,
831 MemoryAllocator
* allocator
= nullptr, int windowBits
= -14) {
833 uint32_t output_len
= 0;
834 if (compress_format_version
== 2) {
835 if (!compression::GetDecompressedSizeInfo(&input_data
, &input_length
,
840 // Assume the decompressed data size will 5x of compressed size, but round
842 size_t proposed_output_len
= ((input_length
* 5) & (~(4096 - 1))) + 4096;
843 output_len
= static_cast<uint32_t>(
844 std::min(proposed_output_len
,
845 static_cast<size_t>(std::numeric_limits
<uint32_t>::max())));
849 memset(&_stream
, 0, sizeof(z_stream
));
851 // For raw inflate, the windowBits should be -8..-15.
852 // If windowBits is bigger than zero, it will use either zlib
853 // header or gzip header. Adding 32 to it will do automatic detection.
855 inflateInit2(&_stream
, windowBits
> 0 ? windowBits
+ 32 : windowBits
);
860 const Slice
& compression_dict
= info
.dict().GetRawDict();
861 if (compression_dict
.size()) {
862 // Initialize the compression library's dictionary
863 st
= inflateSetDictionary(
864 &_stream
, reinterpret_cast<const Bytef
*>(compression_dict
.data()),
865 static_cast<unsigned int>(compression_dict
.size()));
871 _stream
.next_in
= (Bytef
*)input_data
;
872 _stream
.avail_in
= static_cast<unsigned int>(input_length
);
874 auto output
= AllocateBlock(output_len
, allocator
);
876 _stream
.next_out
= (Bytef
*)output
.get();
877 _stream
.avail_out
= static_cast<unsigned int>(output_len
);
881 st
= inflate(&_stream
, Z_SYNC_FLUSH
);
887 // No output space. Increase the output space by 20%.
888 // We should never run out of output space if
889 // compress_format_version == 2
890 assert(compress_format_version
!= 2);
891 size_t old_sz
= output_len
;
892 uint32_t output_len_delta
= output_len
/ 5;
893 output_len
+= output_len_delta
< 10 ? 10 : output_len_delta
;
894 auto tmp
= AllocateBlock(output_len
, allocator
);
895 memcpy(tmp
.get(), output
.get(), old_sz
);
896 output
= std::move(tmp
);
899 _stream
.next_out
= (Bytef
*)(output
.get() + old_sz
);
900 _stream
.avail_out
= static_cast<unsigned int>(output_len
- old_sz
);
905 inflateEnd(&_stream
);
910 // If we encoded decompressed block size, we should have no bytes left
911 assert(compress_format_version
!= 2 || _stream
.avail_out
== 0);
912 assert(output_len
>= _stream
.avail_out
);
913 *uncompressed_size
= output_len
- _stream
.avail_out
;
914 inflateEnd(&_stream
);
920 (void)uncompressed_size
;
921 (void)compress_format_version
;
928 // compress_format_version == 1 -- decompressed size is not included in the
930 // compress_format_version == 2 -- decompressed size is included in the block
931 // header in varint32 format
932 inline bool BZip2_Compress(const CompressionInfo
& /*info*/,
933 uint32_t compress_format_version
, const char* input
,
934 size_t length
, ::std::string
* output
) {
936 if (length
> std::numeric_limits
<uint32_t>::max()) {
937 // Can't compress more than 4GB
940 size_t output_header_len
= 0;
941 if (compress_format_version
== 2) {
942 output_header_len
= compression::PutDecompressedSizeInfo(
943 output
, static_cast<uint32_t>(length
));
945 // Resize output to be the plain data length.
946 // This may not be big enough if the compression actually expands data.
947 output
->resize(output_header_len
+ length
);
950 memset(&_stream
, 0, sizeof(bz_stream
));
952 // Block size 1 is 100K.
954 // 30 is the default workFactor
955 int st
= BZ2_bzCompressInit(&_stream
, 1, 0, 30);
960 // Compress the input, and put compressed data in output.
961 _stream
.next_in
= (char*)input
;
962 _stream
.avail_in
= static_cast<unsigned int>(length
);
964 // Initialize the output size.
965 _stream
.avail_out
= static_cast<unsigned int>(length
);
966 _stream
.next_out
= reinterpret_cast<char*>(&(*output
)[output_header_len
]);
968 bool compressed
= false;
969 st
= BZ2_bzCompress(&_stream
, BZ_FINISH
);
970 if (st
== BZ_STREAM_END
) {
972 output
->resize(output
->size() - _stream
.avail_out
);
974 // The only return value we really care about is BZ_STREAM_END.
975 // BZ_FINISH_OK means insufficient output space. This means the compression
976 // is bigger than decompressed size. Just fail the compression in that case.
978 BZ2_bzCompressEnd(&_stream
);
981 (void)compress_format_version
;
989 // compress_format_version == 1 -- decompressed size is not included in the
991 // compress_format_version == 2 -- decompressed size is included in the block
992 // header in varint32 format
993 inline CacheAllocationPtr
BZip2_Uncompress(
994 const char* input_data
, size_t input_length
, size_t* uncompressed_size
,
995 uint32_t compress_format_version
, MemoryAllocator
* allocator
= nullptr) {
997 uint32_t output_len
= 0;
998 if (compress_format_version
== 2) {
999 if (!compression::GetDecompressedSizeInfo(&input_data
, &input_length
,
1004 // Assume the decompressed data size will 5x of compressed size, but round
1005 // to the next page size
1006 size_t proposed_output_len
= ((input_length
* 5) & (~(4096 - 1))) + 4096;
1007 output_len
= static_cast<uint32_t>(
1008 std::min(proposed_output_len
,
1009 static_cast<size_t>(std::numeric_limits
<uint32_t>::max())));
1013 memset(&_stream
, 0, sizeof(bz_stream
));
1015 int st
= BZ2_bzDecompressInit(&_stream
, 0, 0);
1020 _stream
.next_in
= (char*)input_data
;
1021 _stream
.avail_in
= static_cast<unsigned int>(input_length
);
1023 auto output
= AllocateBlock(output_len
, allocator
);
1025 _stream
.next_out
= (char*)output
.get();
1026 _stream
.avail_out
= static_cast<unsigned int>(output_len
);
1030 st
= BZ2_bzDecompress(&_stream
);
1036 // No output space. Increase the output space by 20%.
1037 // We should never run out of output space if
1038 // compress_format_version == 2
1039 assert(compress_format_version
!= 2);
1040 uint32_t old_sz
= output_len
;
1041 output_len
= output_len
* 1.2;
1042 auto tmp
= AllocateBlock(output_len
, allocator
);
1043 memcpy(tmp
.get(), output
.get(), old_sz
);
1044 output
= std::move(tmp
);
1047 _stream
.next_out
= (char*)(output
.get() + old_sz
);
1048 _stream
.avail_out
= static_cast<unsigned int>(output_len
- old_sz
);
1052 BZ2_bzDecompressEnd(&_stream
);
1057 // If we encoded decompressed block size, we should have no bytes left
1058 assert(compress_format_version
!= 2 || _stream
.avail_out
== 0);
1059 assert(output_len
>= _stream
.avail_out
);
1060 *uncompressed_size
= output_len
- _stream
.avail_out
;
1061 BZ2_bzDecompressEnd(&_stream
);
1066 (void)uncompressed_size
;
1067 (void)compress_format_version
;
1073 // compress_format_version == 1 -- decompressed size is included in the
1074 // block header using memcpy, which makes database non-portable)
1075 // compress_format_version == 2 -- decompressed size is included in the block
1076 // header in varint32 format
1077 // @param compression_dict Data for presetting the compression library's
1079 inline bool LZ4_Compress(const CompressionInfo
& info
,
1080 uint32_t compress_format_version
, const char* input
,
1081 size_t length
, ::std::string
* output
) {
1083 if (length
> std::numeric_limits
<uint32_t>::max()) {
1084 // Can't compress more than 4GB
1088 size_t output_header_len
= 0;
1089 if (compress_format_version
== 2) {
1090 // new encoding, using varint32 to store size information
1091 output_header_len
= compression::PutDecompressedSizeInfo(
1092 output
, static_cast<uint32_t>(length
));
1094 // legacy encoding, which is not really portable (depends on big/little
1096 output_header_len
= 8;
1097 output
->resize(output_header_len
);
1098 char* p
= const_cast<char*>(output
->c_str());
1099 memcpy(p
, &length
, sizeof(length
));
1101 int compress_bound
= LZ4_compressBound(static_cast<int>(length
));
1102 output
->resize(static_cast<size_t>(output_header_len
+ compress_bound
));
1105 #if LZ4_VERSION_NUMBER >= 10400 // r124+
1106 LZ4_stream_t
* stream
= LZ4_createStream();
1107 Slice compression_dict
= info
.dict().GetRawDict();
1108 if (compression_dict
.size()) {
1109 LZ4_loadDict(stream
, compression_dict
.data(),
1110 static_cast<int>(compression_dict
.size()));
1112 #if LZ4_VERSION_NUMBER >= 10700 // r129+
1114 LZ4_compress_fast_continue(stream
, input
, &(*output
)[output_header_len
],
1115 static_cast<int>(length
), compress_bound
, 1);
1117 outlen
= LZ4_compress_limitedOutput_continue(
1118 stream
, input
, &(*output
)[output_header_len
], static_cast<int>(length
),
1121 LZ4_freeStream(stream
);
1123 outlen
= LZ4_compress_limitedOutput(input
, &(*output
)[output_header_len
],
1124 static_cast<int>(length
), compress_bound
);
1125 #endif // LZ4_VERSION_NUMBER >= 10400
1130 output
->resize(static_cast<size_t>(output_header_len
+ outlen
));
1134 (void)compress_format_version
;
1142 // compress_format_version == 1 -- decompressed size is included in the
1143 // block header using memcpy, which makes database non-portable)
1144 // compress_format_version == 2 -- decompressed size is included in the block
1145 // header in varint32 format
1146 // @param compression_dict Data for presetting the compression library's
1148 inline CacheAllocationPtr
LZ4_Uncompress(const UncompressionInfo
& info
,
1149 const char* input_data
,
1150 size_t input_length
,
1151 size_t* uncompressed_size
,
1152 uint32_t compress_format_version
,
1153 MemoryAllocator
* allocator
= nullptr) {
1155 uint32_t output_len
= 0;
1156 if (compress_format_version
== 2) {
1157 // new encoding, using varint32 to store size information
1158 if (!compression::GetDecompressedSizeInfo(&input_data
, &input_length
,
1163 // legacy encoding, which is not really portable (depends on big/little
1165 if (input_length
< 8) {
1168 if (port::kLittleEndian
) {
1169 memcpy(&output_len
, input_data
, sizeof(output_len
));
1171 memcpy(&output_len
, input_data
+ 4, sizeof(output_len
));
1177 auto output
= AllocateBlock(output_len
, allocator
);
1179 int decompress_bytes
= 0;
1181 #if LZ4_VERSION_NUMBER >= 10400 // r124+
1182 LZ4_streamDecode_t
* stream
= LZ4_createStreamDecode();
1183 const Slice
& compression_dict
= info
.dict().GetRawDict();
1184 if (compression_dict
.size()) {
1185 LZ4_setStreamDecode(stream
, compression_dict
.data(),
1186 static_cast<int>(compression_dict
.size()));
1188 decompress_bytes
= LZ4_decompress_safe_continue(
1189 stream
, input_data
, output
.get(), static_cast<int>(input_length
),
1190 static_cast<int>(output_len
));
1191 LZ4_freeStreamDecode(stream
);
1193 decompress_bytes
= LZ4_decompress_safe(input_data
, output
.get(),
1194 static_cast<int>(input_length
),
1195 static_cast<int>(output_len
));
1196 #endif // LZ4_VERSION_NUMBER >= 10400
1198 if (decompress_bytes
< 0) {
1201 assert(decompress_bytes
== static_cast<int>(output_len
));
1202 *uncompressed_size
= decompress_bytes
;
1208 (void)uncompressed_size
;
1209 (void)compress_format_version
;
1215 // compress_format_version == 1 -- decompressed size is included in the
1216 // block header using memcpy, which makes database non-portable)
1217 // compress_format_version == 2 -- decompressed size is included in the block
1218 // header in varint32 format
1219 // @param compression_dict Data for presetting the compression library's
1221 inline bool LZ4HC_Compress(const CompressionInfo
& info
,
1222 uint32_t compress_format_version
, const char* input
,
1223 size_t length
, ::std::string
* output
) {
1225 if (length
> std::numeric_limits
<uint32_t>::max()) {
1226 // Can't compress more than 4GB
1230 size_t output_header_len
= 0;
1231 if (compress_format_version
== 2) {
1232 // new encoding, using varint32 to store size information
1233 output_header_len
= compression::PutDecompressedSizeInfo(
1234 output
, static_cast<uint32_t>(length
));
1236 // legacy encoding, which is not really portable (depends on big/little
1238 output_header_len
= 8;
1239 output
->resize(output_header_len
);
1240 char* p
= const_cast<char*>(output
->c_str());
1241 memcpy(p
, &length
, sizeof(length
));
1243 int compress_bound
= LZ4_compressBound(static_cast<int>(length
));
1244 output
->resize(static_cast<size_t>(output_header_len
+ compress_bound
));
1248 if (info
.options().level
== CompressionOptions::kDefaultCompressionLevel
) {
1249 level
= 0; // lz4hc.h says any value < 1 will be sanitized to default
1251 level
= info
.options().level
;
1253 #if LZ4_VERSION_NUMBER >= 10400 // r124+
1254 LZ4_streamHC_t
* stream
= LZ4_createStreamHC();
1255 LZ4_resetStreamHC(stream
, level
);
1256 Slice compression_dict
= info
.dict().GetRawDict();
1257 const char* compression_dict_data
=
1258 compression_dict
.size() > 0 ? compression_dict
.data() : nullptr;
1259 size_t compression_dict_size
= compression_dict
.size();
1260 if (compression_dict_data
!= nullptr) {
1261 LZ4_loadDictHC(stream
, compression_dict_data
,
1262 static_cast<int>(compression_dict_size
));
1265 #if LZ4_VERSION_NUMBER >= 10700 // r129+
1267 LZ4_compress_HC_continue(stream
, input
, &(*output
)[output_header_len
],
1268 static_cast<int>(length
), compress_bound
);
1270 outlen
= LZ4_compressHC_limitedOutput_continue(
1271 stream
, input
, &(*output
)[output_header_len
], static_cast<int>(length
),
1273 #endif // LZ4_VERSION_NUMBER >= 10700
1274 LZ4_freeStreamHC(stream
);
1276 #elif LZ4_VERSION_MAJOR // r113-r123
1277 outlen
= LZ4_compressHC2_limitedOutput(input
, &(*output
)[output_header_len
],
1278 static_cast<int>(length
),
1279 compress_bound
, level
);
1282 LZ4_compressHC_limitedOutput(input
, &(*output
)[output_header_len
],
1283 static_cast<int>(length
), compress_bound
);
1284 #endif // LZ4_VERSION_NUMBER >= 10400
1289 output
->resize(static_cast<size_t>(output_header_len
+ outlen
));
1293 (void)compress_format_version
;
#ifdef XPRESS
// Compress a block with Windows XPRESS; thin forwarder to the port layer.
inline bool XPRESS_Compress(const char* input, size_t length,
                            std::string* output) {
  return port::xpress::Compress(input, length, output);
}
#else
// Stub for builds without XPRESS support: always reports failure.
inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/,
                            std::string* /*output*/) {
  return false;
}
#endif
#ifdef XPRESS
// Uncompress an XPRESS block; the port layer allocates the result buffer.
inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
                               size_t* uncompressed_size) {
  return port::xpress::Decompress(input_data, input_length, uncompressed_size);
}
#else
// Stub for builds without XPRESS support: always reports failure.
inline char* XPRESS_Uncompress(const char* /*input_data*/,
                               size_t /*input_length*/,
                               size_t* /*uncompressed_size*/) {
  return nullptr;
}
#endif
1326 inline bool ZSTD_Compress(const CompressionInfo
& info
, const char* input
,
1327 size_t length
, ::std::string
* output
) {
1329 if (length
> std::numeric_limits
<uint32_t>::max()) {
1330 // Can't compress more than 4GB
1334 size_t output_header_len
= compression::PutDecompressedSizeInfo(
1335 output
, static_cast<uint32_t>(length
));
1337 size_t compressBound
= ZSTD_compressBound(length
);
1338 output
->resize(static_cast<size_t>(output_header_len
+ compressBound
));
1341 if (info
.options().level
== CompressionOptions::kDefaultCompressionLevel
) {
1342 // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
1343 // https://github.com/facebook/zstd/issues/1148
1346 level
= info
.options().level
;
1348 #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
1349 ZSTD_CCtx
* context
= info
.context().ZSTDPreallocCtx();
1350 assert(context
!= nullptr);
1351 #if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
1352 if (info
.dict().GetDigestedZstdCDict() != nullptr) {
1353 outlen
= ZSTD_compress_usingCDict(context
, &(*output
)[output_header_len
],
1354 compressBound
, input
, length
,
1355 info
.dict().GetDigestedZstdCDict());
1357 #endif // ZSTD_VERSION_NUMBER >= 700
1359 outlen
= ZSTD_compress_usingDict(context
, &(*output
)[output_header_len
],
1360 compressBound
, input
, length
,
1361 info
.dict().GetRawDict().data(),
1362 info
.dict().GetRawDict().size(), level
);
1364 #else // up to v0.4.x
1365 outlen
= ZSTD_compress(&(*output
)[output_header_len
], compressBound
, input
,
1367 #endif // ZSTD_VERSION_NUMBER >= 500
1371 output
->resize(output_header_len
+ outlen
);
1382 // @param compression_dict Data for presetting the compression library's
1384 inline CacheAllocationPtr
ZSTD_Uncompress(
1385 const UncompressionInfo
& info
, const char* input_data
, size_t input_length
,
1386 size_t* uncompressed_size
, MemoryAllocator
* allocator
= nullptr) {
1388 uint32_t output_len
= 0;
1389 if (!compression::GetDecompressedSizeInfo(&input_data
, &input_length
,
1394 auto output
= AllocateBlock(output_len
, allocator
);
1395 size_t actual_output_length
= 0;
1396 #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
1397 ZSTD_DCtx
* context
= info
.context().GetZSTDContext();
1398 assert(context
!= nullptr);
1399 #ifdef ROCKSDB_ZSTD_DDICT
1400 if (info
.dict().GetDigestedZstdDDict() != nullptr) {
1401 actual_output_length
= ZSTD_decompress_usingDDict(
1402 context
, output
.get(), output_len
, input_data
, input_length
,
1403 info
.dict().GetDigestedZstdDDict());
1405 #endif // ROCKSDB_ZSTD_DDICT
1406 if (actual_output_length
== 0) {
1407 actual_output_length
= ZSTD_decompress_usingDict(
1408 context
, output
.get(), output_len
, input_data
, input_length
,
1409 info
.dict().GetRawDict().data(), info
.dict().GetRawDict().size());
1411 #else // up to v0.4.x
1413 actual_output_length
=
1414 ZSTD_decompress(output
.get(), output_len
, input_data
, input_length
);
1415 #endif // ZSTD_VERSION_NUMBER >= 500
1416 assert(actual_output_length
== output_len
);
1417 *uncompressed_size
= actual_output_length
;
1423 (void)uncompressed_size
;
// Reports whether dictionary training is usable at runtime.
inline bool ZSTD_TrainDictionarySupported() {
#ifdef ZSTD
  // Dictionary trainer is available since v0.6.1 for static linking, but not
  // available for dynamic linking until v1.1.3. For now we enable the feature
  // in v1.1.3+ only.
  return (ZSTD_versionNumber() >= 10103);
#else
  return false;
#endif
}
// Train a ZSTD dictionary from concatenated samples.
// @param samples the concatenation of all training samples.
// @param sample_lens the length of each sample in `samples`, in order.
// @param max_dict_bytes upper bound on the produced dictionary's size.
// Returns the trained dictionary, or "" on error / empty input.
inline std::string ZSTD_TrainDictionary(const std::string& samples,
                                        const std::vector<size_t>& sample_lens,
                                        size_t max_dict_bytes) {
  // Dictionary trainer is available since v0.6.1 for static linking, but not
  // available for dynamic linking until v1.1.3. For now we enable the feature
  // in v1.1.3+ only.
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  std::string dict_data(max_dict_bytes, '\0');
  size_t dict_len = ZDICT_trainFromBuffer(
      &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],
      static_cast<unsigned>(sample_lens.size()));
  if (ZDICT_isError(dict_len)) {
    return "";
  }
  assert(dict_len <= max_dict_bytes);
  dict_data.resize(dict_len);
  return dict_data;
#else   // up to v1.1.2
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10103
}
// Train a ZSTD dictionary from fixed-size samples: the input is split into
// chunks of 2^sample_len_shift bytes and forwarded to the overload above.
// Returns the trained dictionary, or "" on error.
inline std::string ZSTD_TrainDictionary(const std::string& samples,
                                        size_t sample_len_shift,
                                        size_t max_dict_bytes) {
  // Dictionary trainer is available since v0.6.1, but ZSTD was marked stable
  // only since v0.8.0. For now we enable the feature in stable versions only.
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
  // skips potential partial sample at the end of "samples"
  size_t num_samples = samples.size() >> sample_len_shift;
  std::vector<size_t> sample_lens(num_samples, size_t(1) << sample_len_shift);
  return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
#else   // up to v1.1.2
  assert(false);
  (void)samples;
  (void)sample_len_shift;
  (void)max_dict_bytes;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10103
}
// Reports whether ZDICT_finalizeDictionary is usable at runtime.
inline bool ZSTD_FinalizeDictionarySupported() {
#ifdef ZSTD
  // ZDICT_finalizeDictionary API is stable since v1.4.5
  return (ZSTD_versionNumber() >= 10405);
#else
  return false;
#endif
}
// Finalize a raw-content dictionary so it carries proper ZSTD entropy tables.
// @param samples concatenated training samples; also used (truncated to
// max_dict_bytes) as the raw dictionary content.
// @param sample_lens length of each sample in `samples`, in order.
// @param max_dict_bytes upper bound on the produced dictionary's size.
// @param level compression level the dictionary is tuned for.
// Returns the finalized dictionary, or "" on error / empty input.
inline std::string ZSTD_FinalizeDictionary(
    const std::string& samples, const std::vector<size_t>& sample_lens,
    size_t max_dict_bytes, int level) {
  // ZDICT_finalizeDictionary is stable since version v1.4.5
#if ZSTD_VERSION_NUMBER >= 10405  // v1.4.5+
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  if (level == CompressionOptions::kDefaultCompressionLevel) {
    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
    // https://github.com/facebook/zstd/issues/1148
    level = 3;
  }
  std::string dict_data(max_dict_bytes, '\0');
  size_t dict_len = ZDICT_finalizeDictionary(
      dict_data.data(), max_dict_bytes, samples.data(),
      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
      samples.data(), sample_lens.data(),
      static_cast<unsigned>(sample_lens.size()),
      {level, 0 /* notificationLevel */, 0 /* dictID */});
  if (ZDICT_isError(dict_len)) {
    return "";
  }
  assert(dict_len <= max_dict_bytes);
  dict_data.resize(dict_len);
  return dict_data;
#else   // up to v1.4.4
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  (void)level;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10405
}
1535 inline bool CompressData(const Slice
& raw
,
1536 const CompressionInfo
& compression_info
,
1537 uint32_t compress_format_version
,
1538 std::string
* compressed_output
) {
1541 // Will return compressed block contents if (1) the compression method is
1542 // supported in this platform and (2) the compression rate is "good enough".
1543 switch (compression_info
.type()) {
1544 case kSnappyCompression
:
1545 ret
= Snappy_Compress(compression_info
, raw
.data(), raw
.size(),
1548 case kZlibCompression
:
1549 ret
= Zlib_Compress(compression_info
, compress_format_version
, raw
.data(),
1550 raw
.size(), compressed_output
);
1552 case kBZip2Compression
:
1553 ret
= BZip2_Compress(compression_info
, compress_format_version
,
1554 raw
.data(), raw
.size(), compressed_output
);
1556 case kLZ4Compression
:
1557 ret
= LZ4_Compress(compression_info
, compress_format_version
, raw
.data(),
1558 raw
.size(), compressed_output
);
1560 case kLZ4HCCompression
:
1561 ret
= LZ4HC_Compress(compression_info
, compress_format_version
,
1562 raw
.data(), raw
.size(), compressed_output
);
1564 case kXpressCompression
:
1565 ret
= XPRESS_Compress(raw
.data(), raw
.size(), compressed_output
);
1568 case kZSTDNotFinalCompression
:
1569 ret
= ZSTD_Compress(compression_info
, raw
.data(), raw
.size(),
1573 // Do not recognize this compression type
1577 TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
1578 static_cast<void*>(&ret
));
1583 inline CacheAllocationPtr
UncompressData(
1584 const UncompressionInfo
& uncompression_info
, const char* data
, size_t n
,
1585 size_t* uncompressed_size
, uint32_t compress_format_version
,
1586 MemoryAllocator
* allocator
= nullptr) {
1587 switch (uncompression_info
.type()) {
1588 case kSnappyCompression
:
1589 return Snappy_Uncompress(data
, n
, uncompressed_size
, allocator
);
1590 case kZlibCompression
:
1591 return Zlib_Uncompress(uncompression_info
, data
, n
, uncompressed_size
,
1592 compress_format_version
, allocator
);
1593 case kBZip2Compression
:
1594 return BZip2_Uncompress(data
, n
, uncompressed_size
,
1595 compress_format_version
, allocator
);
1596 case kLZ4Compression
:
1597 case kLZ4HCCompression
:
1598 return LZ4_Uncompress(uncompression_info
, data
, n
, uncompressed_size
,
1599 compress_format_version
, allocator
);
1600 case kXpressCompression
:
1601 // XPRESS allocates memory internally, thus no support for custom
1603 return CacheAllocationPtr(XPRESS_Uncompress(data
, n
, uncompressed_size
));
1605 case kZSTDNotFinalCompression
:
1606 return ZSTD_Uncompress(uncompression_info
, data
, n
, uncompressed_size
,
1609 return CacheAllocationPtr();
1613 // Records the compression type for subsequent WAL records.
1614 class CompressionTypeRecord
{
1616 explicit CompressionTypeRecord(CompressionType compression_type
)
1617 : compression_type_(compression_type
) {}
1619 CompressionType
GetCompressionType() const { return compression_type_
; }
1621 inline void EncodeTo(std::string
* dst
) const {
1622 assert(dst
!= nullptr);
1623 PutFixed32(dst
, compression_type_
);
1626 inline Status
DecodeFrom(Slice
* src
) {
1627 constexpr char class_name
[] = "CompressionTypeRecord";
1630 if (!GetFixed32(src
, &val
)) {
1631 return Status::Corruption(class_name
,
1632 "Error decoding WAL compression type");
1634 CompressionType compression_type
= static_cast<CompressionType
>(val
);
1635 if (!StreamingCompressionTypeSupported(compression_type
)) {
1636 return Status::Corruption(class_name
,
1637 "WAL compression type not supported");
1639 compression_type_
= compression_type
;
1640 return Status::OK();
1643 inline std::string
DebugString() const {
1644 return "compression_type: " + CompressionTypeToString(compression_type_
);
1648 CompressionType compression_type_
;
1651 // Base class to implement compression for a stream of buffers.
1652 // Instantiate an implementation of the class using Create() with the
1653 // compression type and use Compress() repeatedly.
1654 // The output buffer needs to be at least max_output_len.
1655 // Call Reset() in between frame boundaries or in case of an error.
1656 // NOTE: This class is not thread safe.
1657 class StreamingCompress
{
1659 StreamingCompress(CompressionType compression_type
,
1660 const CompressionOptions
& opts
,
1661 uint32_t compress_format_version
, size_t max_output_len
)
1662 : compression_type_(compression_type
),
1664 compress_format_version_(compress_format_version
),
1665 max_output_len_(max_output_len
) {}
1666 virtual ~StreamingCompress() = default;
1667 // compress should be called repeatedly with the same input till the method
1670 // input - buffer to compress
1671 // input_size - size of input buffer
1672 // output - compressed buffer allocated by caller, should be at least
1674 // output_size - size of the output buffer
1675 // Returns -1 for errors, the remaining size of the input buffer that needs to
1677 virtual int Compress(const char* input
, size_t input_size
, char* output
,
1678 size_t* output_pos
) = 0;
1679 // static method to create object of a class inherited from StreamingCompress
1680 // based on the actual compression type.
1681 static StreamingCompress
* Create(CompressionType compression_type
,
1682 const CompressionOptions
& opts
,
1683 uint32_t compress_format_version
,
1684 size_t max_output_len
);
1685 virtual void Reset() = 0;
1688 const CompressionType compression_type_
;
1689 const CompressionOptions opts_
;
1690 const uint32_t compress_format_version_
;
1691 const size_t max_output_len_
;
1694 // Base class to uncompress a stream of compressed buffers.
1695 // Instantiate an implementation of the class using Create() with the
1696 // compression type and use Uncompress() repeatedly.
1697 // The output buffer needs to be at least max_output_len.
1698 // Call Reset() in between frame boundaries or in case of an error.
1699 // NOTE: This class is not thread safe.
1700 class StreamingUncompress
{
1702 StreamingUncompress(CompressionType compression_type
,
1703 uint32_t compress_format_version
, size_t max_output_len
)
1704 : compression_type_(compression_type
),
1705 compress_format_version_(compress_format_version
),
1706 max_output_len_(max_output_len
) {}
1707 virtual ~StreamingUncompress() = default;
1708 // uncompress should be called again with the same input if output_size is
1709 // equal to max_output_len or with the next input fragment.
1711 // input - buffer to uncompress
1712 // input_size - size of input buffer
1713 // output - uncompressed buffer allocated by caller, should be at least
1715 // output_size - size of the output buffer
1716 // Returns -1 for errors, remaining input to be processed otherwise.
1717 virtual int Uncompress(const char* input
, size_t input_size
, char* output
,
1718 size_t* output_pos
) = 0;
1719 static StreamingUncompress
* Create(CompressionType compression_type
,
1720 uint32_t compress_format_version
,
1721 size_t max_output_len
);
1722 virtual void Reset() = 0;
1725 CompressionType compression_type_
;
1726 uint32_t compress_format_version_
;
1727 size_t max_output_len_
;
1730 class ZSTDStreamingCompress final
: public StreamingCompress
{
1732 explicit ZSTDStreamingCompress(const CompressionOptions
& opts
,
1733 uint32_t compress_format_version
,
1734 size_t max_output_len
)
1735 : StreamingCompress(kZSTD
, opts
, compress_format_version
,
1737 #ifdef ZSTD_STREAMING
1738 cctx_
= ZSTD_createCCtx();
1739 // Each compressed frame will have a checksum
1740 ZSTD_CCtx_setParameter(cctx_
, ZSTD_c_checksumFlag
, 1);
1741 assert(cctx_
!= nullptr);
1742 input_buffer_
= {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
1745 ~ZSTDStreamingCompress() override
{
1746 #ifdef ZSTD_STREAMING
1747 ZSTD_freeCCtx(cctx_
);
1750 int Compress(const char* input
, size_t input_size
, char* output
,
1751 size_t* output_pos
) override
;
1752 void Reset() override
;
1753 #ifdef ZSTD_STREAMING
1755 ZSTD_inBuffer input_buffer_
;
1759 class ZSTDStreamingUncompress final
: public StreamingUncompress
{
1761 explicit ZSTDStreamingUncompress(uint32_t compress_format_version
,
1762 size_t max_output_len
)
1763 : StreamingUncompress(kZSTD
, compress_format_version
, max_output_len
) {
1764 #ifdef ZSTD_STREAMING
1765 dctx_
= ZSTD_createDCtx();
1766 assert(dctx_
!= nullptr);
1767 input_buffer_
= {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
1770 ~ZSTDStreamingUncompress() override
{
1771 #ifdef ZSTD_STREAMING
1772 ZSTD_freeDCtx(dctx_
);
1775 int Uncompress(const char* input
, size_t input_size
, char* output
,
1776 size_t* output_size
) override
;
1777 void Reset() override
;
1780 #ifdef ZSTD_STREAMING
1782 ZSTD_inBuffer input_buffer_
;
1786 } // namespace ROCKSDB_NAMESPACE