]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/rpc/lz4_fragmented_compressor.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / rpc / lz4_fragmented_compressor.cc
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2019 Scylladb, Ltd.
20 */
21
22#include <seastar/rpc/lz4_fragmented_compressor.hh>
23#include <seastar/core/byteorder.hh>
24
25#include <lz4.h>
f67539c2
TL
// The LZ4_DECODER_RING_BUFFER_SIZE macro was introduced in lz4 v1.8.2.
// To support older lz4 releases, its definition is copied here from lz4.
28#ifndef LZ4_DECODER_RING_BUFFER_SIZE
29#define LZ4_DECODER_RING_BUFFER_SIZE(maxBlockSize) (65536 + 14 + (maxBlockSize))
30#endif
9f95a23c
TL
31
32namespace seastar {
33namespace rpc {
34
f67539c2
TL
35sstring lz4_fragmented_compressor::name() const {
36 return factory{}.supported();
37}
38
39const sstring& lz4_fragmented_compressor::factory::supported() const {
40 const static sstring name = "LZ4_FRAGMENTED";
41 return name;
42}
43
44std::unique_ptr<rpc::compressor> lz4_fragmented_compressor::factory::negotiate(sstring feature, bool is_server) const {
45 return feature == supported() ? std::make_unique<lz4_fragmented_compressor>() : nullptr;
46}
9f95a23c
TL
47
// Compressed message format:
// The message consists of one or more data chunks, each preceded by a 4-byte header.
// The value of the header determines the size of the chunk:
// - most significant bit is cleared: intermediate chunk; the 31 least significant
//   bits contain the compressed size of the chunk (i.e. how it appears on the
//   wire), and the decompressed size is 32 KiB.
// - most significant bit is set: last chunk; the 31 least significant bits contain
//   the decompressed size of the chunk, and the compressed size is the remaining
//   part of the message.
// Compression and decompression are done using the LZ4 streaming interface, so
// each chunk may depend on the chunks that precede it.
// All metadata is little-endian.
60
// Header bit marking the final chunk of a message.
static constexpr uint32_t last_chunk_flag = uint32_t{1} << 31;
// Each chunk header is one 32-bit little-endian word.
static constexpr size_t chunk_header_size = sizeof(uint32_t);
// Decompressed size of every intermediate chunk: 32 KiB.
static constexpr size_t chunk_size = size_t{32} << 10;
64
65namespace {
66
67struct compression_stream_deleter {
68 void operator()(LZ4_stream_t* stream) const noexcept {
69 LZ4_freeStream(stream);
70 }
71};
72
73struct decompression_stream_deleter {
74 void operator()(LZ4_streamDecode_t* stream) const noexcept {
75 LZ4_freeStreamDecode(stream);
76 }
77};
78
79}
80
81snd_buf lz4_fragmented_compressor::compress(size_t head_space, snd_buf data) {
82 static thread_local auto stream = std::unique_ptr<LZ4_stream_t, compression_stream_deleter>(LZ4_createStream());
83 static_assert(chunk_size <= snd_buf::chunk_size, "chunk_size <= snd_buf::chunk_size");
84
85 LZ4_resetStream(stream.get());
86
87 auto size_left = data.size;
f67539c2 88 auto src = std::get_if<temporary_buffer<char>>(&data.bufs);
9f95a23c 89 if (!src) {
f67539c2 90 src = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
9f95a23c
TL
91 }
92
93 auto single_chunk_size = LZ4_COMPRESSBOUND(size_left) + head_space + chunk_header_size;
94 if (single_chunk_size <= chunk_size && size_left <= chunk_size && src->size() == size_left) {
95 // faster path for small messages
96 auto dst = temporary_buffer<char>(single_chunk_size);
97 auto header = dst.get_write() + head_space;
98 auto compressed_data = header + chunk_header_size;
99 auto compressed_size = LZ4_compress_fast_continue(stream.get(), src->get(), compressed_data, size_left, LZ4_COMPRESSBOUND(size_left), 0);
100 write_le(header, last_chunk_flag | size_left);
101 dst.trim(head_space + chunk_header_size + compressed_size);
102 return snd_buf(std::move(dst));
103 }
104
9f95a23c
TL
105 static constexpr size_t chunk_compress_bound = LZ4_COMPRESSBOUND(chunk_size);
106 static constexpr size_t chunk_maximum_compressed_size = chunk_compress_bound + chunk_header_size;
f67539c2 107 static_assert(chunk_maximum_compressed_size < snd_buf::chunk_size, "chunk_maximum_compressed_size is too large");
9f95a23c
TL
108
109 std::vector<temporary_buffer<char>> dst_buffers;
110 size_t dst_offset = head_space;
111
112 dst_buffers.emplace_back(std::max<size_t>(head_space, snd_buf::chunk_size));
113
114 // Intermediate chunks
115 size_t total_compressed_size = 0;
116 auto src_left = data.size;
117 size_t src_current_offset = 0;
118
119 // Advance offset in the current source fragment, move to the next fragment if needed.
120 auto advance_src = [&] (size_t n) {
121 src_current_offset += n;
122 if (src_current_offset >= src->size()) {
123 ++src;
124 src_current_offset = 0;
125 }
126 src_left -= n;
127 };
128
f67539c2
TL
129 // Input chunks do not have to be multiplies of chunk_size.
130 // We handle such cases by reassembling a chunk in this temporary buffer.
131 // Note that this is similar to the ring buffer compression case in docs,
132 // we need to ensure that a suitable amount of (maybe) previous data is
133 // stable in this or input buffer, thus make the temp buffer
134 // LZ4_DECODER_RING_BUFFER_SIZE(chunk_size) large, and treat it as a ring.
135 static constexpr auto lin_buf_size = LZ4_DECODER_RING_BUFFER_SIZE(chunk_size);
136 static thread_local char temporary_chunk_data[lin_buf_size];
137 size_t lin_off = 0;
138
139 auto maybe_linearize = [&](size_t size) {
140 auto src_ptr = src->get() + src_current_offset;
141 if (src->size() - src_current_offset < size) {
142 auto left = size;
143 assert(lin_buf_size > size);
144 if (lin_buf_size - lin_off < size) {
145 lin_off = 0;
146 }
147 auto tmp = temporary_chunk_data + std::exchange(lin_off, lin_off + size);
148 src_ptr = tmp;
149 while (left) {
150 auto this_size = std::min(src->size() - src_current_offset, left);
151 tmp = std::copy_n(src->get() + src_current_offset, this_size, tmp);
152 left -= this_size;
153 advance_src(this_size);
154 }
155 } else {
156 advance_src(chunk_size);
157 lin_off = 0;
158 }
159 return src_ptr;
160 };
161
9f95a23c
TL
162 while (src_left > chunk_size) {
163 // Check if we can fit another chunk in the current destination fragment.
164 // If not allocate a new one.
165 if (dst_offset + chunk_maximum_compressed_size > dst_buffers.back().size()) {
166 dst_buffers.back().trim(dst_offset);
167 dst_buffers.emplace_back(snd_buf::chunk_size);
168 dst_offset = 0;
169 }
170
171 // Check if there is at least a contiguous chunk_size of data in the current
172 // source fragment. If not, linearise that into temporary_chunk_data.
f67539c2 173 auto src_ptr = maybe_linearize(chunk_size);
9f95a23c
TL
174 auto header = dst_buffers.back().get_write() + dst_offset;
175 auto dst = header + chunk_header_size;
176
177 auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, chunk_size, chunk_compress_bound, 0);
178 total_compressed_size += compressed_size + chunk_header_size;
179
180 dst_offset += compressed_size + chunk_header_size;
181 write_le<uint32_t>(header, compressed_size);
182 }
183
184 // Last chunk
185 auto last_chunk_compress_bound = LZ4_COMPRESSBOUND(src_left);
186 auto last_chunk_maximum_compressed_size = last_chunk_compress_bound + chunk_header_size;
187
188 // Check if we can fit the last chunk in the current destination fragment. Allocate a new one if not.
189 if (dst_offset + last_chunk_maximum_compressed_size > dst_buffers.back().size()) {
190 dst_buffers.back().trim(dst_offset);
191 dst_buffers.emplace_back(snd_buf::chunk_size);
192 dst_offset = 0;
193 }
194 auto header = dst_buffers.back().get_write() + dst_offset;
195 auto dst = header + chunk_header_size;
196
197 // Check if all remaining source data is contiguous. If not linearise it into temporary_chunk_data.
f67539c2
TL
198 auto rem = src_left;
199 auto src_ptr = maybe_linearize(src_left);
9f95a23c 200
f67539c2 201 auto compressed_size = LZ4_compress_fast_continue(stream.get(), src_ptr, dst, rem, last_chunk_compress_bound, 0);
9f95a23c 202 dst_offset += compressed_size + chunk_header_size;
f67539c2 203 write_le<uint32_t>(header, last_chunk_flag | rem);
9f95a23c
TL
204 total_compressed_size += compressed_size + chunk_header_size + head_space;
205
206 auto& last = dst_buffers.back();
207 last.trim(dst_offset);
208
209 if (dst_buffers.size() == 1) {
210 return snd_buf(std::move(dst_buffers.front()));
211 }
212 return snd_buf(std::move(dst_buffers), total_compressed_size);
213}
214
215rcv_buf lz4_fragmented_compressor::decompress(rcv_buf data) {
216 if (data.size < 4) {
217 return rcv_buf();
218 }
219
220 static thread_local auto stream = std::unique_ptr<LZ4_streamDecode_t, decompression_stream_deleter>(LZ4_createStreamDecode());
221
222 if (!LZ4_setStreamDecode(stream.get(), nullptr, 0)) {
f67539c2 223 throw std::runtime_error("RPC frame LZ4_FRAGMENTED decompression failed to reset state");
9f95a23c
TL
224 }
225
f67539c2 226 auto src = std::get_if<temporary_buffer<char>>(&data.bufs);
9f95a23c
TL
227 size_t src_left = data.size;
228 size_t src_offset = 0;
229
230 // Prepare source data. Returns pointer to n contiguous bytes of source data.
231 // Avoids copy if possible, otherwise uses dst as a temporary storage.
232 auto copy_src = [&] (char* dst, size_t n) -> const char* {
233 // Fast path, no need to copy anything.
234 if (src->size() - src_offset >= n) {
235 auto ptr = src->get() + src_offset;
236 src_left -= n;
237 src_offset += n;
238 return ptr;
239 }
240
241 // Need to linearise source chunk into dst.
242 auto ptr = dst;
243 src_left -= n;
244 while (n) {
245 if (src_offset == src->size()) {
246 ++src;
247 src_offset = 0;
248 }
249 auto this_size = std::min(n, src->size() - src_offset);
250 std::copy_n(src->get() + src_offset, this_size, dst);
251 n -= this_size;
252 dst += this_size;
253 src_offset += this_size;
254 }
255 return ptr;
256 };
257
258 // Read, possibly fragmented, header.
259 auto read_header = [&] {
260 uint32_t header_value;
261 auto ptr = copy_src(reinterpret_cast<char*>(&header_value), chunk_header_size);
262 if (ptr != reinterpret_cast<char*>(&header_value)) {
263 std::copy_n(ptr, sizeof(uint32_t), reinterpret_cast<char*>(&header_value));
264 }
265 return le_to_cpu(header_value);
266 };
267
268 if (src) {
269 auto header = read_le<uint32_t>(src->get());
270 if (header & last_chunk_flag) {
271 // faster path for small messages: single chunk in a single buffer
272 header &= ~last_chunk_flag;
273 src_offset += chunk_header_size;
274 src_left -= chunk_header_size;
275 auto dst = temporary_buffer<char>(header);
276 if (LZ4_decompress_safe_continue(stream.get(), src->get() + src_offset, dst.get_write(), src_left, header) < 0) {
f67539c2 277 throw std::runtime_error("RPC frame LZ4_FRAGMENTED decompression failure (short)");
9f95a23c
TL
278 }
279 return rcv_buf(std::move(dst));
280 }
281 // not eligible for fast path: multiple chunks in a single buffer
282 } else {
283 // not eligible for fast path: multiple buffers
f67539c2 284 src = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
9f95a23c
TL
285 }
286
287 // Let's be a bit paranoid and not assume that the remote has the same
288 // LZ4_COMPRESSBOUND as we do and allow any compressed chunk size.
289 static thread_local auto chunk_buffer = temporary_buffer<char>(LZ4_COMPRESSBOUND(chunk_size));
290
291 std::vector<temporary_buffer<char>> dst_buffers;
292 size_t total_size = 0;
293
f67539c2
TL
294 // Decompressing requires either dest to be fully split or
295 // "preserved" in 64KB or larger, depending on how it was
296 // compressed. If not, decompression will fail, typically
297 // on text-like constructs. Making our dest buffers 64K
298 // ensures we retain a suitable dictionary region for all
299 // passes.
300 constexpr auto buf_size = 64 * 1024;
301 size_t dst_offset = 0;
302
303 auto get_dest = [&](size_t size) {
304 if (dst_buffers.empty()) {
305 dst_buffers.emplace_back(buf_size);
306 }
307 if (dst_buffers.back().size() - dst_offset < size) {
308 dst_buffers.back().trim(dst_offset);
309 dst_buffers.emplace_back(buf_size);
310 dst_offset = 0;
311 }
312 return dst_buffers.back().get_write() + std::exchange(dst_offset, dst_offset + size);
313 };
314
9f95a23c
TL
315 // Intermediate chunks
316 uint32_t header_value = read_header();
317 while (!(header_value & last_chunk_flag)) {
318 total_size += chunk_size;
9f95a23c
TL
319 if (chunk_buffer.size() < header_value) {
320 chunk_buffer = temporary_buffer<char>(header_value);
321 }
322 auto src_ptr = copy_src(chunk_buffer.get_write(), header_value);
f67539c2
TL
323 auto dst = get_dest(chunk_size);
324 if (LZ4_decompress_safe_continue(stream.get(), src_ptr, /*dst_buffers.back().get_write()*/dst, header_value, chunk_size) < 0) {
325 throw std::runtime_error(format("RPC frame LZ4_FRAGMENTED decompression failure (long, at {} bytes)", total_size - chunk_size));
9f95a23c
TL
326 }
327 header_value = read_header();
328 }
329
330 // Last chunk
331 header_value &= ~last_chunk_flag;
332 total_size += header_value;
f67539c2 333 auto dst = get_dest(header_value);
9f95a23c
TL
334 if (chunk_buffer.size() < src_left) {
335 chunk_buffer = temporary_buffer<char>(src_left);
336 }
337 auto last_chunk_compressed_size = src_left;
338 auto src_ptr = copy_src(chunk_buffer.get_write(), src_left);
f67539c2
TL
339 if (LZ4_decompress_safe_continue(stream.get(), src_ptr, /*dst_buffers.back().get_write()*/dst, last_chunk_compressed_size, header_value) < 0) {
340 throw std::runtime_error(format("RPC frame LZ4_FRAGMENTED decompression failure (long, last frame, at {} bytes)", total_size - header_value));
9f95a23c
TL
341 }
342
f67539c2
TL
343 dst_buffers.back().trim(dst_offset);
344
9f95a23c
TL
345 if (dst_buffers.size() == 1) {
346 return rcv_buf(std::move(dst_buffers.front()));
347 }
348 return rcv_buf(std::move(dst_buffers), total_size);
349}
350
351}
352}