]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/rpc/lz4_fragmented_compressor.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / src / rpc / lz4_fragmented_compressor.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2019 Scylladb, Ltd.
20 */
21
22 #include <seastar/rpc/lz4_fragmented_compressor.hh>
23 #include <seastar/core/byteorder.hh>
24
25 #include <lz4.h>
26
27 namespace seastar {
28 namespace rpc {
29
// Identifier for this compressor; presumably matched against the peer's
// advertised compressor list during RPC negotiation — confirm against the
// factory interface in lz4_fragmented_compressor.hh.
const sstring lz4_fragmented_compressor::factory::_name = "LZ4_FRAGMENTED";
31
32 // Compressed message format:
33 // The message consists of one or more data chunks each preceded by a 4 byte header.
34 // The value of the header determines the size of the chunk:
35 // - most significant bit is cleared: intermediate chunk, 31 least significant bits
36 // contain the compressed size of the chunk (i.e. how it appears on wire), the
37 // decompressed size is 32 kB.
38 // - most significant bit is set: last chunk, 31 least significant bits contain the
39 // decompressed size of the chunk, the compressed size is the remaining part of
40 // the message.
41 // Compression and decompression is done using LZ4 streaming interface. Each chunk
42 // depends on the one that precedes it.
43 // All metadata is little-endian.
44
// Most significant bit of a chunk header: set only on the final chunk of a
// message (see the format description above).
static constexpr uint32_t last_chunk_flag = uint32_t(1) << 31;
// Every chunk is preceded by a 4-byte little-endian header.
static constexpr size_t chunk_header_size = sizeof(uint32_t);
// Decompressed size of every intermediate (non-last) chunk.
static constexpr size_t chunk_size = 32 * 1024;
48
49 namespace {
50
51 struct compression_stream_deleter {
52 void operator()(LZ4_stream_t* stream) const noexcept {
53 LZ4_freeStream(stream);
54 }
55 };
56
57 struct decompression_stream_deleter {
58 void operator()(LZ4_streamDecode_t* stream) const noexcept {
59 LZ4_freeStreamDecode(stream);
60 }
61 };
62
63 }
64
// Compress a message into the chunked wire format described above, leaving
// head_space uninitialized bytes at the front of the result for the caller.
// Each chunk depends on the previous one via the LZ4 streaming state, so
// compression order and source linearisation must match what the
// decompressor reconstructs.
snd_buf lz4_fragmented_compressor::compress(size_t head_space, snd_buf data) {
    // Per-thread compression context, reused across messages; the reset
    // below ensures no history leaks from a previous message.
    static thread_local auto stream = std::unique_ptr<LZ4_stream_t, compression_stream_deleter>(LZ4_createStream());
    static_assert(chunk_size <= snd_buf::chunk_size, "chunk_size <= snd_buf::chunk_size");

    LZ4_resetStream(stream.get());

    auto size_left = data.size;
    // snd_buf holds either a single temporary_buffer or a vector of them;
    // normalise both cases to a pointer to the first fragment.
    auto src = compat::get_if<temporary_buffer<char>>(&data.bufs);
    if (!src) {
        src = compat::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
    }

    // Fast path: the whole message is one contiguous source fragment and the
    // worst-case output fits in a single chunk, so emit one last-chunk.
    auto single_chunk_size = LZ4_COMPRESSBOUND(size_left) + head_space + chunk_header_size;
    if (single_chunk_size <= chunk_size && size_left <= chunk_size && src->size() == size_left) {
        // faster path for small messages
        auto dst = temporary_buffer<char>(single_chunk_size);
        auto header = dst.get_write() + head_space;
        auto compressed_data = header + chunk_header_size;
        auto compressed_size = LZ4_compress_fast_continue(stream.get(), src->get(), compressed_data, size_left, LZ4_COMPRESSBOUND(size_left), 0);
        // Last-chunk header stores the *decompressed* size plus the flag.
        // NOTE(review): write_le deduces its width from the operand type —
        // this writes 4 bytes only if data.size is a 32-bit type; confirm
        // against snd_buf's declaration.
        write_le(header, last_chunk_flag | size_left);
        dst.trim(head_space + chunk_header_size + compressed_size);
        return snd_buf(std::move(dst));
    }

    // Input chunks do not have to be multiplies of chunk_size (it is unlikely though).
    // We handle such cases by reassembling a chunk in this temporary buffer.
    static thread_local char temporary_chunk_data[chunk_size];

    static constexpr size_t chunk_compress_bound = LZ4_COMPRESSBOUND(chunk_size);
    static constexpr size_t chunk_maximum_compressed_size = chunk_compress_bound + chunk_header_size;
    static_assert(chunk_maximum_compressed_size < snd_buf::chunk_size);

    std::vector<temporary_buffer<char>> dst_buffers;
    size_t dst_offset = head_space;

    // The first output fragment must be able to hold head_space even if that
    // exceeds the regular fragment size.
    dst_buffers.emplace_back(std::max<size_t>(head_space, snd_buf::chunk_size));

    // Intermediate chunks
    size_t total_compressed_size = 0;
    auto src_left = data.size;
    size_t src_current_offset = 0;

    // Advance offset in the current source fragment, move to the next fragment if needed.
    // Callers never advance past more than one fragment boundary at a time.
    auto advance_src = [&] (size_t n) {
        src_current_offset += n;
        if (src_current_offset >= src->size()) {
            ++src;
            src_current_offset = 0;
        }
        src_left -= n;
    };

    // Emit full chunk_size intermediate chunks while more than one chunk of
    // source data remains; the remainder becomes the last chunk.
    while (src_left > chunk_size) {
        // Check if we can fit another chunk in the current destination fragment.
        // If not allocate a new one.
        if (dst_offset + chunk_maximum_compressed_size > dst_buffers.back().size()) {
            dst_buffers.back().trim(dst_offset);
            dst_buffers.emplace_back(snd_buf::chunk_size);
            dst_offset = 0;
        }

        // Check if there is at least a contiguous chunk_size of data in the current
        // source fragment. If not, linearise that into temporary_chunk_data.
        auto src_ptr = src->get() + src_current_offset;
        if (src->size() - src_current_offset < chunk_size) {
            auto left = chunk_size;
            auto dst = temporary_chunk_data;
            while (left) {
                auto this_size = std::min(src->size() - src_current_offset, left);
                dst = std::copy_n(src->get() + src_current_offset, this_size, dst);
                left -= this_size;
                advance_src(this_size);

            }
            src_ptr = temporary_chunk_data;
        } else {
            advance_src(chunk_size);
        }

        auto header = dst_buffers.back().get_write() + dst_offset;
        auto dst = header + chunk_header_size;

        // Intermediate chunk: the header stores the compressed size; the
        // decompressed size is implicitly chunk_size.
        auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, chunk_size, chunk_compress_bound, 0);
        total_compressed_size += compressed_size + chunk_header_size;

        dst_offset += compressed_size + chunk_header_size;
        write_le<uint32_t>(header, compressed_size);
    }

    // Last chunk
    auto last_chunk_compress_bound = LZ4_COMPRESSBOUND(src_left);
    auto last_chunk_maximum_compressed_size = last_chunk_compress_bound + chunk_header_size;

    // Check if we can fit the last chunk in the current destination fragment. Allocate a new one if not.
    if (dst_offset + last_chunk_maximum_compressed_size > dst_buffers.back().size()) {
        dst_buffers.back().trim(dst_offset);
        dst_buffers.emplace_back(snd_buf::chunk_size);
        dst_offset = 0;
    }
    auto header = dst_buffers.back().get_write() + dst_offset;
    auto dst = header + chunk_header_size;

    // Check if all remaining source data is contiguous. If not linearise it into temporary_chunk_data.
    // The loop above guarantees src_left <= chunk_size, so the remainder
    // always fits in temporary_chunk_data.
    auto src_ptr = src->get() + src_current_offset;
    if (src->size() - src_current_offset < src_left) {
        auto left = src_left;
        auto dst = temporary_chunk_data;
        while (left) {
            auto this_size = src->size() - src_current_offset;
            assert(this_size <= chunk_size);
            dst = std::copy_n(src->get() + src_current_offset, this_size, dst);
            left -= this_size;
            src_current_offset = 0;
            ++src;
        }
        src_ptr = temporary_chunk_data;
    }

    // Last chunk: the header stores the *decompressed* size; the compressed
    // size is implied by the remaining length of the wire message.
    auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, src_left, last_chunk_compress_bound, 0);
    dst_offset += compressed_size + chunk_header_size;
    write_le<uint32_t>(header, last_chunk_flag | src_left);
    // head_space counts toward the total size reported to snd_buf.
    total_compressed_size += compressed_size + chunk_header_size + head_space;

    auto& last = dst_buffers.back();
    last.trim(dst_offset);

    if (dst_buffers.size() == 1) {
        return snd_buf(std::move(dst_buffers.front()));
    }
    return snd_buf(std::move(dst_buffers), total_compressed_size);
}
196
// Decompress a message in the chunked wire format described above.
// Throws std::runtime_error on malformed input; returns an empty rcv_buf if
// the input is shorter than one chunk header.
rcv_buf lz4_fragmented_compressor::decompress(rcv_buf data) {
    // A valid message contains at least one 4-byte chunk header.
    if (data.size < 4) {
        return rcv_buf();
    }

    // Per-thread decompression context, reused across messages.
    static thread_local auto stream = std::unique_ptr<LZ4_streamDecode_t, decompression_stream_deleter>(LZ4_createStreamDecode());

    if (!LZ4_setStreamDecode(stream.get(), nullptr, 0)) {
        throw std::runtime_error("RPC frame LZ4 decompression failed to reset state");
    }

    // src is non-null only when the input is a single contiguous buffer.
    auto src = compat::get_if<temporary_buffer<char>>(&data.bufs);
    size_t src_left = data.size;
    size_t src_offset = 0;

    // Prepare source data. Returns pointer to n contiguous bytes of source data.
    // Avoids copy if possible, otherwise uses dst as a temporary storage.
    auto copy_src = [&] (char* dst, size_t n) -> const char* {
        // Fast path, no need to copy anything.
        if (src->size() - src_offset >= n) {
            auto ptr = src->get() + src_offset;
            src_left -= n;
            src_offset += n;
            return ptr;
        }

        // Need to linearise source chunk into dst.
        auto ptr = dst;
        src_left -= n;
        while (n) {
            // Move to the next source fragment once the current one is
            // exhausted.
            if (src_offset == src->size()) {
                ++src;
                src_offset = 0;
            }
            auto this_size = std::min(n, src->size() - src_offset);
            std::copy_n(src->get() + src_offset, this_size, dst);
            n -= this_size;
            dst += this_size;
            src_offset += this_size;
        }
        return ptr;
    };

    // Read, possibly fragmented, header.
    auto read_header = [&] {
        uint32_t header_value;
        auto ptr = copy_src(reinterpret_cast<char*>(&header_value), chunk_header_size);
        // copy_src may have returned a pointer straight into the source
        // buffer instead of filling header_value; copy it over in that case.
        if (ptr != reinterpret_cast<char*>(&header_value)) {
            std::copy_n(ptr, sizeof(uint32_t), reinterpret_cast<char*>(&header_value));
        }
        return le_to_cpu(header_value);
    };

    if (src) {
        // Single contiguous buffer: safe to read the first header directly
        // (data.size >= 4 was checked above).
        auto header = read_le<uint32_t>(src->get());
        if (header & last_chunk_flag) {
            // faster path for small messages: single chunk in a single buffer
            header &= ~last_chunk_flag;
            src_offset += chunk_header_size;
            src_left -= chunk_header_size;
            auto dst = temporary_buffer<char>(header);
            if (LZ4_decompress_safe_continue(stream.get(), src->get() + src_offset, dst.get_write(), src_left, header) < 0) {
                throw std::runtime_error("RPC frame LZ4 decompression failure");
            }
            return rcv_buf(std::move(dst));
        }
        // not eligible for fast path: multiple chunks in a single buffer
    } else {
        // not eligible for fast path: multiple buffers
        src = compat::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
    }

    // Let's be a bit paranoid and not assume that the remote has the same
    // LZ4_COMPRESSBOUND as we do and allow any compressed chunk size.
    // The scratch buffer grows on demand below and never shrinks back.
    static thread_local auto chunk_buffer = temporary_buffer<char>(LZ4_COMPRESSBOUND(chunk_size));

    std::vector<temporary_buffer<char>> dst_buffers;
    size_t total_size = 0;

    // Intermediate chunks
    // For these the header carries the compressed size; each decompresses to
    // chunk_size bytes per the wire format.
    uint32_t header_value = read_header();
    while (!(header_value & last_chunk_flag)) {
        total_size += chunk_size;
        dst_buffers.emplace_back(chunk_size);
        if (chunk_buffer.size() < header_value) {
            chunk_buffer = temporary_buffer<char>(header_value);
        }
        auto src_ptr = copy_src(chunk_buffer.get_write(), header_value);
        if (LZ4_decompress_safe_continue(stream.get(), src_ptr, dst_buffers.back().get_write(), header_value, chunk_size) < 0) {
            throw std::runtime_error("RPC frame LZ4 decompression failure");
        }
        header_value = read_header();
    }

    // Last chunk
    // Here the header carries the decompressed size; the compressed size is
    // everything remaining in the input (src_left).
    header_value &= ~last_chunk_flag;
    total_size += header_value;
    dst_buffers.emplace_back(header_value);
    if (chunk_buffer.size() < src_left) {
        chunk_buffer = temporary_buffer<char>(src_left);
    }
    auto last_chunk_compressed_size = src_left;
    auto src_ptr = copy_src(chunk_buffer.get_write(), src_left);
    if (LZ4_decompress_safe_continue(stream.get(), src_ptr, dst_buffers.back().get_write(), last_chunk_compressed_size, header_value) < 0) {
        throw std::runtime_error("RPC frame LZ4 decompression failure");
    }

    if (dst_buffers.size() == 1) {
        return rcv_buf(std::move(dst_buffers.front()));
    }
    return rcv_buf(std::move(dst_buffers), total_size);
}
309
310 }
311 }