]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2019 Scylladb, Ltd. | |
20 | */ | |
21 | ||
22 | #include <seastar/rpc/lz4_fragmented_compressor.hh> | |
23 | #include <seastar/core/byteorder.hh> | |
24 | ||
25 | #include <lz4.h> | |
26 | ||
27 | namespace seastar { | |
28 | namespace rpc { | |
29 | ||
// Wire-protocol identifier for this compressor, exchanged during RPC
// feature negotiation so both peers agree on the algorithm.
const sstring lz4_fragmented_compressor::factory::_name = "LZ4_FRAGMENTED";
31 | ||
// Compressed message format:
// The message consists of one or more data chunks, each preceded by a 4 byte header.
// The value of the header determines the size of the chunk:
// - most significant bit is cleared: intermediate chunk, 31 least significant bits
//   contain the compressed size of the chunk (i.e. how it appears on wire), the
//   decompressed size is 32 kB.
// - most significant bit is set: last chunk, 31 least significant bits contain the
//   decompressed size of the chunk, the compressed size is the remaining part of
//   the message.
// Compression and decompression is done using the LZ4 streaming interface. Each chunk
// depends on the one that precedes it.
// All metadata is little-endian.
44 | ||
// Top bit of a chunk header: set on the final chunk of a message.
static constexpr uint32_t last_chunk_flag = uint32_t(1) << 31;
// Every chunk is preceded by a 4-byte little-endian header on the wire.
static constexpr size_t chunk_header_size = sizeof(uint32_t);
// Decompressed size of every intermediate (non-last) chunk.
static constexpr size_t chunk_size = 32 * 1024;
48 | ||
49 | namespace { | |
50 | ||
51 | struct compression_stream_deleter { | |
52 | void operator()(LZ4_stream_t* stream) const noexcept { | |
53 | LZ4_freeStream(stream); | |
54 | } | |
55 | }; | |
56 | ||
57 | struct decompression_stream_deleter { | |
58 | void operator()(LZ4_streamDecode_t* stream) const noexcept { | |
59 | LZ4_freeStreamDecode(stream); | |
60 | } | |
61 | }; | |
62 | ||
63 | } | |
64 | ||
65 | snd_buf lz4_fragmented_compressor::compress(size_t head_space, snd_buf data) { | |
66 | static thread_local auto stream = std::unique_ptr<LZ4_stream_t, compression_stream_deleter>(LZ4_createStream()); | |
67 | static_assert(chunk_size <= snd_buf::chunk_size, "chunk_size <= snd_buf::chunk_size"); | |
68 | ||
69 | LZ4_resetStream(stream.get()); | |
70 | ||
71 | auto size_left = data.size; | |
72 | auto src = compat::get_if<temporary_buffer<char>>(&data.bufs); | |
73 | if (!src) { | |
74 | src = compat::get<std::vector<temporary_buffer<char>>>(data.bufs).data(); | |
75 | } | |
76 | ||
77 | auto single_chunk_size = LZ4_COMPRESSBOUND(size_left) + head_space + chunk_header_size; | |
78 | if (single_chunk_size <= chunk_size && size_left <= chunk_size && src->size() == size_left) { | |
79 | // faster path for small messages | |
80 | auto dst = temporary_buffer<char>(single_chunk_size); | |
81 | auto header = dst.get_write() + head_space; | |
82 | auto compressed_data = header + chunk_header_size; | |
83 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src->get(), compressed_data, size_left, LZ4_COMPRESSBOUND(size_left), 0); | |
84 | write_le(header, last_chunk_flag | size_left); | |
85 | dst.trim(head_space + chunk_header_size + compressed_size); | |
86 | return snd_buf(std::move(dst)); | |
87 | } | |
88 | ||
89 | // Input chunks do not have to be multiplies of chunk_size (it is unlikely though). | |
90 | // We handle such cases by reassembling a chunk in this temporary buffer. | |
91 | static thread_local char temporary_chunk_data[chunk_size]; | |
92 | ||
93 | static constexpr size_t chunk_compress_bound = LZ4_COMPRESSBOUND(chunk_size); | |
94 | static constexpr size_t chunk_maximum_compressed_size = chunk_compress_bound + chunk_header_size; | |
95 | static_assert(chunk_maximum_compressed_size < snd_buf::chunk_size); | |
96 | ||
97 | std::vector<temporary_buffer<char>> dst_buffers; | |
98 | size_t dst_offset = head_space; | |
99 | ||
100 | dst_buffers.emplace_back(std::max<size_t>(head_space, snd_buf::chunk_size)); | |
101 | ||
102 | // Intermediate chunks | |
103 | size_t total_compressed_size = 0; | |
104 | auto src_left = data.size; | |
105 | size_t src_current_offset = 0; | |
106 | ||
107 | // Advance offset in the current source fragment, move to the next fragment if needed. | |
108 | auto advance_src = [&] (size_t n) { | |
109 | src_current_offset += n; | |
110 | if (src_current_offset >= src->size()) { | |
111 | ++src; | |
112 | src_current_offset = 0; | |
113 | } | |
114 | src_left -= n; | |
115 | }; | |
116 | ||
117 | while (src_left > chunk_size) { | |
118 | // Check if we can fit another chunk in the current destination fragment. | |
119 | // If not allocate a new one. | |
120 | if (dst_offset + chunk_maximum_compressed_size > dst_buffers.back().size()) { | |
121 | dst_buffers.back().trim(dst_offset); | |
122 | dst_buffers.emplace_back(snd_buf::chunk_size); | |
123 | dst_offset = 0; | |
124 | } | |
125 | ||
126 | // Check if there is at least a contiguous chunk_size of data in the current | |
127 | // source fragment. If not, linearise that into temporary_chunk_data. | |
128 | auto src_ptr = src->get() + src_current_offset; | |
129 | if (src->size() - src_current_offset < chunk_size) { | |
130 | auto left = chunk_size; | |
131 | auto dst = temporary_chunk_data; | |
132 | while (left) { | |
133 | auto this_size = std::min(src->size() - src_current_offset, left); | |
134 | dst = std::copy_n(src->get() + src_current_offset, this_size, dst); | |
135 | left -= this_size; | |
136 | advance_src(this_size); | |
137 | ||
138 | } | |
139 | src_ptr = temporary_chunk_data; | |
140 | } else { | |
141 | advance_src(chunk_size); | |
142 | } | |
143 | ||
144 | auto header = dst_buffers.back().get_write() + dst_offset; | |
145 | auto dst = header + chunk_header_size; | |
146 | ||
147 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, chunk_size, chunk_compress_bound, 0); | |
148 | total_compressed_size += compressed_size + chunk_header_size; | |
149 | ||
150 | dst_offset += compressed_size + chunk_header_size; | |
151 | write_le<uint32_t>(header, compressed_size); | |
152 | } | |
153 | ||
154 | // Last chunk | |
155 | auto last_chunk_compress_bound = LZ4_COMPRESSBOUND(src_left); | |
156 | auto last_chunk_maximum_compressed_size = last_chunk_compress_bound + chunk_header_size; | |
157 | ||
158 | // Check if we can fit the last chunk in the current destination fragment. Allocate a new one if not. | |
159 | if (dst_offset + last_chunk_maximum_compressed_size > dst_buffers.back().size()) { | |
160 | dst_buffers.back().trim(dst_offset); | |
161 | dst_buffers.emplace_back(snd_buf::chunk_size); | |
162 | dst_offset = 0; | |
163 | } | |
164 | auto header = dst_buffers.back().get_write() + dst_offset; | |
165 | auto dst = header + chunk_header_size; | |
166 | ||
167 | // Check if all remaining source data is contiguous. If not linearise it into temporary_chunk_data. | |
168 | auto src_ptr = src->get() + src_current_offset; | |
169 | if (src->size() - src_current_offset < src_left) { | |
170 | auto left = src_left; | |
171 | auto dst = temporary_chunk_data; | |
172 | while (left) { | |
173 | auto this_size = src->size() - src_current_offset; | |
174 | assert(this_size <= chunk_size); | |
175 | dst = std::copy_n(src->get() + src_current_offset, this_size, dst); | |
176 | left -= this_size; | |
177 | src_current_offset = 0; | |
178 | ++src; | |
179 | } | |
180 | src_ptr = temporary_chunk_data; | |
181 | } | |
182 | ||
183 | auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, src_left, last_chunk_compress_bound, 0); | |
184 | dst_offset += compressed_size + chunk_header_size; | |
185 | write_le<uint32_t>(header, last_chunk_flag | src_left); | |
186 | total_compressed_size += compressed_size + chunk_header_size + head_space; | |
187 | ||
188 | auto& last = dst_buffers.back(); | |
189 | last.trim(dst_offset); | |
190 | ||
191 | if (dst_buffers.size() == 1) { | |
192 | return snd_buf(std::move(dst_buffers.front())); | |
193 | } | |
194 | return snd_buf(std::move(dst_buffers), total_compressed_size); | |
195 | } | |
196 | ||
197 | rcv_buf lz4_fragmented_compressor::decompress(rcv_buf data) { | |
198 | if (data.size < 4) { | |
199 | return rcv_buf(); | |
200 | } | |
201 | ||
202 | static thread_local auto stream = std::unique_ptr<LZ4_streamDecode_t, decompression_stream_deleter>(LZ4_createStreamDecode()); | |
203 | ||
204 | if (!LZ4_setStreamDecode(stream.get(), nullptr, 0)) { | |
205 | throw std::runtime_error("RPC frame LZ4 decompression failed to reset state"); | |
206 | } | |
207 | ||
208 | auto src = compat::get_if<temporary_buffer<char>>(&data.bufs); | |
209 | size_t src_left = data.size; | |
210 | size_t src_offset = 0; | |
211 | ||
212 | // Prepare source data. Returns pointer to n contiguous bytes of source data. | |
213 | // Avoids copy if possible, otherwise uses dst as a temporary storage. | |
214 | auto copy_src = [&] (char* dst, size_t n) -> const char* { | |
215 | // Fast path, no need to copy anything. | |
216 | if (src->size() - src_offset >= n) { | |
217 | auto ptr = src->get() + src_offset; | |
218 | src_left -= n; | |
219 | src_offset += n; | |
220 | return ptr; | |
221 | } | |
222 | ||
223 | // Need to linearise source chunk into dst. | |
224 | auto ptr = dst; | |
225 | src_left -= n; | |
226 | while (n) { | |
227 | if (src_offset == src->size()) { | |
228 | ++src; | |
229 | src_offset = 0; | |
230 | } | |
231 | auto this_size = std::min(n, src->size() - src_offset); | |
232 | std::copy_n(src->get() + src_offset, this_size, dst); | |
233 | n -= this_size; | |
234 | dst += this_size; | |
235 | src_offset += this_size; | |
236 | } | |
237 | return ptr; | |
238 | }; | |
239 | ||
240 | // Read, possibly fragmented, header. | |
241 | auto read_header = [&] { | |
242 | uint32_t header_value; | |
243 | auto ptr = copy_src(reinterpret_cast<char*>(&header_value), chunk_header_size); | |
244 | if (ptr != reinterpret_cast<char*>(&header_value)) { | |
245 | std::copy_n(ptr, sizeof(uint32_t), reinterpret_cast<char*>(&header_value)); | |
246 | } | |
247 | return le_to_cpu(header_value); | |
248 | }; | |
249 | ||
250 | if (src) { | |
251 | auto header = read_le<uint32_t>(src->get()); | |
252 | if (header & last_chunk_flag) { | |
253 | // faster path for small messages: single chunk in a single buffer | |
254 | header &= ~last_chunk_flag; | |
255 | src_offset += chunk_header_size; | |
256 | src_left -= chunk_header_size; | |
257 | auto dst = temporary_buffer<char>(header); | |
258 | if (LZ4_decompress_safe_continue(stream.get(), src->get() + src_offset, dst.get_write(), src_left, header) < 0) { | |
259 | throw std::runtime_error("RPC frame LZ4 decompression failure"); | |
260 | } | |
261 | return rcv_buf(std::move(dst)); | |
262 | } | |
263 | // not eligible for fast path: multiple chunks in a single buffer | |
264 | } else { | |
265 | // not eligible for fast path: multiple buffers | |
266 | src = compat::get<std::vector<temporary_buffer<char>>>(data.bufs).data(); | |
267 | } | |
268 | ||
269 | // Let's be a bit paranoid and not assume that the remote has the same | |
270 | // LZ4_COMPRESSBOUND as we do and allow any compressed chunk size. | |
271 | static thread_local auto chunk_buffer = temporary_buffer<char>(LZ4_COMPRESSBOUND(chunk_size)); | |
272 | ||
273 | std::vector<temporary_buffer<char>> dst_buffers; | |
274 | size_t total_size = 0; | |
275 | ||
276 | // Intermediate chunks | |
277 | uint32_t header_value = read_header(); | |
278 | while (!(header_value & last_chunk_flag)) { | |
279 | total_size += chunk_size; | |
280 | dst_buffers.emplace_back(chunk_size); | |
281 | if (chunk_buffer.size() < header_value) { | |
282 | chunk_buffer = temporary_buffer<char>(header_value); | |
283 | } | |
284 | auto src_ptr = copy_src(chunk_buffer.get_write(), header_value); | |
285 | if (LZ4_decompress_safe_continue(stream.get(), src_ptr, dst_buffers.back().get_write(), header_value, chunk_size) < 0) { | |
286 | throw std::runtime_error("RPC frame LZ4 decompression failure"); | |
287 | } | |
288 | header_value = read_header(); | |
289 | } | |
290 | ||
291 | // Last chunk | |
292 | header_value &= ~last_chunk_flag; | |
293 | total_size += header_value; | |
294 | dst_buffers.emplace_back(header_value); | |
295 | if (chunk_buffer.size() < src_left) { | |
296 | chunk_buffer = temporary_buffer<char>(src_left); | |
297 | } | |
298 | auto last_chunk_compressed_size = src_left; | |
299 | auto src_ptr = copy_src(chunk_buffer.get_write(), src_left); | |
300 | if (LZ4_decompress_safe_continue(stream.get(), src_ptr, dst_buffers.back().get_write(), last_chunk_compressed_size, header_value) < 0) { | |
301 | throw std::runtime_error("RPC frame LZ4 decompression failure"); | |
302 | } | |
303 | ||
304 | if (dst_buffers.size() == 1) { | |
305 | return rcv_buf(std::move(dst_buffers.front())); | |
306 | } | |
307 | return rcv_buf(std::move(dst_buffers), total_size); | |
308 | } | |
309 | ||
310 | } | |
311 | } |