/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2019 Scylladb, Ltd.
 */
22 #include <seastar/rpc/lz4_fragmented_compressor.hh>
23 #include <seastar/core/byteorder.hh>
30 const sstring
lz4_fragmented_compressor::factory::_name
= "LZ4_FRAGMENTED";
// Compressed message format:
// The message consists of one or more data chunks, each preceded by a 4-byte header.
// The value of the header determines the size of the chunk:
// - most significant bit is cleared: intermediate chunk, 31 least significant bits
//   contain the compressed size of the chunk (i.e. how it appears on wire), the
//   decompressed size is 32 KiB.
// - most significant bit is set: last chunk, 31 least significant bits contain the
//   decompressed size of the chunk, the compressed size is the remaining part of
//   the message.
// Compression and decompression are done using the LZ4 streaming interface. Each chunk
// depends on the one that precedes it.
// All metadata is little-endian.
// Set in a chunk header to mark the last chunk of a message; the low 31 bits of
// that header then hold the chunk's decompressed size.
static constexpr uint32_t last_chunk_flag = uint32_t(1) << 31;
// Size of the little-endian header that precedes every chunk.
static constexpr size_t chunk_header_size = sizeof(uint32_t);
// Decompressed size of every intermediate chunk (32 KiB).
static constexpr size_t chunk_size = 32 * 1024;
// Chunk sizes are stored in the 31 bits below last_chunk_flag.
static_assert(chunk_size < last_chunk_flag, "chunk_size must fit in the 31-bit header size field");
51 struct compression_stream_deleter
{
52 void operator()(LZ4_stream_t
* stream
) const noexcept
{
53 LZ4_freeStream(stream
);
57 struct decompression_stream_deleter
{
58 void operator()(LZ4_streamDecode_t
* stream
) const noexcept
{
59 LZ4_freeStreamDecode(stream
);
65 snd_buf
lz4_fragmented_compressor::compress(size_t head_space
, snd_buf data
) {
66 static thread_local
auto stream
= std::unique_ptr
<LZ4_stream_t
, compression_stream_deleter
>(LZ4_createStream());
67 static_assert(chunk_size
<= snd_buf::chunk_size
, "chunk_size <= snd_buf::chunk_size");
69 LZ4_resetStream(stream
.get());
71 auto size_left
= data
.size
;
72 auto src
= compat::get_if
<temporary_buffer
<char>>(&data
.bufs
);
74 src
= compat::get
<std::vector
<temporary_buffer
<char>>>(data
.bufs
).data();
77 auto single_chunk_size
= LZ4_COMPRESSBOUND(size_left
) + head_space
+ chunk_header_size
;
78 if (single_chunk_size
<= chunk_size
&& size_left
<= chunk_size
&& src
->size() == size_left
) {
79 // faster path for small messages
80 auto dst
= temporary_buffer
<char>(single_chunk_size
);
81 auto header
= dst
.get_write() + head_space
;
82 auto compressed_data
= header
+ chunk_header_size
;
83 auto compressed_size
= LZ4_compress_fast_continue(stream
.get(), src
->get(), compressed_data
, size_left
, LZ4_COMPRESSBOUND(size_left
), 0);
84 write_le(header
, last_chunk_flag
| size_left
);
85 dst
.trim(head_space
+ chunk_header_size
+ compressed_size
);
86 return snd_buf(std::move(dst
));
89 // Input chunks do not have to be multiplies of chunk_size (it is unlikely though).
90 // We handle such cases by reassembling a chunk in this temporary buffer.
91 static thread_local
char temporary_chunk_data
[chunk_size
];
93 static constexpr size_t chunk_compress_bound
= LZ4_COMPRESSBOUND(chunk_size
);
94 static constexpr size_t chunk_maximum_compressed_size
= chunk_compress_bound
+ chunk_header_size
;
95 static_assert(chunk_maximum_compressed_size
< snd_buf::chunk_size
);
97 std::vector
<temporary_buffer
<char>> dst_buffers
;
98 size_t dst_offset
= head_space
;
100 dst_buffers
.emplace_back(std::max
<size_t>(head_space
, snd_buf::chunk_size
));
102 // Intermediate chunks
103 size_t total_compressed_size
= 0;
104 auto src_left
= data
.size
;
105 size_t src_current_offset
= 0;
107 // Advance offset in the current source fragment, move to the next fragment if needed.
108 auto advance_src
= [&] (size_t n
) {
109 src_current_offset
+= n
;
110 if (src_current_offset
>= src
->size()) {
112 src_current_offset
= 0;
117 while (src_left
> chunk_size
) {
118 // Check if we can fit another chunk in the current destination fragment.
119 // If not allocate a new one.
120 if (dst_offset
+ chunk_maximum_compressed_size
> dst_buffers
.back().size()) {
121 dst_buffers
.back().trim(dst_offset
);
122 dst_buffers
.emplace_back(snd_buf::chunk_size
);
126 // Check if there is at least a contiguous chunk_size of data in the current
127 // source fragment. If not, linearise that into temporary_chunk_data.
128 auto src_ptr
= src
->get() + src_current_offset
;
129 if (src
->size() - src_current_offset
< chunk_size
) {
130 auto left
= chunk_size
;
131 auto dst
= temporary_chunk_data
;
133 auto this_size
= std::min(src
->size() - src_current_offset
, left
);
134 dst
= std::copy_n(src
->get() + src_current_offset
, this_size
, dst
);
136 advance_src(this_size
);
139 src_ptr
= temporary_chunk_data
;
141 advance_src(chunk_size
);
144 auto header
= dst_buffers
.back().get_write() + dst_offset
;
145 auto dst
= header
+ chunk_header_size
;
147 auto compressed_size
= LZ4_compress_fast_continue(stream
.get(), src_ptr
, dst
, chunk_size
, chunk_compress_bound
, 0);
148 total_compressed_size
+= compressed_size
+ chunk_header_size
;
150 dst_offset
+= compressed_size
+ chunk_header_size
;
151 write_le
<uint32_t>(header
, compressed_size
);
155 auto last_chunk_compress_bound
= LZ4_COMPRESSBOUND(src_left
);
156 auto last_chunk_maximum_compressed_size
= last_chunk_compress_bound
+ chunk_header_size
;
158 // Check if we can fit the last chunk in the current destination fragment. Allocate a new one if not.
159 if (dst_offset
+ last_chunk_maximum_compressed_size
> dst_buffers
.back().size()) {
160 dst_buffers
.back().trim(dst_offset
);
161 dst_buffers
.emplace_back(snd_buf::chunk_size
);
164 auto header
= dst_buffers
.back().get_write() + dst_offset
;
165 auto dst
= header
+ chunk_header_size
;
167 // Check if all remaining source data is contiguous. If not linearise it into temporary_chunk_data.
168 auto src_ptr
= src
->get() + src_current_offset
;
169 if (src
->size() - src_current_offset
< src_left
) {
170 auto left
= src_left
;
171 auto dst
= temporary_chunk_data
;
173 auto this_size
= src
->size() - src_current_offset
;
174 assert(this_size
<= chunk_size
);
175 dst
= std::copy_n(src
->get() + src_current_offset
, this_size
, dst
);
177 src_current_offset
= 0;
180 src_ptr
= temporary_chunk_data
;
183 auto compressed_size
= LZ4_compress_fast_continue(stream
.get(), src_ptr
, dst
, src_left
, last_chunk_compress_bound
, 0);
184 dst_offset
+= compressed_size
+ chunk_header_size
;
185 write_le
<uint32_t>(header
, last_chunk_flag
| src_left
);
186 total_compressed_size
+= compressed_size
+ chunk_header_size
+ head_space
;
188 auto& last
= dst_buffers
.back();
189 last
.trim(dst_offset
);
191 if (dst_buffers
.size() == 1) {
192 return snd_buf(std::move(dst_buffers
.front()));
194 return snd_buf(std::move(dst_buffers
), total_compressed_size
);
197 rcv_buf
lz4_fragmented_compressor::decompress(rcv_buf data
) {
202 static thread_local
auto stream
= std::unique_ptr
<LZ4_streamDecode_t
, decompression_stream_deleter
>(LZ4_createStreamDecode());
204 if (!LZ4_setStreamDecode(stream
.get(), nullptr, 0)) {
205 throw std::runtime_error("RPC frame LZ4 decompression failed to reset state");
208 auto src
= compat::get_if
<temporary_buffer
<char>>(&data
.bufs
);
209 size_t src_left
= data
.size
;
210 size_t src_offset
= 0;
212 // Prepare source data. Returns pointer to n contiguous bytes of source data.
213 // Avoids copy if possible, otherwise uses dst as a temporary storage.
214 auto copy_src
= [&] (char* dst
, size_t n
) -> const char* {
215 // Fast path, no need to copy anything.
216 if (src
->size() - src_offset
>= n
) {
217 auto ptr
= src
->get() + src_offset
;
223 // Need to linearise source chunk into dst.
227 if (src_offset
== src
->size()) {
231 auto this_size
= std::min(n
, src
->size() - src_offset
);
232 std::copy_n(src
->get() + src_offset
, this_size
, dst
);
235 src_offset
+= this_size
;
240 // Read, possibly fragmented, header.
241 auto read_header
= [&] {
242 uint32_t header_value
;
243 auto ptr
= copy_src(reinterpret_cast<char*>(&header_value
), chunk_header_size
);
244 if (ptr
!= reinterpret_cast<char*>(&header_value
)) {
245 std::copy_n(ptr
, sizeof(uint32_t), reinterpret_cast<char*>(&header_value
));
247 return le_to_cpu(header_value
);
251 auto header
= read_le
<uint32_t>(src
->get());
252 if (header
& last_chunk_flag
) {
253 // faster path for small messages: single chunk in a single buffer
254 header
&= ~last_chunk_flag
;
255 src_offset
+= chunk_header_size
;
256 src_left
-= chunk_header_size
;
257 auto dst
= temporary_buffer
<char>(header
);
258 if (LZ4_decompress_safe_continue(stream
.get(), src
->get() + src_offset
, dst
.get_write(), src_left
, header
) < 0) {
259 throw std::runtime_error("RPC frame LZ4 decompression failure");
261 return rcv_buf(std::move(dst
));
263 // not eligible for fast path: multiple chunks in a single buffer
265 // not eligible for fast path: multiple buffers
266 src
= compat::get
<std::vector
<temporary_buffer
<char>>>(data
.bufs
).data();
269 // Let's be a bit paranoid and not assume that the remote has the same
270 // LZ4_COMPRESSBOUND as we do and allow any compressed chunk size.
271 static thread_local
auto chunk_buffer
= temporary_buffer
<char>(LZ4_COMPRESSBOUND(chunk_size
));
273 std::vector
<temporary_buffer
<char>> dst_buffers
;
274 size_t total_size
= 0;
276 // Intermediate chunks
277 uint32_t header_value
= read_header();
278 while (!(header_value
& last_chunk_flag
)) {
279 total_size
+= chunk_size
;
280 dst_buffers
.emplace_back(chunk_size
);
281 if (chunk_buffer
.size() < header_value
) {
282 chunk_buffer
= temporary_buffer
<char>(header_value
);
284 auto src_ptr
= copy_src(chunk_buffer
.get_write(), header_value
);
285 if (LZ4_decompress_safe_continue(stream
.get(), src_ptr
, dst_buffers
.back().get_write(), header_value
, chunk_size
) < 0) {
286 throw std::runtime_error("RPC frame LZ4 decompression failure");
288 header_value
= read_header();
292 header_value
&= ~last_chunk_flag
;
293 total_size
+= header_value
;
294 dst_buffers
.emplace_back(header_value
);
295 if (chunk_buffer
.size() < src_left
) {
296 chunk_buffer
= temporary_buffer
<char>(src_left
);
298 auto last_chunk_compressed_size
= src_left
;
299 auto src_ptr
= copy_src(chunk_buffer
.get_write(), src_left
);
300 if (LZ4_decompress_safe_continue(stream
.get(), src_ptr
, dst_buffers
.back().get_write(), last_chunk_compressed_size
, header_value
) < 0) {
301 throw std::runtime_error("RPC frame LZ4 decompression failure");
304 if (dst_buffers
.size() == 1) {
305 return rcv_buf(std::move(dst_buffers
.front()));
307 return rcv_buf(std::move(dst_buffers
), total_size
);