]> git.proxmox.com Git - ceph.git/blob - ceph/src/compressor/zstd/ZstdCompressor.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / compressor / zstd / ZstdCompressor.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_ZSTDCOMPRESSOR_H
16 #define CEPH_ZSTDCOMPRESSOR_H
17
18 #include "zstd/lib/zstd.h"
19 #include "include/buffer.h"
20 #include "include/encoding.h"
21 #include "compressor/Compressor.h"
22
23 #define COMPRESSION_LEVEL 5
24
25 class ZstdCompressor : public Compressor {
26 public:
27 ZstdCompressor() : Compressor(COMP_ALG_ZSTD, "zstd") {}
28
29 int compress(const bufferlist &src, bufferlist &dst) override {
30 bufferptr outptr = buffer::create_page_aligned(
31 ZSTD_compressBound(src.length()));
32 ZSTD_outBuffer_s outbuf;
33 outbuf.dst = outptr.c_str();
34 outbuf.size = outptr.length();
35 outbuf.pos = 0;
36
37 ZSTD_CStream *s = ZSTD_createCStream();
38 ZSTD_initCStream(s, COMPRESSION_LEVEL);
39 auto p = src.begin();
40 size_t left = src.length();
41 while (left) {
42 assert(!p.end());
43 struct ZSTD_inBuffer_s inbuf;
44 inbuf.pos = 0;
45 inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src);
46 ZSTD_compressStream(s, &outbuf, &inbuf);
47 left -= inbuf.size;
48 }
49 assert(p.end());
50 ZSTD_flushStream(s, &outbuf);
51 ZSTD_endStream(s, &outbuf);
52 ZSTD_freeCStream(s);
53
54 // prefix with decompressed length
55 ::encode((uint32_t)src.length(), dst);
56 dst.append(outptr, 0, outbuf.pos);
57 return 0;
58 }
59
60 int decompress(const bufferlist &src, bufferlist &dst) override {
61 bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
62 return decompress(i, src.length(), dst);
63 }
64
65 int decompress(bufferlist::iterator &p,
66 size_t compressed_len,
67 bufferlist &dst) override {
68 if (compressed_len < 4) {
69 return -1;
70 }
71 compressed_len -= 4;
72 uint32_t dst_len;
73 ::decode(dst_len, p);
74
75 bufferptr dstptr(dst_len);
76 ZSTD_outBuffer_s outbuf;
77 outbuf.dst = dstptr.c_str();
78 outbuf.size = dstptr.length();
79 outbuf.pos = 0;
80 ZSTD_DStream *s = ZSTD_createDStream();
81 ZSTD_initDStream(s);
82 while (compressed_len > 0) {
83 if (p.end()) {
84 return -1;
85 }
86 ZSTD_inBuffer_s inbuf;
87 inbuf.pos = 0;
88 inbuf.size = p.get_ptr_and_advance(compressed_len, (const char**)&inbuf.src);
89 ZSTD_decompressStream(s, &outbuf, &inbuf);
90 compressed_len -= inbuf.size;
91 }
92 ZSTD_freeDStream(s);
93
94 dst.append(dstptr, 0, outbuf.pos);
95 return 0;
96 }
97 };
98
99 #endif