]>
Commit | Line | Data |
---|---|---|
31f18b77 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_LZ4COMPRESSOR_H | |
16 | #define CEPH_LZ4COMPRESSOR_H | |
17 | ||
18 | #include <lz4.h> | |
19 | ||
20 | #include "compressor/Compressor.h" | |
21 | #include "include/buffer.h" | |
22 | #include "include/encoding.h" | |
11fdf7f2 | 23 | #include "common/config.h" |
31f18b77 FG |
24 | #include "common/Tub.h" |
25 | ||
26 | ||
27 | class LZ4Compressor : public Compressor { | |
28 | public: | |
11fdf7f2 TL |
29 | LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { |
30 | #ifdef HAVE_QATZIP | |
31 | if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) | |
32 | qat_enabled = true; | |
33 | else | |
34 | qat_enabled = false; | |
35 | #endif | |
36 | } | |
31f18b77 FG |
37 | |
38 | int compress(const bufferlist &src, bufferlist &dst) override { | |
9f95a23c TL |
39 | // older versions of liblz4 introduce bit errors when compressing |
40 | // fragmented buffers. this was fixed in lz4 commit | |
41 | // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first | |
42 | // appeared in v1.8.2. | |
43 | // | |
44 | // workaround: rebuild if not contiguous. | |
45 | if (!src.is_contiguous()) { | |
46 | bufferlist new_src = src; | |
47 | new_src.rebuild(); | |
48 | return compress(new_src, dst); | |
49 | } | |
50 | ||
11fdf7f2 TL |
51 | #ifdef HAVE_QATZIP |
52 | if (qat_enabled) | |
53 | return qat_accel.compress(src, dst); | |
54 | #endif | |
55 | bufferptr outptr = buffer::create_small_page_aligned( | |
31f18b77 FG |
56 | LZ4_compressBound(src.length())); |
57 | LZ4_stream_t lz4_stream; | |
58 | LZ4_resetStream(&lz4_stream); | |
59 | ||
60 | auto p = src.begin(); | |
61 | size_t left = src.length(); | |
62 | int pos = 0; | |
63 | const char *data; | |
64 | unsigned num = src.get_num_buffers(); | |
11fdf7f2 | 65 | encode((uint32_t)num, dst); |
31f18b77 | 66 | while (left) { |
11fdf7f2 TL |
67 | uint32_t origin_len = p.get_ptr_and_advance(left, &data); |
68 | int compressed_len = LZ4_compress_fast_continue( | |
31f18b77 FG |
69 | &lz4_stream, data, outptr.c_str()+pos, origin_len, |
70 | outptr.length()-pos, 1); | |
71 | if (compressed_len <= 0) | |
72 | return -1; | |
73 | pos += compressed_len; | |
74 | left -= origin_len; | |
11fdf7f2 TL |
75 | encode(origin_len, dst); |
76 | encode((uint32_t)compressed_len, dst); | |
31f18b77 | 77 | } |
11fdf7f2 | 78 | ceph_assert(p.end()); |
31f18b77 FG |
79 | |
80 | dst.append(outptr, 0, pos); | |
81 | return 0; | |
82 | } | |
83 | ||
84 | int decompress(const bufferlist &src, bufferlist &dst) override { | |
11fdf7f2 TL |
85 | #ifdef HAVE_QATZIP |
86 | if (qat_enabled) | |
87 | return qat_accel.decompress(src, dst); | |
88 | #endif | |
89 | auto i = std::cbegin(src); | |
31f18b77 FG |
90 | return decompress(i, src.length(), dst); |
91 | } | |
92 | ||
11fdf7f2 | 93 | int decompress(bufferlist::const_iterator &p, |
31f18b77 FG |
94 | size_t compressed_len, |
95 | bufferlist &dst) override { | |
11fdf7f2 TL |
96 | #ifdef HAVE_QATZIP |
97 | if (qat_enabled) | |
98 | return qat_accel.decompress(p, compressed_len, dst); | |
99 | #endif | |
31f18b77 FG |
100 | uint32_t count; |
101 | std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs; | |
11fdf7f2 | 102 | decode(count, p); |
31f18b77 FG |
103 | compressed_pairs.resize(count); |
104 | uint32_t total_origin = 0; | |
105 | for (unsigned i = 0; i < count; ++i) { | |
11fdf7f2 TL |
106 | decode(compressed_pairs[i].first, p); |
107 | decode(compressed_pairs[i].second, p); | |
31f18b77 FG |
108 | total_origin += compressed_pairs[i].first; |
109 | } | |
110 | compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); | |
111 | ||
112 | bufferptr dstptr(total_origin); | |
113 | LZ4_streamDecode_t lz4_stream_decode; | |
114 | LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); | |
115 | ||
116 | bufferptr cur_ptr = p.get_current_ptr(); | |
117 | bufferptr *ptr = &cur_ptr; | |
118 | Tub<bufferptr> data_holder; | |
119 | if (compressed_len != cur_ptr.length()) { | |
120 | data_holder.construct(compressed_len); | |
121 | p.copy_deep(compressed_len, *data_holder); | |
122 | ptr = data_holder.get(); | |
123 | } | |
124 | ||
125 | char *c_in = ptr->c_str(); | |
126 | char *c_out = dstptr.c_str(); | |
127 | for (unsigned i = 0; i < count; ++i) { | |
128 | int r = LZ4_decompress_safe_continue( | |
129 | &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); | |
130 | if (r == (int)compressed_pairs[i].first) { | |
131 | c_in += compressed_pairs[i].second; | |
132 | c_out += compressed_pairs[i].first; | |
133 | } else if (r < 0) { | |
134 | return -1; | |
11fdf7f2 | 135 | } else { |
31f18b77 FG |
136 | return -2; |
137 | } | |
138 | } | |
139 | dst.push_back(std::move(dstptr)); | |
140 | return 0; | |
141 | } | |
142 | }; | |
143 | ||
144 | #endif |