]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2019 Scylladb, Ltd. | |
20 | */ | |
21 | ||
22 | #include <seastar/rpc/lz4_fragmented_compressor.hh> | |
23 | #include <seastar/core/byteorder.hh> | |
24 | ||
25 | #include <lz4.h> | |
f67539c2 TL |
26 | // The LZ4_DECODER_RING_BUFFER_SIZE macro was introduced in lz4 v1.8.2. |
27 | // To build against earlier lz4 releases, its definition is copied here. | |
28 | #ifndef LZ4_DECODER_RING_BUFFER_SIZE | |
29 | #define LZ4_DECODER_RING_BUFFER_SIZE(maxBlockSize) (65536 + 14 + (maxBlockSize)) | |
30 | #endif | |
9f95a23c TL |
31 | |
32 | namespace seastar { | |
33 | namespace rpc { | |
34 | ||
f67539c2 TL |
35 | sstring lz4_fragmented_compressor::name() const { |
36 | return factory{}.supported(); | |
37 | } | |
38 | ||
39 | const sstring& lz4_fragmented_compressor::factory::supported() const { | |
40 | const static sstring name = "LZ4_FRAGMENTED"; | |
41 | return name; | |
42 | } | |
43 | ||
44 | std::unique_ptr<rpc::compressor> lz4_fragmented_compressor::factory::negotiate(sstring feature, bool is_server) const { | |
45 | return feature == supported() ? std::make_unique<lz4_fragmented_compressor>() : nullptr; | |
46 | } | |
9f95a23c TL |
47 | |
48 | // Compressed message format: | |
49 | // The message consists of one or more data chunks, each preceded by a 4-byte header. | |
50 | // The value of the header determines the size of the chunk: | |
51 | // - most significant bit is cleared: intermediate chunk, 31 least significant bits | |
52 | // contain the compressed size of the chunk (i.e. how it appears on wire), the | |
53 | // decompressed size is 32 kB. | |
54 | // - most significant bit is set: last chunk, 31 least significant bits contain the | |
55 | // decompressed size of the chunk, the compressed size is the remaining part of | |
56 | // the message. | |
57 | // Compression and decompression is done using LZ4 streaming interface. Each chunk | |
58 | // depends on the one that precedes it. | |
59 | // All metadata is little-endian. | |
60 | ||
// Most significant bit of a chunk header; set only on the last chunk.
static constexpr uint32_t last_chunk_flag = uint32_t{1} << 31;
// Each chunk is preceded by a 32-bit header.
static constexpr size_t chunk_header_size = sizeof(uint32_t);
// Decompressed size of every intermediate chunk: 32 kB.
static constexpr size_t chunk_size = size_t{32} * 1024;
64 | ||
65 | namespace { | |
66 | ||
67 | struct compression_stream_deleter { | |
68 | void operator()(LZ4_stream_t* stream) const noexcept { | |
69 | LZ4_freeStream(stream); | |
70 | } | |
71 | }; | |
72 | ||
73 | struct decompression_stream_deleter { | |
74 | void operator()(LZ4_streamDecode_t* stream) const noexcept { | |
75 | LZ4_freeStreamDecode(stream); | |
76 | } | |
77 | }; | |
78 | ||
79 | } | |
80 | ||
81 | snd_buf lz4_fragmented_compressor::compress(size_t head_space, snd_buf data) { | |
82 | static thread_local auto stream = std::unique_ptr<LZ4_stream_t, compression_stream_deleter>(LZ4_createStream()); | |
83 | static_assert(chunk_size <= snd_buf::chunk_size, "chunk_size <= snd_buf::chunk_size"); | |
84 | ||
85 | LZ4_resetStream(stream.get()); | |
86 | ||
87 | auto size_left = data.size; | |
f67539c2 | 88 | auto src = std::get_if<temporary_buffer<char>>(&data.bufs); |
9f95a23c | 89 | if (!src) { |
f67539c2 | 90 | src = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data(); |
9f95a23c TL |
91 | } |
92 | ||
93 | auto single_chunk_size = LZ4_COMPRESSBOUND(size_left) + head_space + chunk_header_size; | |
94 | if (single_chunk_size <= chunk_size && size_left <= chunk_size && src->size() == size_left) { | |
95 | // faster path for small messages | |
96 | auto dst = temporary_buffer<char>(single_chunk_size); | |
97 | auto header = dst.get_write() + head_space; | |
98 | auto compressed_data = header + chunk_header_size; | |
99 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src->get(), compressed_data, size_left, LZ4_COMPRESSBOUND(size_left), 0); | |
100 | write_le(header, last_chunk_flag | size_left); | |
101 | dst.trim(head_space + chunk_header_size + compressed_size); | |
102 | return snd_buf(std::move(dst)); | |
103 | } | |
104 | ||
9f95a23c TL |
105 | static constexpr size_t chunk_compress_bound = LZ4_COMPRESSBOUND(chunk_size); |
106 | static constexpr size_t chunk_maximum_compressed_size = chunk_compress_bound + chunk_header_size; | |
f67539c2 | 107 | static_assert(chunk_maximum_compressed_size < snd_buf::chunk_size, "chunk_maximum_compressed_size is too large"); |
9f95a23c TL |
108 | |
109 | std::vector<temporary_buffer<char>> dst_buffers; | |
110 | size_t dst_offset = head_space; | |
111 | ||
112 | dst_buffers.emplace_back(std::max<size_t>(head_space, snd_buf::chunk_size)); | |
113 | ||
114 | // Intermediate chunks | |
115 | size_t total_compressed_size = 0; | |
116 | auto src_left = data.size; | |
117 | size_t src_current_offset = 0; | |
118 | ||
119 | // Advance offset in the current source fragment, move to the next fragment if needed. | |
120 | auto advance_src = [&] (size_t n) { | |
121 | src_current_offset += n; | |
122 | if (src_current_offset >= src->size()) { | |
123 | ++src; | |
124 | src_current_offset = 0; | |
125 | } | |
126 | src_left -= n; | |
127 | }; | |
128 | ||
f67539c2 TL |
129 | // Input chunks do not have to be multiplies of chunk_size. |
130 | // We handle such cases by reassembling a chunk in this temporary buffer. | |
131 | // Note that this is similar to the ring buffer compression case in docs, | |
132 | // we need to ensure that a suitable amount of (maybe) previous data is | |
133 | // stable in this or input buffer, thus make the temp buffer | |
134 | // LZ4_DECODER_RING_BUFFER_SIZE(chunk_size) large, and treat it as a ring. | |
135 | static constexpr auto lin_buf_size = LZ4_DECODER_RING_BUFFER_SIZE(chunk_size); | |
136 | static thread_local char temporary_chunk_data[lin_buf_size]; | |
137 | size_t lin_off = 0; | |
138 | ||
139 | auto maybe_linearize = [&](size_t size) { | |
140 | auto src_ptr = src->get() + src_current_offset; | |
141 | if (src->size() - src_current_offset < size) { | |
142 | auto left = size; | |
143 | assert(lin_buf_size > size); | |
144 | if (lin_buf_size - lin_off < size) { | |
145 | lin_off = 0; | |
146 | } | |
147 | auto tmp = temporary_chunk_data + std::exchange(lin_off, lin_off + size); | |
148 | src_ptr = tmp; | |
149 | while (left) { | |
150 | auto this_size = std::min(src->size() - src_current_offset, left); | |
151 | tmp = std::copy_n(src->get() + src_current_offset, this_size, tmp); | |
152 | left -= this_size; | |
153 | advance_src(this_size); | |
154 | } | |
155 | } else { | |
156 | advance_src(chunk_size); | |
157 | lin_off = 0; | |
158 | } | |
159 | return src_ptr; | |
160 | }; | |
161 | ||
9f95a23c TL |
162 | while (src_left > chunk_size) { |
163 | // Check if we can fit another chunk in the current destination fragment. | |
164 | // If not allocate a new one. | |
165 | if (dst_offset + chunk_maximum_compressed_size > dst_buffers.back().size()) { | |
166 | dst_buffers.back().trim(dst_offset); | |
167 | dst_buffers.emplace_back(snd_buf::chunk_size); | |
168 | dst_offset = 0; | |
169 | } | |
170 | ||
171 | // Check if there is at least a contiguous chunk_size of data in the current | |
172 | // source fragment. If not, linearise that into temporary_chunk_data. | |
f67539c2 | 173 | auto src_ptr = maybe_linearize(chunk_size); |
9f95a23c TL |
174 | auto header = dst_buffers.back().get_write() + dst_offset; |
175 | auto dst = header + chunk_header_size; | |
176 | ||
177 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, chunk_size, chunk_compress_bound, 0); | |
178 | total_compressed_size += compressed_size + chunk_header_size; | |
179 | ||
180 | dst_offset += compressed_size + chunk_header_size; | |
181 | write_le<uint32_t>(header, compressed_size); | |
182 | } | |
183 | ||
184 | // Last chunk | |
185 | auto last_chunk_compress_bound = LZ4_COMPRESSBOUND(src_left); | |
186 | auto last_chunk_maximum_compressed_size = last_chunk_compress_bound + chunk_header_size; | |
187 | ||
188 | // Check if we can fit the last chunk in the current destination fragment. Allocate a new one if not. | |
189 | if (dst_offset + last_chunk_maximum_compressed_size > dst_buffers.back().size()) { | |
190 | dst_buffers.back().trim(dst_offset); | |
191 | dst_buffers.emplace_back(snd_buf::chunk_size); | |
192 | dst_offset = 0; | |
193 | } | |
194 | auto header = dst_buffers.back().get_write() + dst_offset; | |
195 | auto dst = header + chunk_header_size; | |
196 | ||
197 | // Check if all remaining source data is contiguous. If not linearise it into temporary_chunk_data. | |
f67539c2 TL |
198 | auto rem = src_left; |
199 | auto src_ptr = maybe_linearize(src_left); | |
9f95a23c | 200 | |
f67539c2 | 201 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, rem, last_chunk_compress_bound, 0); |
9f95a23c | 202 | dst_offset += compressed_size + chunk_header_size; |
f67539c2 | 203 | write_le<uint32_t>(header, last_chunk_flag | rem); |
9f95a23c TL |
204 | total_compressed_size += compressed_size + chunk_header_size + head_space; |
205 | ||
206 | auto& last = dst_buffers.back(); | |
207 | last.trim(dst_offset); | |
208 | ||
209 | if (dst_buffers.size() == 1) { | |
210 | return snd_buf(std::move(dst_buffers.front())); | |
211 | } | |
212 | return snd_buf(std::move(dst_buffers), total_compressed_size); | |
213 | } | |
214 | ||
215 | rcv_buf lz4_fragmented_compressor::decompress(rcv_buf data) { | |
216 | if (data.size < 4) { | |
217 | return rcv_buf(); | |
218 | } | |
219 | ||
220 | static thread_local auto stream = std::unique_ptr<LZ4_streamDecode_t, decompression_stream_deleter>(LZ4_createStreamDecode()); | |
221 | ||
222 | if (!LZ4_setStreamDecode(stream.get(), nullptr, 0)) { | |
f67539c2 | 223 | throw std::runtime_error("RPC frame LZ4_FRAGMENTED decompression failed to reset state"); |
9f95a23c TL |
224 | } |
225 | ||
f67539c2 | 226 | auto src = std::get_if<temporary_buffer<char>>(&data.bufs); |
9f95a23c TL |
227 | size_t src_left = data.size; |
228 | size_t src_offset = 0; | |
229 | ||
230 | // Prepare source data. Returns pointer to n contiguous bytes of source data. | |
231 | // Avoids copy if possible, otherwise uses dst as a temporary storage. | |
232 | auto copy_src = [&] (char* dst, size_t n) -> const char* { | |
233 | // Fast path, no need to copy anything. | |
234 | if (src->size() - src_offset >= n) { | |
235 | auto ptr = src->get() + src_offset; | |
236 | src_left -= n; | |
237 | src_offset += n; | |
238 | return ptr; | |
239 | } | |
240 | ||
241 | // Need to linearise source chunk into dst. | |
242 | auto ptr = dst; | |
243 | src_left -= n; | |
244 | while (n) { | |
245 | if (src_offset == src->size()) { | |
246 | ++src; | |
247 | src_offset = 0; | |
248 | } | |
249 | auto this_size = std::min(n, src->size() - src_offset); | |
250 | std::copy_n(src->get() + src_offset, this_size, dst); | |
251 | n -= this_size; | |
252 | dst += this_size; | |
253 | src_offset += this_size; | |
254 | } | |
255 | return ptr; | |
256 | }; | |
257 | ||
258 | // Read, possibly fragmented, header. | |
259 | auto read_header = [&] { | |
260 | uint32_t header_value; | |
261 | auto ptr = copy_src(reinterpret_cast<char*>(&header_value), chunk_header_size); | |
262 | if (ptr != reinterpret_cast<char*>(&header_value)) { | |
263 | std::copy_n(ptr, sizeof(uint32_t), reinterpret_cast<char*>(&header_value)); | |
264 | } | |
265 | return le_to_cpu(header_value); | |
266 | }; | |
267 | ||
268 | if (src) { | |
269 | auto header = read_le<uint32_t>(src->get()); | |
270 | if (header & last_chunk_flag) { | |
271 | // faster path for small messages: single chunk in a single buffer | |
272 | header &= ~last_chunk_flag; | |
273 | src_offset += chunk_header_size; | |
274 | src_left -= chunk_header_size; | |
275 | auto dst = temporary_buffer<char>(header); | |
276 | if (LZ4_decompress_safe_continue(stream.get(), src->get() + src_offset, dst.get_write(), src_left, header) < 0) { | |
f67539c2 | 277 | throw std::runtime_error("RPC frame LZ4_FRAGMENTED decompression failure (short)"); |
9f95a23c TL |
278 | } |
279 | return rcv_buf(std::move(dst)); | |
280 | } | |
281 | // not eligible for fast path: multiple chunks in a single buffer | |
282 | } else { | |
283 | // not eligible for fast path: multiple buffers | |
f67539c2 | 284 | src = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data(); |
9f95a23c TL |
285 | } |
286 | ||
287 | // Let's be a bit paranoid and not assume that the remote has the same | |
288 | // LZ4_COMPRESSBOUND as we do and allow any compressed chunk size. | |
289 | static thread_local auto chunk_buffer = temporary_buffer<char>(LZ4_COMPRESSBOUND(chunk_size)); | |
290 | ||
291 | std::vector<temporary_buffer<char>> dst_buffers; | |
292 | size_t total_size = 0; | |
293 | ||
f67539c2 TL |
294 | // Decompressing requires either dest to be fully split or |
295 | // "preserved" in 64KB or larger, depending on how it was | |
296 | // compressed. If not, decompression will fail, typically | |
297 | // on text-like constructs. Making our dest buffers 64K | |
298 | // ensures we retain a suitable dictionary region for all | |
299 | // passes. | |
300 | constexpr auto buf_size = 64 * 1024; | |
301 | size_t dst_offset = 0; | |
302 | ||
303 | auto get_dest = [&](size_t size) { | |
304 | if (dst_buffers.empty()) { | |
305 | dst_buffers.emplace_back(buf_size); | |
306 | } | |
307 | if (dst_buffers.back().size() - dst_offset < size) { | |
308 | dst_buffers.back().trim(dst_offset); | |
309 | dst_buffers.emplace_back(buf_size); | |
310 | dst_offset = 0; | |
311 | } | |
312 | return dst_buffers.back().get_write() + std::exchange(dst_offset, dst_offset + size); | |
313 | }; | |
314 | ||
9f95a23c TL |
315 | // Intermediate chunks |
316 | uint32_t header_value = read_header(); | |
317 | while (!(header_value & last_chunk_flag)) { | |
318 | total_size += chunk_size; | |
9f95a23c TL |
319 | if (chunk_buffer.size() < header_value) { |
320 | chunk_buffer = temporary_buffer<char>(header_value); | |
321 | } | |
322 | auto src_ptr = copy_src(chunk_buffer.get_write(), header_value); | |
f67539c2 TL |
323 | auto dst = get_dest(chunk_size); |
324 | if (LZ4_decompress_safe_continue(stream.get(), src_ptr, /*dst_buffers.back().get_write()*/dst, header_value, chunk_size) < 0) { | |
325 | throw std::runtime_error(format("RPC frame LZ4_FRAGMENTED decompression failure (long, at {} bytes)", total_size - chunk_size)); | |
9f95a23c TL |
326 | } |
327 | header_value = read_header(); | |
328 | } | |
329 | ||
330 | // Last chunk | |
331 | header_value &= ~last_chunk_flag; | |
332 | total_size += header_value; | |
f67539c2 | 333 | auto dst = get_dest(header_value); |
9f95a23c TL |
334 | if (chunk_buffer.size() < src_left) { |
335 | chunk_buffer = temporary_buffer<char>(src_left); | |
336 | } | |
337 | auto last_chunk_compressed_size = src_left; | |
338 | auto src_ptr = copy_src(chunk_buffer.get_write(), src_left); | |
f67539c2 TL |
339 | if (LZ4_decompress_safe_continue(stream.get(), src_ptr, /*dst_buffers.back().get_write()*/dst, last_chunk_compressed_size, header_value) < 0) { |
340 | throw std::runtime_error(format("RPC frame LZ4_FRAGMENTED decompression failure (long, last frame, at {} bytes)", total_size - header_value)); | |
9f95a23c TL |
341 | } |
342 | ||
f67539c2 TL |
343 | dst_buffers.back().trim(dst_offset); |
344 | ||
9f95a23c TL |
345 | if (dst_buffers.size() == 1) { |
346 | return rcv_buf(std::move(dst_buffers.front())); | |
347 | } | |
348 | return rcv_buf(std::move(dst_buffers), total_size); | |
349 | } | |
350 | ||
351 | } | |
352 | } |