]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/rpc/lz4_compressor.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / src / rpc / lz4_compressor.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2016 Scylladb, Ltd.
20 */
21
22 #include <seastar/rpc/lz4_compressor.hh>
23 #include <seastar/core/byteorder.hh>
24
25 namespace seastar {
26
27 namespace rpc {
28
29 const sstring lz4_compressor::factory::_name = "LZ4";
30
31
32 static temporary_buffer<char> linearize(compat::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>>& v, uint32_t size) {
33 auto* one = compat::get_if<temporary_buffer<char>>(&v);
34 if (one) {
35 // no need to linearize
36 return std::move(*one);
37 } else {
38 temporary_buffer<char> src(size);
39 auto p = src.get_write();
40 for (auto&& b : compat::get<std::vector<temporary_buffer<char>>>(v)) {
41 p = std::copy_n(b.begin(), b.size(), p);
42 }
43 return src;
44 }
45 }
46
47 snd_buf lz4_compressor::compress(size_t head_space, snd_buf data) {
48 head_space += 4;
49 temporary_buffer<char> dst(head_space + LZ4_compressBound(data.size));
50 temporary_buffer<char> src = linearize(data.bufs, data.size);
51 #ifdef SEASTAR_HAVE_LZ4_COMPRESS_DEFAULT
52 auto size = LZ4_compress_default(src.begin(), dst.get_write() + head_space, src.size(), LZ4_compressBound(src.size()));
53 #else
54 // Safe since output buffer is sized properly.
55 auto size = LZ4_compress(src.begin(), dst.get_write() + head_space, src.size());
56 #endif
57 if (size == 0) {
58 throw std::runtime_error("RPC frame LZ4 compression failure");
59 }
60 dst.trim(size + head_space);
61 write_le<uint32_t>(dst.get_write() + (head_space - 4), data.size);
62 return snd_buf(std::move(dst));
63 }
64
65 rcv_buf lz4_compressor::decompress(rcv_buf data) {
66 if (data.size < 4) {
67 return rcv_buf();
68 } else {
69 auto in = make_deserializer_stream(data);
70 uint32_t v32;
71 in.read(reinterpret_cast<char*>(&v32), 4);
72 auto size = le_to_cpu(v32);
73 if (size) {
74 temporary_buffer<char> src = linearize(data.bufs, data.size);
75 src.trim_front(4);
76 rcv_buf rb(size);
77 rb.bufs = temporary_buffer<char>(size);
78 auto& dst = compat::get<temporary_buffer<char>>(rb.bufs);
79 if (LZ4_decompress_fast(src.begin(), dst.get_write(), dst.size()) < 0) {
80 throw std::runtime_error("RPC frame LZ4 decompression failure");
81 }
82 return rb;
83 } else {
84 // special case: if uncompressed size is zero it means that data was not compressed
85 // compress side still not use this but we want to be ready for the future
86 return data;
87 }
88 }
89 }
90
91 }
92
93 }