]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/compression.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / compression.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 #pragma once
11
12 #include <algorithm>
13 #include <limits>
14 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
15 #ifdef OS_FREEBSD
16 #include <malloc_np.h>
17 #else // OS_FREEBSD
18 #include <malloc.h>
19 #endif // OS_FREEBSD
20 #endif // ROCKSDB_MALLOC_USABLE_SIZE
21 #include <string>
22
23 #include "memory/memory_allocator.h"
24 #include "rocksdb/options.h"
25 #include "rocksdb/table.h"
26 #include "test_util/sync_point.h"
27 #include "util/coding.h"
28 #include "util/compression_context_cache.h"
29 #include "util/string_util.h"
30
31 #ifdef SNAPPY
32 #include <snappy.h>
33 #endif
34
35 #ifdef ZLIB
36 #include <zlib.h>
37 #endif
38
39 #ifdef BZIP2
40 #include <bzlib.h>
41 #endif
42
43 #if defined(LZ4)
44 #include <lz4.h>
45 #include <lz4hc.h>
46 #endif
47
48 #if defined(ZSTD)
49 #include <zstd.h>
50 // v1.1.3+
51 #if ZSTD_VERSION_NUMBER >= 10103
52 #include <zdict.h>
53 #endif // ZSTD_VERSION_NUMBER >= 10103
54 // v1.4.0+
55 #if ZSTD_VERSION_NUMBER >= 10400
56 #define ZSTD_STREAMING
57 #endif // ZSTD_VERSION_NUMBER >= 10400
58 namespace ROCKSDB_NAMESPACE {
59 // Need this for the context allocation override
60 // On windows we need to do this explicitly
61 #if (ZSTD_VERSION_NUMBER >= 500)
62 #if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
63 defined(ZSTD_STATIC_LINKING_ONLY)
64 #define ROCKSDB_ZSTD_CUSTOM_MEM
65 namespace port {
66 ZSTD_customMem GetJeZstdAllocationOverrides();
67 } // namespace port
68 #endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
69 // defined(ZSTD_STATIC_LINKING_ONLY)
70
71 // We require `ZSTD_sizeof_DDict` and `ZSTD_createDDict_byReference` to use
72 // `ZSTD_DDict`. The former was introduced in v1.0.0 and the latter was
73 // introduced in v1.1.3. But an important bug fix for `ZSTD_sizeof_DDict` came
74 // in v1.1.4, so that is the version we require. As of today's latest version
75 // (v1.3.8), they are both still in the experimental API, which means they are
76 // only exported when the compiler flag `ZSTD_STATIC_LINKING_ONLY` is set.
77 #if defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
78 #define ROCKSDB_ZSTD_DDICT
79 #endif // defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
80
// Cached data represents a portion that can be re-used
// If, in the future we have more than one native context to
// cache we can arrange this as a tuple
//
// Wraps a ZSTD decompression context (ZSTD_DCtx*) that is either owned by
// this instance or borrowed from a CompressionContextCache slot. Ownership
// is encoded in cache_idx_: -1 means "owned here" (freed by the destructor),
// any other value names the cache slot the context must be returned to (the
// caller is responsible for that return; see UncompressionContext).
class ZSTDUncompressCachedData {
 public:
  using ZSTDNativeContext = ZSTD_DCtx*;
  ZSTDUncompressCachedData() {}
  // Init from cache
  // Copying is disallowed: two owners of one ZSTD_DCtx would double-free.
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
  // Move transfers both the context pointer and the ownership marker.
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
      : ZSTDUncompressCachedData() {
    *this = std::move(o);
  }
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    // Only ever move into an empty instance; the swap then leaves `o` empty.
    assert(zstd_ctx_ == nullptr);
    std::swap(zstd_ctx_, o.zstd_ctx_);
    std::swap(cache_idx_, o.cache_idx_);
    return *this;
  }
  ZSTDNativeContext Get() const { return zstd_ctx_; }
  int64_t GetCacheIndex() const { return cache_idx_; }
  // Lazily allocates an *owned* context (cache_idx_ stays -1). No-op when a
  // context is already present.
  void CreateIfNeeded() {
    if (zstd_ctx_ == nullptr) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
      zstd_ctx_ =
          ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
      zstd_ctx_ = ZSTD_createDCtx();
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
      cache_idx_ = -1;
    }
  }
  // Borrow a context owned by the cache; `idx` records the slot it belongs
  // to. The destructor will NOT free a borrowed context.
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    zstd_ctx_ = o.zstd_ctx_;
    cache_idx_ = idx;
  }
  ~ZSTDUncompressCachedData() {
    // Free only if owned (not borrowed from the cache).
    if (zstd_ctx_ != nullptr && cache_idx_ == -1) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;  // -1 means this instance owns the context
};
128 #endif // (ZSTD_VERSION_NUMBER >= 500)
129 } // namespace ROCKSDB_NAMESPACE
130 #endif // ZSTD
131
132 #if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500)
133 namespace ROCKSDB_NAMESPACE {
// No-op stand-in used when ZSTD (>= v0.5.0) is not compiled in. Presents the
// same interface as the real ZSTDUncompressCachedData but never holds a
// native context: Get() is always nullptr and GetCacheIndex() is always -1.
class ZSTDUncompressCachedData {
 public:
  using ZSTDNativeContext = void*;
  ZSTDUncompressCachedData() {}
  ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) noexcept = default;
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) noexcept =
      default;
  ZSTDNativeContext Get() const { return nullptr; }
  int64_t GetCacheIndex() const { return -1; }
  void CreateIfNeeded() {}
  void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}

 private:
  void* padding;  // unused; kept only so the class is not empty
  // Touches `padding` once so compilers do not warn about an unused member.
  void ignore_padding__() { padding = nullptr; }
};
152 } // namespace ROCKSDB_NAMESPACE
153 #endif
154
155 #if defined(XPRESS)
156 #include "port/xpress.h"
157 #endif
158
159 namespace ROCKSDB_NAMESPACE {
160
// Holds dictionary and related data, like ZSTD's digested compression
// dictionary.
//
// The raw dictionary bytes always live in dict_. When built against
// ZSTD >= v0.7.0 and the type is a ZSTD variant, a digested ZSTD_CDict is
// additionally created so repeated compressions do not re-digest the
// dictionary every time.
struct CompressionDict {
#if ZSTD_VERSION_NUMBER >= 700
  ZSTD_CDict* zstd_cdict_ = nullptr;
#endif  // ZSTD_VERSION_NUMBER >= 700
  std::string dict_;

 public:
#if ZSTD_VERSION_NUMBER >= 700
  CompressionDict(std::string dict, CompressionType type, int level) {
#else   // ZSTD_VERSION_NUMBER >= 700
  CompressionDict(std::string dict, CompressionType /*type*/, int /*level*/) {
#endif  // ZSTD_VERSION_NUMBER >= 700
    dict_ = std::move(dict);
#if ZSTD_VERSION_NUMBER >= 700
    zstd_cdict_ = nullptr;
    if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) {
      if (level == CompressionOptions::kDefaultCompressionLevel) {
        // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
        // https://github.com/facebook/zstd/issues/1148
        level = 3;
      }
      // Should be safe (but slower) if below call fails as we'll use the
      // raw dictionary to compress.
      zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
      assert(zstd_cdict_ != nullptr);
    }
#endif  // ZSTD_VERSION_NUMBER >= 700
  }

  ~CompressionDict() {
#if ZSTD_VERSION_NUMBER >= 700
    size_t res = 0;
    if (zstd_cdict_ != nullptr) {
      res = ZSTD_freeCDict(zstd_cdict_);
    }
    assert(res == 0);  // Last I checked they can't fail
    (void)res;         // prevent unused var warning
#endif  // ZSTD_VERSION_NUMBER >= 700
  }

#if ZSTD_VERSION_NUMBER >= 700
  // Digested dictionary, or nullptr if none was created (non-ZSTD type,
  // empty dictionary, or ZSTD_createCDict failure); callers are expected to
  // fall back to the raw bytes in that case.
  const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
#endif  // ZSTD_VERSION_NUMBER >= 700

  // Raw (undigested) dictionary bytes, usable with any algorithm.
  Slice GetRawDict() const { return dict_; }

  // Shared singleton representing "no dictionary".
  static const CompressionDict& GetEmptyDict() {
    static CompressionDict empty_dict{};
    return empty_dict;
  }

  CompressionDict() = default;
  // Disable copy/move
  CompressionDict(const CompressionDict&) = delete;
  CompressionDict& operator=(const CompressionDict&) = delete;
  CompressionDict(CompressionDict&&) = delete;
  CompressionDict& operator=(CompressionDict&&) = delete;
};
221
222 // Holds dictionary and related data, like ZSTD's digested uncompression
223 // dictionary.
224 struct UncompressionDict {
225 // Block containing the data for the compression dictionary in case the
226 // constructor that takes a string parameter is used.
227 std::string dict_;
228
229 // Block containing the data for the compression dictionary in case the
230 // constructor that takes a Slice parameter is used and the passed in
231 // CacheAllocationPtr is not nullptr.
232 CacheAllocationPtr allocation_;
233
234 // Slice pointing to the compression dictionary data. Can point to
235 // dict_, allocation_, or some other memory location, depending on how
236 // the object was constructed.
237 Slice slice_;
238
239 #ifdef ROCKSDB_ZSTD_DDICT
240 // Processed version of the contents of slice_ for ZSTD compression.
241 ZSTD_DDict* zstd_ddict_ = nullptr;
242 #endif // ROCKSDB_ZSTD_DDICT
243
244 #ifdef ROCKSDB_ZSTD_DDICT
245 UncompressionDict(std::string dict, bool using_zstd)
246 #else // ROCKSDB_ZSTD_DDICT
247 UncompressionDict(std::string dict, bool /* using_zstd */)
248 #endif // ROCKSDB_ZSTD_DDICT
249 : dict_(std::move(dict)), slice_(dict_) {
250 #ifdef ROCKSDB_ZSTD_DDICT
251 if (!slice_.empty() && using_zstd) {
252 zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
253 assert(zstd_ddict_ != nullptr);
254 }
255 #endif // ROCKSDB_ZSTD_DDICT
256 }
257
258 #ifdef ROCKSDB_ZSTD_DDICT
259 UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
260 bool using_zstd)
261 #else // ROCKSDB_ZSTD_DDICT
262 UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
263 bool /* using_zstd */)
264 #endif // ROCKSDB_ZSTD_DDICT
265 : allocation_(std::move(allocation)), slice_(std::move(slice)) {
266 #ifdef ROCKSDB_ZSTD_DDICT
267 if (!slice_.empty() && using_zstd) {
268 zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
269 assert(zstd_ddict_ != nullptr);
270 }
271 #endif // ROCKSDB_ZSTD_DDICT
272 }
273
274 UncompressionDict(UncompressionDict&& rhs)
275 : dict_(std::move(rhs.dict_)),
276 allocation_(std::move(rhs.allocation_)),
277 slice_(std::move(rhs.slice_))
278 #ifdef ROCKSDB_ZSTD_DDICT
279 ,
280 zstd_ddict_(rhs.zstd_ddict_)
281 #endif
282 {
283 #ifdef ROCKSDB_ZSTD_DDICT
284 rhs.zstd_ddict_ = nullptr;
285 #endif
286 }
287
288 ~UncompressionDict() {
289 #ifdef ROCKSDB_ZSTD_DDICT
290 size_t res = 0;
291 if (zstd_ddict_ != nullptr) {
292 res = ZSTD_freeDDict(zstd_ddict_);
293 }
294 assert(res == 0); // Last I checked they can't fail
295 (void)res; // prevent unused var warning
296 #endif // ROCKSDB_ZSTD_DDICT
297 }
298
299 UncompressionDict& operator=(UncompressionDict&& rhs) {
300 if (this == &rhs) {
301 return *this;
302 }
303
304 dict_ = std::move(rhs.dict_);
305 allocation_ = std::move(rhs.allocation_);
306 slice_ = std::move(rhs.slice_);
307
308 #ifdef ROCKSDB_ZSTD_DDICT
309 zstd_ddict_ = rhs.zstd_ddict_;
310 rhs.zstd_ddict_ = nullptr;
311 #endif
312
313 return *this;
314 }
315
316 // The object is self-contained if the string constructor is used, or the
317 // Slice constructor is invoked with a non-null allocation. Otherwise, it
318 // is the caller's responsibility to ensure that the underlying storage
319 // outlives this object.
320 bool own_bytes() const { return !dict_.empty() || allocation_; }
321
322 const Slice& GetRawDict() const { return slice_; }
323
324 #ifdef ROCKSDB_ZSTD_DDICT
325 const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
326 #endif // ROCKSDB_ZSTD_DDICT
327
328 static const UncompressionDict& GetEmptyDict() {
329 static UncompressionDict empty_dict{};
330 return empty_dict;
331 }
332
333 size_t ApproximateMemoryUsage() const {
334 size_t usage = sizeof(struct UncompressionDict);
335 usage += dict_.size();
336 if (allocation_) {
337 auto allocator = allocation_.get_deleter().allocator;
338 if (allocator) {
339 usage += allocator->UsableSize(allocation_.get(), slice_.size());
340 } else {
341 usage += slice_.size();
342 }
343 }
344 #ifdef ROCKSDB_ZSTD_DDICT
345 usage += ZSTD_sizeof_DDict(zstd_ddict_);
346 #endif // ROCKSDB_ZSTD_DDICT
347 return usage;
348 }
349
350 UncompressionDict() = default;
351 // Disable copy
352 UncompressionDict(const CompressionDict&) = delete;
353 UncompressionDict& operator=(const CompressionDict&) = delete;
354 };
355
// Per-operation native compression state. Only ZSTD has a native context (a
// ZSTD_CCtx allocated at construction for ZSTD types); for every other
// algorithm this class is an empty shell.
class CompressionContext {
 private:
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
  ZSTD_CCtx* zstd_ctx_ = nullptr;
  // Allocates a ZSTD compression context when `type` is a ZSTD variant;
  // otherwise leaves zstd_ctx_ null.
  void CreateNativeContext(CompressionType type) {
    if (type == kZSTD || type == kZSTDNotFinalCompression) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
      zstd_ctx_ =
          ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else   // ROCKSDB_ZSTD_CUSTOM_MEM
      zstd_ctx_ = ZSTD_createCCtx();
#endif  // ROCKSDB_ZSTD_CUSTOM_MEM
    }
  }
  void DestroyNativeContext() {
    if (zstd_ctx_ != nullptr) {
      ZSTD_freeCCtx(zstd_ctx_);
    }
  }

 public:
  // callable inside ZSTD_Compress
  // Preallocated context; only valid to call for ZSTD types (asserts).
  ZSTD_CCtx* ZSTDPreallocCtx() const {
    assert(zstd_ctx_ != nullptr);
    return zstd_ctx_;
  }

#else  // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
 private:
  void CreateNativeContext(CompressionType /* type */) {}
  void DestroyNativeContext() {}
#endif  // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
 public:
  explicit CompressionContext(CompressionType type) {
    CreateNativeContext(type);
  }
  ~CompressionContext() { DestroyNativeContext(); }
  // Not copyable: copying would double-free the native context.
  CompressionContext(const CompressionContext&) = delete;
  CompressionContext& operator=(const CompressionContext&) = delete;
};
396
397 class CompressionInfo {
398 const CompressionOptions& opts_;
399 const CompressionContext& context_;
400 const CompressionDict& dict_;
401 const CompressionType type_;
402 const uint64_t sample_for_compression_;
403
404 public:
405 CompressionInfo(const CompressionOptions& _opts,
406 const CompressionContext& _context,
407 const CompressionDict& _dict, CompressionType _type,
408 uint64_t _sample_for_compression)
409 : opts_(_opts),
410 context_(_context),
411 dict_(_dict),
412 type_(_type),
413 sample_for_compression_(_sample_for_compression) {}
414
415 const CompressionOptions& options() const { return opts_; }
416 const CompressionContext& context() const { return context_; }
417 const CompressionDict& dict() const { return dict_; }
418 CompressionType type() const { return type_; }
419 uint64_t SampleForCompression() const { return sample_for_compression_; }
420 };
421
// Per-operation native uncompression state. For ZSTD types, borrows a cached
// ZSTD decompression context from the process-wide CompressionContextCache
// on construction and returns it to its cache slot on destruction. For all
// other algorithms it holds nothing.
class UncompressionContext {
 private:
  CompressionContextCache* ctx_cache_ = nullptr;
  ZSTDUncompressCachedData uncomp_cached_data_;

 public:
  explicit UncompressionContext(CompressionType type) {
    if (type == kZSTD || type == kZSTDNotFinalCompression) {
      ctx_cache_ = CompressionContextCache::Instance();
      uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
    }
  }
  ~UncompressionContext() {
    // A cache index != -1 means the context was borrowed from the cache and
    // must be handed back; at -1, uncomp_cached_data_ either owns its context
    // or is empty, and its own destructor does the cleanup.
    if (uncomp_cached_data_.GetCacheIndex() != -1) {
      assert(ctx_cache_ != nullptr);
      ctx_cache_->ReturnCachedZSTDUncompressData(
          uncomp_cached_data_.GetCacheIndex());
    }
  }
  UncompressionContext(const UncompressionContext&) = delete;
  UncompressionContext& operator=(const UncompressionContext&) = delete;

  // Native ZSTD decompression context, or nullptr for non-ZSTD types.
  ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
    return uncomp_cached_data_.Get();
  }
};
448
449 class UncompressionInfo {
450 const UncompressionContext& context_;
451 const UncompressionDict& dict_;
452 const CompressionType type_;
453
454 public:
455 UncompressionInfo(const UncompressionContext& _context,
456 const UncompressionDict& _dict, CompressionType _type)
457 : context_(_context), dict_(_dict), type_(_type) {}
458
459 const UncompressionContext& context() const { return context_; }
460 const UncompressionDict& dict() const { return dict_; }
461 CompressionType type() const { return type_; }
462 };
463
// True iff Snappy support was compiled in (SNAPPY macro defined).
inline bool Snappy_Supported() {
#ifdef SNAPPY
  constexpr bool kHaveSnappy = true;
#else
  constexpr bool kHaveSnappy = false;
#endif
  return kHaveSnappy;
}
471
// True iff zlib support was compiled in (ZLIB macro defined).
inline bool Zlib_Supported() {
#ifdef ZLIB
  constexpr bool kHaveZlib = true;
#else
  constexpr bool kHaveZlib = false;
#endif
  return kHaveZlib;
}
479
// True iff bzip2 support was compiled in (BZIP2 macro defined).
inline bool BZip2_Supported() {
#ifdef BZIP2
  constexpr bool kHaveBZip2 = true;
#else
  constexpr bool kHaveBZip2 = false;
#endif
  return kHaveBZip2;
}
487
// True iff LZ4 (and LZ4HC) support was compiled in (LZ4 macro defined).
inline bool LZ4_Supported() {
#ifdef LZ4
  constexpr bool kHaveLZ4 = true;
#else
  constexpr bool kHaveLZ4 = false;
#endif
  return kHaveLZ4;
}
495
// True iff Windows XPRESS support was compiled in (XPRESS macro defined).
inline bool XPRESS_Supported() {
#ifdef XPRESS
  constexpr bool kHaveXpress = true;
#else
  constexpr bool kHaveXpress = false;
#endif
  return kHaveXpress;
}
503
// True iff a ZSTD library with a finalized format is linked in. Unlike the
// other probes this also checks the runtime library version.
inline bool ZSTD_Supported() {
#ifdef ZSTD
  // The ZSTD wire format was finalized in v0.8.0 (version number 800);
  // earlier experimental releases are not usable.
  const unsigned runtime_version = ZSTD_versionNumber();
  return runtime_version >= 800;
#else
  return false;
#endif
}
512
// True iff any ZSTD library is compiled in, including pre-format-final
// versions (the kZSTDNotFinalCompression type).
inline bool ZSTDNotFinal_Supported() {
#ifdef ZSTD
  constexpr bool kHaveZstd = true;
#else
  constexpr bool kHaveZstd = false;
#endif
  return kHaveZstd;
}
520
// True iff the linked ZSTD is new enough (v1.4.0+, see ZSTD_STREAMING) for
// the streaming compression API.
inline bool ZSTD_Streaming_Supported() {
#if defined(ZSTD) && defined(ZSTD_STREAMING)
  constexpr bool kHaveZstdStreaming = true;
#else
  constexpr bool kHaveZstdStreaming = false;
#endif
  return kHaveZstdStreaming;
}
528
529 inline bool StreamingCompressionTypeSupported(
530 CompressionType compression_type) {
531 switch (compression_type) {
532 case kNoCompression:
533 return true;
534 case kZSTD:
535 return ZSTD_Streaming_Supported();
536 default:
537 return false;
538 }
539 }
540
541 inline bool CompressionTypeSupported(CompressionType compression_type) {
542 switch (compression_type) {
543 case kNoCompression:
544 return true;
545 case kSnappyCompression:
546 return Snappy_Supported();
547 case kZlibCompression:
548 return Zlib_Supported();
549 case kBZip2Compression:
550 return BZip2_Supported();
551 case kLZ4Compression:
552 return LZ4_Supported();
553 case kLZ4HCCompression:
554 return LZ4_Supported();
555 case kXpressCompression:
556 return XPRESS_Supported();
557 case kZSTDNotFinalCompression:
558 return ZSTDNotFinal_Supported();
559 case kZSTD:
560 return ZSTD_Supported();
561 default:
562 assert(false);
563 return false;
564 }
565 }
566
// Whether `compression_type` supports compression with a preset dictionary
// in this build. Stricter than CompressionTypeSupported(): Snappy, BZip2 and
// Xpress have no dictionary API here, and LZ4/ZSTD additionally require
// minimum library versions (checked at compile time below).
inline bool DictCompressionTypeSupported(CompressionType compression_type) {
  switch (compression_type) {
    case kNoCompression:
      return false;
    case kSnappyCompression:
      return false;
    case kZlibCompression:
      return Zlib_Supported();
    case kBZip2Compression:
      return false;
    case kLZ4Compression:
    case kLZ4HCCompression:
#if LZ4_VERSION_NUMBER >= 10400  // r124+
      return LZ4_Supported();
#else
      return false;
#endif
    case kXpressCompression:
      return false;
    case kZSTDNotFinalCompression:
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
      return ZSTDNotFinal_Supported();
#else
      return false;
#endif
    case kZSTD:
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
      return ZSTD_Supported();
#else
      return false;
#endif
    default:
      // Unknown enum value: assert in debug builds, report unsupported.
      assert(false);
      return false;
  }
}
603
604 inline std::string CompressionTypeToString(CompressionType compression_type) {
605 switch (compression_type) {
606 case kNoCompression:
607 return "NoCompression";
608 case kSnappyCompression:
609 return "Snappy";
610 case kZlibCompression:
611 return "Zlib";
612 case kBZip2Compression:
613 return "BZip2";
614 case kLZ4Compression:
615 return "LZ4";
616 case kLZ4HCCompression:
617 return "LZ4HC";
618 case kXpressCompression:
619 return "Xpress";
620 case kZSTD:
621 return "ZSTD";
622 case kZSTDNotFinalCompression:
623 return "ZSTDNotFinal";
624 case kDisableCompressionOption:
625 return "DisableOption";
626 default:
627 assert(false);
628 return "";
629 }
630 }
631
632 inline std::string CompressionOptionsToString(
633 CompressionOptions& compression_options) {
634 std::string result;
635 result.reserve(512);
636 result.append("window_bits=")
637 .append(std::to_string(compression_options.window_bits))
638 .append("; ");
639 result.append("level=")
640 .append(std::to_string(compression_options.level))
641 .append("; ");
642 result.append("strategy=")
643 .append(std::to_string(compression_options.strategy))
644 .append("; ");
645 result.append("max_dict_bytes=")
646 .append(std::to_string(compression_options.max_dict_bytes))
647 .append("; ");
648 result.append("zstd_max_train_bytes=")
649 .append(std::to_string(compression_options.zstd_max_train_bytes))
650 .append("; ");
651 result.append("enabled=")
652 .append(std::to_string(compression_options.enabled))
653 .append("; ");
654 result.append("max_dict_buffer_bytes=")
655 .append(std::to_string(compression_options.max_dict_buffer_bytes))
656 .append("; ");
657 result.append("use_zstd_dict_trainer=")
658 .append(std::to_string(compression_options.use_zstd_dict_trainer))
659 .append("; ");
660 return result;
661 }
662
663 // compress_format_version can have two values:
664 // 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
665 // block. Also, decompressed sizes for LZ4 are encoded in platform-dependent
666 // way.
667 // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
668 // start of compressed block. Snappy format is the same as version 1.
669
// Compresses (input, length) with Snappy into *output. Returns true on
// success, false when Snappy is not compiled in. No decompressed-size header
// is prepended regardless of compress_format_version (Snappy's own framing
// carries the length; see the format comment above this section).
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
                            size_t length, ::std::string* output) {
#ifdef SNAPPY
  // Size the buffer for the worst case, compress, then shrink to fit.
  output->resize(snappy::MaxCompressedLength(length));
  size_t outlen;
  snappy::RawCompress(input, length, &(*output)[0], &outlen);
  output->resize(outlen);
  return true;
#else
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
685
// Decompresses a Snappy-compressed block. On success returns the allocated
// output buffer and sets *uncompressed_size; returns nullptr on corrupt
// input or when Snappy is not compiled in.
inline CacheAllocationPtr Snappy_Uncompress(
    const char* input, size_t length, size_t* uncompressed_size,
    MemoryAllocator* allocator = nullptr) {
#ifdef SNAPPY
  size_t uncompressed_length = 0;
  // Snappy's framing stores the decompressed length; read it first so the
  // output buffer can be sized exactly.
  if (!snappy::GetUncompressedLength(input, length, &uncompressed_length)) {
    return nullptr;
  }

  CacheAllocationPtr output = AllocateBlock(uncompressed_length, allocator);

  if (!snappy::RawUncompress(input, length, output.get())) {
    return nullptr;
  }

  *uncompressed_size = uncompressed_length;

  return output;
#else
  (void)input;
  (void)length;
  (void)uncompressed_size;
  (void)allocator;
  return nullptr;
#endif
}
712
713 namespace compression {
714 // returns size
715 inline size_t PutDecompressedSizeInfo(std::string* output, uint32_t length) {
716 PutVarint32(output, length);
717 return output->size();
718 }
719
720 inline bool GetDecompressedSizeInfo(const char** input_data,
721 size_t* input_length,
722 uint32_t* output_len) {
723 auto new_input_data =
724 GetVarint32Ptr(*input_data, *input_data + *input_length, output_len);
725 if (new_input_data == nullptr) {
726 return false;
727 }
728 *input_length -= (new_input_data - *input_data);
729 *input_data = new_input_data;
730 return true;
731 }
732 } // namespace compression
733
734 // compress_format_version == 1 -- decompressed size is not included in the
735 // block header
736 // compress_format_version == 2 -- decompressed size is included in the block
737 // header in varint32 format
738 // @param compression_dict Data for presetting the compression library's
739 // dictionary.
// Compresses (input, length) with zlib into *output, honoring the level,
// window_bits and strategy from info.options() and any preset dictionary
// from info.dict(). Returns true only if the data actually shrank; returns
// false on any zlib error, on inputs > 4GB, when the compressed form would
// not be smaller, or when zlib is not compiled in.
inline bool Zlib_Compress(const CompressionInfo& info,
                          uint32_t compress_format_version, const char* input,
                          size_t length, ::std::string* output) {
#ifdef ZLIB
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    // Prepend the decompressed size as a varint32 so the uncompressor can
    // size its buffer exactly.
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  }

  // The memLevel parameter specifies how much memory should be allocated for
  // the internal compression state.
  // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
  // memLevel=9 uses maximum memory for optimal speed.
  // The default value is 8. See zconf.h for more details.
  static const int memLevel = 8;
  int level;
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
    level = Z_DEFAULT_COMPRESSION;
  } else {
    level = info.options().level;
  }
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
  int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits,
                        memLevel, info.options().strategy);
  if (st != Z_OK) {
    return false;
  }

  Slice compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    // Initialize the compression library's dictionary
    st = deflateSetDictionary(
        &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
        static_cast<unsigned int>(compression_dict.size()));
    if (st != Z_OK) {
      deflateEnd(&_stream);
      return false;
    }
  }

  // Get an upper bound on the compressed size.
  size_t upper_bound =
      deflateBound(&_stream, static_cast<unsigned long>(length));
  output->resize(output_header_len + upper_bound);

  // Compress the input, and put compressed data in output.
  _stream.next_in = (Bytef*)input;
  _stream.avail_in = static_cast<unsigned int>(length);

  // Initialize the output size.
  _stream.avail_out = static_cast<unsigned int>(upper_bound);
  _stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);

  bool compressed = false;
  st = deflate(&_stream, Z_FINISH);
  if (st == Z_STREAM_END) {
    compressed = true;
    // Trim the buffer from the worst-case bound down to the actual size.
    output->resize(output->size() - _stream.avail_out);
  }
  // The only return value we really care about is Z_STREAM_END.
  // Z_OK means insufficient output space. This means the compression is
  // bigger than decompressed size. Just fail the compression in that case.

  deflateEnd(&_stream);
  return compressed;
#else
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
821
822 // compress_format_version == 1 -- decompressed size is not included in the
823 // block header
824 // compress_format_version == 2 -- decompressed size is included in the block
825 // header in varint32 format
826 // @param compression_dict Data for presetting the compression library's
827 // dictionary.
828 inline CacheAllocationPtr Zlib_Uncompress(
829 const UncompressionInfo& info, const char* input_data, size_t input_length,
830 size_t* uncompressed_size, uint32_t compress_format_version,
831 MemoryAllocator* allocator = nullptr, int windowBits = -14) {
832 #ifdef ZLIB
833 uint32_t output_len = 0;
834 if (compress_format_version == 2) {
835 if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
836 &output_len)) {
837 return nullptr;
838 }
839 } else {
840 // Assume the decompressed data size will 5x of compressed size, but round
841 // to the page size
842 size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
843 output_len = static_cast<uint32_t>(
844 std::min(proposed_output_len,
845 static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
846 }
847
848 z_stream _stream;
849 memset(&_stream, 0, sizeof(z_stream));
850
851 // For raw inflate, the windowBits should be -8..-15.
852 // If windowBits is bigger than zero, it will use either zlib
853 // header or gzip header. Adding 32 to it will do automatic detection.
854 int st =
855 inflateInit2(&_stream, windowBits > 0 ? windowBits + 32 : windowBits);
856 if (st != Z_OK) {
857 return nullptr;
858 }
859
860 const Slice& compression_dict = info.dict().GetRawDict();
861 if (compression_dict.size()) {
862 // Initialize the compression library's dictionary
863 st = inflateSetDictionary(
864 &_stream, reinterpret_cast<const Bytef*>(compression_dict.data()),
865 static_cast<unsigned int>(compression_dict.size()));
866 if (st != Z_OK) {
867 return nullptr;
868 }
869 }
870
871 _stream.next_in = (Bytef*)input_data;
872 _stream.avail_in = static_cast<unsigned int>(input_length);
873
874 auto output = AllocateBlock(output_len, allocator);
875
876 _stream.next_out = (Bytef*)output.get();
877 _stream.avail_out = static_cast<unsigned int>(output_len);
878
879 bool done = false;
880 while (!done) {
881 st = inflate(&_stream, Z_SYNC_FLUSH);
882 switch (st) {
883 case Z_STREAM_END:
884 done = true;
885 break;
886 case Z_OK: {
887 // No output space. Increase the output space by 20%.
888 // We should never run out of output space if
889 // compress_format_version == 2
890 assert(compress_format_version != 2);
891 size_t old_sz = output_len;
892 uint32_t output_len_delta = output_len / 5;
893 output_len += output_len_delta < 10 ? 10 : output_len_delta;
894 auto tmp = AllocateBlock(output_len, allocator);
895 memcpy(tmp.get(), output.get(), old_sz);
896 output = std::move(tmp);
897
898 // Set more output.
899 _stream.next_out = (Bytef*)(output.get() + old_sz);
900 _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
901 break;
902 }
903 case Z_BUF_ERROR:
904 default:
905 inflateEnd(&_stream);
906 return nullptr;
907 }
908 }
909
910 // If we encoded decompressed block size, we should have no bytes left
911 assert(compress_format_version != 2 || _stream.avail_out == 0);
912 assert(output_len >= _stream.avail_out);
913 *uncompressed_size = output_len - _stream.avail_out;
914 inflateEnd(&_stream);
915 return output;
916 #else
917 (void)info;
918 (void)input_data;
919 (void)input_length;
920 (void)uncompressed_size;
921 (void)compress_format_version;
922 (void)allocator;
923 (void)windowBits;
924 return nullptr;
925 #endif
926 }
927
928 // compress_format_version == 1 -- decompressed size is not included in the
929 // block header
930 // compress_format_version == 2 -- decompressed size is included in the block
931 // header in varint32 format
// Compresses (input, length) with bzip2 into *output, prepending a varint32
// decompressed-size header when compress_format_version == 2. Returns true
// only if the data actually shrank (the output buffer is sized to the input
// length, so expansion fails); false on any bzip2 error, inputs > 4GB, or
// when bzip2 is not compiled in.
inline bool BZip2_Compress(const CompressionInfo& /*info*/,
                           uint32_t compress_format_version, const char* input,
                           size_t length, ::std::string* output) {
#ifdef BZIP2
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }
  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  }
  // Resize output to be the plain data length.
  // This may not be big enough if the compression actually expands data.
  output->resize(output_header_len + length);

  bz_stream _stream;
  memset(&_stream, 0, sizeof(bz_stream));

  // Block size 1 is 100K.
  // 0 is for silent.
  // 30 is the default workFactor
  int st = BZ2_bzCompressInit(&_stream, 1, 0, 30);
  if (st != BZ_OK) {
    return false;
  }

  // Compress the input, and put compressed data in output.
  _stream.next_in = (char*)input;
  _stream.avail_in = static_cast<unsigned int>(length);

  // Initialize the output size.
  _stream.avail_out = static_cast<unsigned int>(length);
  _stream.next_out = reinterpret_cast<char*>(&(*output)[output_header_len]);

  bool compressed = false;
  st = BZ2_bzCompress(&_stream, BZ_FINISH);
  if (st == BZ_STREAM_END) {
    compressed = true;
    // Trim the buffer down to the actual compressed size.
    output->resize(output->size() - _stream.avail_out);
  }
  // The only return value we really care about is BZ_STREAM_END.
  // BZ_FINISH_OK means insufficient output space. This means the compression
  // is bigger than decompressed size. Just fail the compression in that case.

  BZ2_bzCompressEnd(&_stream);
  return compressed;
#else
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
988
989 // compress_format_version == 1 -- decompressed size is not included in the
990 // block header
991 // compress_format_version == 2 -- decompressed size is included in the block
992 // header in varint32 format
993 inline CacheAllocationPtr BZip2_Uncompress(
994 const char* input_data, size_t input_length, size_t* uncompressed_size,
995 uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
996 #ifdef BZIP2
997 uint32_t output_len = 0;
998 if (compress_format_version == 2) {
999 if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
1000 &output_len)) {
1001 return nullptr;
1002 }
1003 } else {
1004 // Assume the decompressed data size will 5x of compressed size, but round
1005 // to the next page size
1006 size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
1007 output_len = static_cast<uint32_t>(
1008 std::min(proposed_output_len,
1009 static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
1010 }
1011
1012 bz_stream _stream;
1013 memset(&_stream, 0, sizeof(bz_stream));
1014
1015 int st = BZ2_bzDecompressInit(&_stream, 0, 0);
1016 if (st != BZ_OK) {
1017 return nullptr;
1018 }
1019
1020 _stream.next_in = (char*)input_data;
1021 _stream.avail_in = static_cast<unsigned int>(input_length);
1022
1023 auto output = AllocateBlock(output_len, allocator);
1024
1025 _stream.next_out = (char*)output.get();
1026 _stream.avail_out = static_cast<unsigned int>(output_len);
1027
1028 bool done = false;
1029 while (!done) {
1030 st = BZ2_bzDecompress(&_stream);
1031 switch (st) {
1032 case BZ_STREAM_END:
1033 done = true;
1034 break;
1035 case BZ_OK: {
1036 // No output space. Increase the output space by 20%.
1037 // We should never run out of output space if
1038 // compress_format_version == 2
1039 assert(compress_format_version != 2);
1040 uint32_t old_sz = output_len;
1041 output_len = output_len * 1.2;
1042 auto tmp = AllocateBlock(output_len, allocator);
1043 memcpy(tmp.get(), output.get(), old_sz);
1044 output = std::move(tmp);
1045
1046 // Set more output.
1047 _stream.next_out = (char*)(output.get() + old_sz);
1048 _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
1049 break;
1050 }
1051 default:
1052 BZ2_bzDecompressEnd(&_stream);
1053 return nullptr;
1054 }
1055 }
1056
1057 // If we encoded decompressed block size, we should have no bytes left
1058 assert(compress_format_version != 2 || _stream.avail_out == 0);
1059 assert(output_len >= _stream.avail_out);
1060 *uncompressed_size = output_len - _stream.avail_out;
1061 BZ2_bzDecompressEnd(&_stream);
1062 return output;
1063 #else
1064 (void)input_data;
1065 (void)input_length;
1066 (void)uncompressed_size;
1067 (void)compress_format_version;
1068 (void)allocator;
1069 return nullptr;
1070 #endif
1071 }
1072
1073 // compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes the database non-portable
1075 // compress_format_version == 2 -- decompressed size is included in the block
1076 // header in varint32 format
1077 // @param compression_dict Data for presetting the compression library's
1078 // dictionary.
inline bool LZ4_Compress(const CompressionInfo& info,
                         uint32_t compress_format_version, const char* input,
                         size_t length, ::std::string* output) {
#ifdef LZ4
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness): raw memcpy of the native size_t.
    output_header_len = 8;
    output->resize(output_header_len);
    char* p = const_cast<char*>(output->c_str());
    memcpy(p, &length, sizeof(length));
  }
  // Worst-case compressed size; resize up front so LZ4 writes directly into
  // the string after the header, then trim below.
  int compress_bound = LZ4_compressBound(static_cast<int>(length));
  output->resize(static_cast<size_t>(output_header_len + compress_bound));

  int outlen;
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  // The streaming API is used only so a preset dictionary (if any) can be
  // loaded before compressing this single block.
  LZ4_stream_t* stream = LZ4_createStream();
  Slice compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    LZ4_loadDict(stream, compression_dict.data(),
                 static_cast<int>(compression_dict.size()));
  }
#if LZ4_VERSION_NUMBER >= 10700  // r129+
  outlen =
      LZ4_compress_fast_continue(stream, input, &(*output)[output_header_len],
                                 static_cast<int>(length), compress_bound, 1);
#else  // up to r128
  outlen = LZ4_compress_limitedOutput_continue(
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
      compress_bound);
#endif
  LZ4_freeStream(stream);
#else  // up to r123
  // Pre-r124 path: no streaming API, so the preset dictionary is not applied.
  outlen = LZ4_compress_limitedOutput(input, &(*output)[output_header_len],
                                      static_cast<int>(length), compress_bound);
#endif  // LZ4_VERSION_NUMBER >= 10400

  // 0 means the compressor failed or output did not fit in compress_bound.
  if (outlen == 0) {
    return false;
  }
  // Trim to the actual compressed size.
  output->resize(static_cast<size_t>(output_header_len + outlen));
  return true;
#else  // LZ4
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
1141
1142 // compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes the database non-portable
1144 // compress_format_version == 2 -- decompressed size is included in the block
1145 // header in varint32 format
1146 // @param compression_dict Data for presetting the compression library's
1147 // dictionary.
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
                                         const char* input_data,
                                         size_t input_length,
                                         size_t* uncompressed_size,
                                         uint32_t compress_format_version,
                                         MemoryAllocator* allocator = nullptr) {
#ifdef LZ4
  uint32_t output_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                              &output_len)) {
      return nullptr;
    }
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness)
    if (input_length < 8) {
      return nullptr;
    }
    // The writer memcpy'd a native size_t into an 8-byte header; pick the
    // 4 bytes that hold the low-order word for this endianness.
    if (port::kLittleEndian) {
      memcpy(&output_len, input_data, sizeof(output_len));
    } else {
      memcpy(&output_len, input_data + 4, sizeof(output_len));
    }
    input_length -= 8;
    input_data += 8;
  }

  auto output = AllocateBlock(output_len, allocator);

  int decompress_bytes = 0;

#if LZ4_VERSION_NUMBER >= 10400  // r124+
  // The streaming API is used only so a preset dictionary (if any) can be
  // applied before decompressing this single block.
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
  const Slice& compression_dict = info.dict().GetRawDict();
  if (compression_dict.size()) {
    LZ4_setStreamDecode(stream, compression_dict.data(),
                        static_cast<int>(compression_dict.size()));
  }
  decompress_bytes = LZ4_decompress_safe_continue(
      stream, input_data, output.get(), static_cast<int>(input_length),
      static_cast<int>(output_len));
  LZ4_freeStreamDecode(stream);
#else  // up to r123
  decompress_bytes = LZ4_decompress_safe(input_data, output.get(),
                                         static_cast<int>(input_length),
                                         static_cast<int>(output_len));
#endif  // LZ4_VERSION_NUMBER >= 10400

  // The "safe" variants return a negative value on malformed input and never
  // write more than output_len bytes.
  if (decompress_bytes < 0) {
    return nullptr;
  }
  assert(decompress_bytes == static_cast<int>(output_len));
  *uncompressed_size = decompress_bytes;
  return output;
#else  // LZ4
  (void)info;
  (void)input_data;
  (void)input_length;
  (void)uncompressed_size;
  (void)compress_format_version;
  (void)allocator;
  return nullptr;
#endif
}
1214
1215 // compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes the database non-portable
1217 // compress_format_version == 2 -- decompressed size is included in the block
1218 // header in varint32 format
1219 // @param compression_dict Data for presetting the compression library's
1220 // dictionary.
inline bool LZ4HC_Compress(const CompressionInfo& info,
                           uint32_t compress_format_version, const char* input,
                           size_t length, ::std::string* output) {
#ifdef LZ4
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  size_t output_header_len = 0;
  if (compress_format_version == 2) {
    // new encoding, using varint32 to store size information
    output_header_len = compression::PutDecompressedSizeInfo(
        output, static_cast<uint32_t>(length));
  } else {
    // legacy encoding, which is not really portable (depends on big/little
    // endianness): raw memcpy of the native size_t.
    output_header_len = 8;
    output->resize(output_header_len);
    char* p = const_cast<char*>(output->c_str());
    memcpy(p, &length, sizeof(length));
  }
  // Worst-case compressed size; resize up front so LZ4HC writes directly
  // into the string after the header, then trim below.
  int compress_bound = LZ4_compressBound(static_cast<int>(length));
  output->resize(static_cast<size_t>(output_header_len + compress_bound));

  int outlen;
  int level;
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
    level = 0;  // lz4hc.h says any value < 1 will be sanitized to default
  } else {
    level = info.options().level;
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  // The streaming API is used only so a preset dictionary (if any) can be
  // loaded before compressing this single block.
  LZ4_streamHC_t* stream = LZ4_createStreamHC();
  LZ4_resetStreamHC(stream, level);
  Slice compression_dict = info.dict().GetRawDict();
  const char* compression_dict_data =
      compression_dict.size() > 0 ? compression_dict.data() : nullptr;
  size_t compression_dict_size = compression_dict.size();
  if (compression_dict_data != nullptr) {
    LZ4_loadDictHC(stream, compression_dict_data,
                   static_cast<int>(compression_dict_size));
  }

#if LZ4_VERSION_NUMBER >= 10700  // r129+
  outlen =
      LZ4_compress_HC_continue(stream, input, &(*output)[output_header_len],
                               static_cast<int>(length), compress_bound);
#else  // r124-r128
  outlen = LZ4_compressHC_limitedOutput_continue(
      stream, input, &(*output)[output_header_len], static_cast<int>(length),
      compress_bound);
#endif  // LZ4_VERSION_NUMBER >= 10700
  LZ4_freeStreamHC(stream);

#elif LZ4_VERSION_MAJOR  // r113-r123
  // Older API: accepts a compression level but has no dictionary support.
  outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len],
                                         static_cast<int>(length),
                                         compress_bound, level);
#else  // up to r112
  // Oldest API: no level parameter and no dictionary support.
  outlen =
      LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len],
                                   static_cast<int>(length), compress_bound);
#endif  // LZ4_VERSION_NUMBER >= 10400

  // 0 means the compressor failed or output did not fit in compress_bound.
  if (outlen == 0) {
    return false;
  }
  // Trim to the actual compressed size.
  output->resize(static_cast<size_t>(output_header_len + outlen));
  return true;
#else  // LZ4
  (void)info;
  (void)compress_format_version;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
1300
// Compresses `input` (`length` bytes) with Windows XPRESS into *output.
// Always returns false when RocksDB was built without XPRESS support.
#ifdef XPRESS
inline bool XPRESS_Compress(const char* input, size_t length,
                            std::string* output) {
  // Delegate to the platform-specific implementation.
  return port::xpress::Compress(input, length, output);
}
#else
inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/,
                            std::string* /*output*/) {
  // XPRESS support not compiled in.
  return false;
}
#endif
1312
// Decompresses an XPRESS-compressed block. The returned buffer is allocated
// by the XPRESS port layer; nullptr on failure or when XPRESS support was
// not compiled in.
#ifdef XPRESS
inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
                               size_t* uncompressed_size) {
  // Delegate to the platform-specific implementation.
  return port::xpress::Decompress(input_data, input_length, uncompressed_size);
}
#else
inline char* XPRESS_Uncompress(const char* /*input_data*/,
                               size_t /*input_length*/,
                               size_t* /*uncompressed_size*/) {
  // XPRESS support not compiled in.
  return nullptr;
}
#endif
1325
// Compresses `input` (`length` bytes) with ZSTD into *output, prepending a
// varint32 header with the decompressed size. Returns false on failure.
inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
                          size_t length, ::std::string* output) {
#ifdef ZSTD
  if (length > std::numeric_limits<uint32_t>::max()) {
    // Can't compress more than 4GB
    return false;
  }

  // Unlike the LZ4/BZip2 paths, the size header here is always varint32.
  size_t output_header_len = compression::PutDecompressedSizeInfo(
      output, static_cast<uint32_t>(length));

  // Worst-case compressed size; resize once so ZSTD writes in place.
  size_t compressBound = ZSTD_compressBound(length);
  output->resize(static_cast<size_t>(output_header_len + compressBound));
  size_t outlen = 0;
  int level;
  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
    // https://github.com/facebook/zstd/issues/1148
    level = 3;
  } else {
    level = info.options().level;
  }
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
  // Reuse a preallocated compression context from the UncompressionInfo.
  ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
  assert(context != nullptr);
#if ZSTD_VERSION_NUMBER >= 700  // v0.7.0+
  // Prefer the pre-digested dictionary when one is available.
  if (info.dict().GetDigestedZstdCDict() != nullptr) {
    outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len],
                                      compressBound, input, length,
                                      info.dict().GetDigestedZstdCDict());
  }
#endif  // ZSTD_VERSION_NUMBER >= 700
  // Fall back to the raw dictionary bytes (possibly empty).
  if (outlen == 0) {
    outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
                                     compressBound, input, length,
                                     info.dict().GetRawDict().data(),
                                     info.dict().GetRawDict().size(), level);
  }
#else  // up to v0.4.x
  outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
                         length, level);
#endif  // ZSTD_VERSION_NUMBER >= 500
  // NOTE(review): only 0 is treated as failure; a ZSTD error code
  // (ZSTD_isError) would be nonzero. With a compressBound-sized destination
  // compression presumably cannot fail -- TODO confirm.
  if (outlen == 0) {
    return false;
  }
  // Trim to the actual compressed size.
  output->resize(output_header_len + outlen);
  return true;
#else  // ZSTD
  (void)info;
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif
}
1381
1382 // @param compression_dict Data for presetting the compression library's
1383 // dictionary.
inline CacheAllocationPtr ZSTD_Uncompress(
    const UncompressionInfo& info, const char* input_data, size_t input_length,
    size_t* uncompressed_size, MemoryAllocator* allocator = nullptr) {
#ifdef ZSTD
  // ZSTD_Compress always writes a varint32 decompressed-size header; fail if
  // it cannot be read.
  uint32_t output_len = 0;
  if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                            &output_len)) {
    return nullptr;
  }

  auto output = AllocateBlock(output_len, allocator);
  size_t actual_output_length = 0;
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
  // Reuse a preallocated decompression context.
  ZSTD_DCtx* context = info.context().GetZSTDContext();
  assert(context != nullptr);
#ifdef ROCKSDB_ZSTD_DDICT
  // Prefer the pre-digested dictionary when one is available.
  if (info.dict().GetDigestedZstdDDict() != nullptr) {
    actual_output_length = ZSTD_decompress_usingDDict(
        context, output.get(), output_len, input_data, input_length,
        info.dict().GetDigestedZstdDDict());
  }
#endif  // ROCKSDB_ZSTD_DDICT
  // Fall back to the raw dictionary bytes (possibly empty).
  if (actual_output_length == 0) {
    actual_output_length = ZSTD_decompress_usingDict(
        context, output.get(), output_len, input_data, input_length,
        info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
  }
#else  // up to v0.4.x
  (void)info;
  actual_output_length =
      ZSTD_decompress(output.get(), output_len, input_data, input_length);
#endif  // ZSTD_VERSION_NUMBER >= 500
  // NOTE(review): decode errors are only caught by this assert in debug
  // builds; presumably corrupt blocks are rejected by checksums upstream --
  // TODO confirm.
  assert(actual_output_length == output_len);
  *uncompressed_size = actual_output_length;
  return output;
#else  // ZSTD
  (void)info;
  (void)input_data;
  (void)input_length;
  (void)uncompressed_size;
  (void)allocator;
  return nullptr;
#endif
}
1428
// Whether ZDICT-based dictionary training can be used with the linked zstd.
inline bool ZSTD_TrainDictionarySupported() {
#ifdef ZSTD
  // The trainer exists since v0.6.1 when statically linked, but is only
  // exported for dynamic linking as of v1.1.3, so gate on v1.1.3+.
  const unsigned kMinTrainerVersion = 10103;  // v1.1.3
  return ZSTD_versionNumber() >= kMinTrainerVersion;
#else
  return false;
#endif
}
1439
// Trains a ZSTD dictionary (up to max_dict_bytes) from the concatenated
// sample buffer `samples`, whose individual sample sizes are `sample_lens`.
// Returns "" when training fails or there are no samples.
inline std::string ZSTD_TrainDictionary(const std::string& samples,
                                        const std::vector<size_t>& sample_lens,
                                        size_t max_dict_bytes) {
  // The trainer exists since v0.6.1 when statically linked, but is only
  // exported for dynamic linking as of v1.1.3, so gate on v1.1.3+.
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  // Train into a buffer of the maximum allowed size, then shrink to fit.
  std::string trained(max_dict_bytes, '\0');
  const size_t trained_size = ZDICT_trainFromBuffer(
      &trained[0], max_dict_bytes, &samples[0], &sample_lens[0],
      static_cast<unsigned>(sample_lens.size()));
  if (ZDICT_isError(trained_size)) {
    return "";
  }
  assert(trained_size <= max_dict_bytes);
  trained.resize(trained_size);
  return trained;
#else  // up to v1.1.2
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10103
}
1469
// Convenience overload: `samples` is a sequence of fixed-size chunks of
// (1 << sample_len_shift) bytes each; any partial chunk at the tail is
// dropped. Dictionary trainer is available since v0.6.1, but ZSTD was marked
// stable only since v0.8.0, so the feature is enabled in stable versions only.
inline std::string ZSTD_TrainDictionary(const std::string& samples,
                                        size_t sample_len_shift,
                                        size_t max_dict_bytes) {
#if ZSTD_VERSION_NUMBER >= 10103  // v1.1.3+
  const size_t chunk_size = size_t{1} << sample_len_shift;
  const size_t num_samples = samples.size() >> sample_len_shift;
  std::vector<size_t> chunk_sizes(num_samples, chunk_size);
  return ZSTD_TrainDictionary(samples, chunk_sizes, max_dict_bytes);
#else  // up to v1.1.2
  assert(false);
  (void)samples;
  (void)sample_len_shift;
  (void)max_dict_bytes;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10103
}
1488
// Whether ZDICT_finalizeDictionary can be used with the linked zstd.
inline bool ZSTD_FinalizeDictionarySupported() {
#ifdef ZSTD
  // The ZDICT_finalizeDictionary API is only stable since v1.4.5.
  const unsigned kMinFinalizeVersion = 10405;  // v1.4.5
  return ZSTD_versionNumber() >= kMinFinalizeVersion;
#else
  return false;
#endif
}
1497
// Finalizes a dictionary (up to max_dict_bytes) from raw sample content,
// using ZDICT_finalizeDictionary (stable since zstd v1.4.5). Returns "" on
// failure, when there are no samples, or when the API is unavailable.
inline std::string ZSTD_FinalizeDictionary(
    const std::string& samples, const std::vector<size_t>& sample_lens,
    size_t max_dict_bytes, int level) {
#if ZSTD_VERSION_NUMBER >= 10405  // v1.4.5+
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  if (level == CompressionOptions::kDefaultCompressionLevel) {
    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
    // https://github.com/facebook/zstd/issues/1148
    level = 3;
  }
  // Finalize into a buffer of the maximum allowed size, then shrink to fit.
  // The raw samples double as the dictionary content (capped at
  // max_dict_bytes).
  std::string finalized(max_dict_bytes, '\0');
  const size_t finalized_size = ZDICT_finalizeDictionary(
      &finalized[0], max_dict_bytes, samples.data(),
      std::min(static_cast<size_t>(samples.size()), max_dict_bytes),
      samples.data(), sample_lens.data(),
      static_cast<unsigned>(sample_lens.size()),
      {level, 0 /* notificationLevel */, 0 /* dictID */});
  if (ZDICT_isError(finalized_size)) {
    return "";
  }
  assert(finalized_size <= max_dict_bytes);
  finalized.resize(finalized_size);
  return finalized;
#else  // up to v1.4.4
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  (void)level;
  return "";
#endif  // ZSTD_VERSION_NUMBER >= 10405
}
1534
1535 inline bool CompressData(const Slice& raw,
1536 const CompressionInfo& compression_info,
1537 uint32_t compress_format_version,
1538 std::string* compressed_output) {
1539 bool ret = false;
1540
1541 // Will return compressed block contents if (1) the compression method is
1542 // supported in this platform and (2) the compression rate is "good enough".
1543 switch (compression_info.type()) {
1544 case kSnappyCompression:
1545 ret = Snappy_Compress(compression_info, raw.data(), raw.size(),
1546 compressed_output);
1547 break;
1548 case kZlibCompression:
1549 ret = Zlib_Compress(compression_info, compress_format_version, raw.data(),
1550 raw.size(), compressed_output);
1551 break;
1552 case kBZip2Compression:
1553 ret = BZip2_Compress(compression_info, compress_format_version,
1554 raw.data(), raw.size(), compressed_output);
1555 break;
1556 case kLZ4Compression:
1557 ret = LZ4_Compress(compression_info, compress_format_version, raw.data(),
1558 raw.size(), compressed_output);
1559 break;
1560 case kLZ4HCCompression:
1561 ret = LZ4HC_Compress(compression_info, compress_format_version,
1562 raw.data(), raw.size(), compressed_output);
1563 break;
1564 case kXpressCompression:
1565 ret = XPRESS_Compress(raw.data(), raw.size(), compressed_output);
1566 break;
1567 case kZSTD:
1568 case kZSTDNotFinalCompression:
1569 ret = ZSTD_Compress(compression_info, raw.data(), raw.size(),
1570 compressed_output);
1571 break;
1572 default:
1573 // Do not recognize this compression type
1574 break;
1575 }
1576
1577 TEST_SYNC_POINT_CALLBACK("CompressData:TamperWithReturnValue",
1578 static_cast<void*>(&ret));
1579
1580 return ret;
1581 }
1582
1583 inline CacheAllocationPtr UncompressData(
1584 const UncompressionInfo& uncompression_info, const char* data, size_t n,
1585 size_t* uncompressed_size, uint32_t compress_format_version,
1586 MemoryAllocator* allocator = nullptr) {
1587 switch (uncompression_info.type()) {
1588 case kSnappyCompression:
1589 return Snappy_Uncompress(data, n, uncompressed_size, allocator);
1590 case kZlibCompression:
1591 return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size,
1592 compress_format_version, allocator);
1593 case kBZip2Compression:
1594 return BZip2_Uncompress(data, n, uncompressed_size,
1595 compress_format_version, allocator);
1596 case kLZ4Compression:
1597 case kLZ4HCCompression:
1598 return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size,
1599 compress_format_version, allocator);
1600 case kXpressCompression:
1601 // XPRESS allocates memory internally, thus no support for custom
1602 // allocator.
1603 return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
1604 case kZSTD:
1605 case kZSTDNotFinalCompression:
1606 return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
1607 allocator);
1608 default:
1609 return CacheAllocationPtr();
1610 }
1611 }
1612
1613 // Records the compression type for subsequent WAL records.
1614 class CompressionTypeRecord {
1615 public:
1616 explicit CompressionTypeRecord(CompressionType compression_type)
1617 : compression_type_(compression_type) {}
1618
1619 CompressionType GetCompressionType() const { return compression_type_; }
1620
1621 inline void EncodeTo(std::string* dst) const {
1622 assert(dst != nullptr);
1623 PutFixed32(dst, compression_type_);
1624 }
1625
1626 inline Status DecodeFrom(Slice* src) {
1627 constexpr char class_name[] = "CompressionTypeRecord";
1628
1629 uint32_t val;
1630 if (!GetFixed32(src, &val)) {
1631 return Status::Corruption(class_name,
1632 "Error decoding WAL compression type");
1633 }
1634 CompressionType compression_type = static_cast<CompressionType>(val);
1635 if (!StreamingCompressionTypeSupported(compression_type)) {
1636 return Status::Corruption(class_name,
1637 "WAL compression type not supported");
1638 }
1639 compression_type_ = compression_type;
1640 return Status::OK();
1641 }
1642
1643 inline std::string DebugString() const {
1644 return "compression_type: " + CompressionTypeToString(compression_type_);
1645 }
1646
1647 private:
1648 CompressionType compression_type_;
1649 };
1650
1651 // Base class to implement compression for a stream of buffers.
1652 // Instantiate an implementation of the class using Create() with the
1653 // compression type and use Compress() repeatedly.
1654 // The output buffer needs to be at least max_output_len.
1655 // Call Reset() in between frame boundaries or in case of an error.
1656 // NOTE: This class is not thread safe.
class StreamingCompress {
 public:
  StreamingCompress(CompressionType compression_type,
                    const CompressionOptions& opts,
                    uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        opts_(opts),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}
  virtual ~StreamingCompress() = default;
  // Compress should be called repeatedly with the same input until the method
  // returns 0.
  // Parameters:
  // input - buffer to compress
  // input_size - size of input buffer
  // output - compressed buffer allocated by caller, should be at least
  // max_output_len
  // output_pos - (out) position in `output` up to which data was written
  // (presumably the number of bytes produced; confirm against the
  // implementations in the .cc file)
  // Returns -1 for errors, otherwise the remaining size of the input buffer
  // that still needs to be compressed.
  virtual int Compress(const char* input, size_t input_size, char* output,
                       size_t* output_pos) = 0;
  // Static factory: creates the object of a class inherited from
  // StreamingCompress based on the actual compression type.
  static StreamingCompress* Create(CompressionType compression_type,
                                   const CompressionOptions& opts,
                                   uint32_t compress_format_version,
                                   size_t max_output_len);
  // Resets internal state; call between frame boundaries or after an error.
  virtual void Reset() = 0;

 protected:
  const CompressionType compression_type_;
  const CompressionOptions opts_;
  const uint32_t compress_format_version_;
  const size_t max_output_len_;
};
1693
1694 // Base class to uncompress a stream of compressed buffers.
1695 // Instantiate an implementation of the class using Create() with the
1696 // compression type and use Uncompress() repeatedly.
1697 // The output buffer needs to be at least max_output_len.
1698 // Call Reset() in between frame boundaries or in case of an error.
1699 // NOTE: This class is not thread safe.
class StreamingUncompress {
 public:
  StreamingUncompress(CompressionType compression_type,
                      uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}
  virtual ~StreamingUncompress() = default;
  // Uncompress should be called again with the same input if output_size is
  // equal to max_output_len or with the next input fragment.
  // Parameters:
  // input - buffer to uncompress
  // input_size - size of input buffer
  // output - uncompressed buffer allocated by caller, should be at least
  // max_output_len
  // output_pos - (out) position in `output` up to which data was written
  // (presumably the number of bytes produced; confirm against the
  // implementations in the .cc file)
  // Returns -1 for errors, remaining input to be processed otherwise.
  virtual int Uncompress(const char* input, size_t input_size, char* output,
                         size_t* output_pos) = 0;
  // Static factory: creates the object of a class inherited from
  // StreamingUncompress based on the actual compression type.
  static StreamingUncompress* Create(CompressionType compression_type,
                                     uint32_t compress_format_version,
                                     size_t max_output_len);
  // Resets internal state; call between frame boundaries or after an error.
  virtual void Reset() = 0;

 protected:
  CompressionType compression_type_;
  uint32_t compress_format_version_;
  size_t max_output_len_;
};
1729
1730 class ZSTDStreamingCompress final : public StreamingCompress {
1731 public:
1732 explicit ZSTDStreamingCompress(const CompressionOptions& opts,
1733 uint32_t compress_format_version,
1734 size_t max_output_len)
1735 : StreamingCompress(kZSTD, opts, compress_format_version,
1736 max_output_len) {
1737 #ifdef ZSTD_STREAMING
1738 cctx_ = ZSTD_createCCtx();
1739 // Each compressed frame will have a checksum
1740 ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
1741 assert(cctx_ != nullptr);
1742 input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
1743 #endif
1744 }
1745 ~ZSTDStreamingCompress() override {
1746 #ifdef ZSTD_STREAMING
1747 ZSTD_freeCCtx(cctx_);
1748 #endif
1749 }
1750 int Compress(const char* input, size_t input_size, char* output,
1751 size_t* output_pos) override;
1752 void Reset() override;
1753 #ifdef ZSTD_STREAMING
1754 ZSTD_CCtx* cctx_;
1755 ZSTD_inBuffer input_buffer_;
1756 #endif
1757 };
1758
// Streaming decompressor for ZSTD frames (used for WAL decompression).
// Uncompress()/Reset() are implemented in the corresponding .cc file.
class ZSTDStreamingUncompress final : public StreamingUncompress {
 public:
  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
                                   size_t max_output_len)
      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
#ifdef ZSTD_STREAMING
    dctx_ = ZSTD_createDCtx();
    // Allocation-failure check (debug builds only).
    assert(dctx_ != nullptr);
    input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
  }
  ~ZSTDStreamingUncompress() override {
#ifdef ZSTD_STREAMING
    ZSTD_freeDCtx(dctx_);
#endif
  }
  int Uncompress(const char* input, size_t input_size, char* output,
                 size_t* output_size) override;
  void Reset() override;

 private:
#ifdef ZSTD_STREAMING
  // Reused across frames; freed in the destructor.
  ZSTD_DCtx* dctx_;
  // Tracks the caller's input between successive Uncompress() calls.
  ZSTD_inBuffer input_buffer_;
#endif
};
1785
1786 } // namespace ROCKSDB_NAMESPACE