]>
Commit | Line | Data |
---|---|---|
31f18b77 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_LZ4COMPRESSOR_H | |
16 | #define CEPH_LZ4COMPRESSOR_H | |
17 | ||
20effc67 | 18 | #include <optional> |
31f18b77 FG |
19 | #include <lz4.h> |
20 | ||
21 | #include "compressor/Compressor.h" | |
22 | #include "include/buffer.h" | |
23 | #include "include/encoding.h" | |
11fdf7f2 | 24 | #include "common/config.h" |
31f18b77 FG |
25 | |
26 | ||
27 | class LZ4Compressor : public Compressor { | |
28 | public: | |
11fdf7f2 TL |
29 | LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { |
30 | #ifdef HAVE_QATZIP | |
31 | if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) | |
32 | qat_enabled = true; | |
33 | else | |
34 | qat_enabled = false; | |
35 | #endif | |
36 | } | |
31f18b77 | 37 | |
1e59de90 | 38 | int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override { |
9f95a23c TL |
39 | // older versions of liblz4 introduce bit errors when compressing |
40 | // fragmented buffers. this was fixed in lz4 commit | |
41 | // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first | |
42 | // appeared in v1.8.2. | |
43 | // | |
44 | // workaround: rebuild if not contiguous. | |
45 | if (!src.is_contiguous()) { | |
f67539c2 | 46 | ceph::buffer::list new_src = src; |
9f95a23c | 47 | new_src.rebuild(); |
f67539c2 | 48 | return compress(new_src, dst, compressor_message); |
9f95a23c TL |
49 | } |
50 | ||
11fdf7f2 TL |
51 | #ifdef HAVE_QATZIP |
52 | if (qat_enabled) | |
f67539c2 | 53 | return qat_accel.compress(src, dst, compressor_message); |
11fdf7f2 | 54 | #endif |
f67539c2 | 55 | ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( |
31f18b77 FG |
56 | LZ4_compressBound(src.length())); |
57 | LZ4_stream_t lz4_stream; | |
58 | LZ4_resetStream(&lz4_stream); | |
59 | ||
f67539c2 TL |
60 | using ceph::encode; |
61 | ||
31f18b77 FG |
62 | auto p = src.begin(); |
63 | size_t left = src.length(); | |
64 | int pos = 0; | |
65 | const char *data; | |
66 | unsigned num = src.get_num_buffers(); | |
11fdf7f2 | 67 | encode((uint32_t)num, dst); |
31f18b77 | 68 | while (left) { |
11fdf7f2 TL |
69 | uint32_t origin_len = p.get_ptr_and_advance(left, &data); |
70 | int compressed_len = LZ4_compress_fast_continue( | |
31f18b77 FG |
71 | &lz4_stream, data, outptr.c_str()+pos, origin_len, |
72 | outptr.length()-pos, 1); | |
73 | if (compressed_len <= 0) | |
74 | return -1; | |
75 | pos += compressed_len; | |
76 | left -= origin_len; | |
11fdf7f2 TL |
77 | encode(origin_len, dst); |
78 | encode((uint32_t)compressed_len, dst); | |
31f18b77 | 79 | } |
11fdf7f2 | 80 | ceph_assert(p.end()); |
31f18b77 FG |
81 | |
82 | dst.append(outptr, 0, pos); | |
83 | return 0; | |
84 | } | |
85 | ||
1e59de90 | 86 | int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override { |
11fdf7f2 TL |
87 | #ifdef HAVE_QATZIP |
88 | if (qat_enabled) | |
f67539c2 | 89 | return qat_accel.decompress(src, dst, compressor_message); |
11fdf7f2 TL |
90 | #endif |
91 | auto i = std::cbegin(src); | |
f67539c2 | 92 | return decompress(i, src.length(), dst, compressor_message); |
31f18b77 FG |
93 | } |
94 | ||
f67539c2 | 95 | int decompress(ceph::buffer::list::const_iterator &p, |
31f18b77 | 96 | size_t compressed_len, |
f67539c2 | 97 | ceph::buffer::list &dst, |
1e59de90 | 98 | std::optional<int32_t> compressor_message) override { |
11fdf7f2 TL |
99 | #ifdef HAVE_QATZIP |
100 | if (qat_enabled) | |
f67539c2 | 101 | return qat_accel.decompress(p, compressed_len, dst, compressor_message); |
11fdf7f2 | 102 | #endif |
f67539c2 | 103 | using ceph::decode; |
31f18b77 | 104 | uint32_t count; |
11fdf7f2 | 105 | decode(count, p); |
20effc67 | 106 | std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count); |
31f18b77 | 107 | uint32_t total_origin = 0; |
20effc67 TL |
108 | for (auto& [dst_size, src_size] : compressed_pairs) { |
109 | decode(dst_size, p); | |
110 | decode(src_size, p); | |
111 | total_origin += dst_size; | |
31f18b77 FG |
112 | } |
113 | compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); | |
114 | ||
f67539c2 | 115 | ceph::buffer::ptr dstptr(total_origin); |
31f18b77 FG |
116 | LZ4_streamDecode_t lz4_stream_decode; |
117 | LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); | |
118 | ||
f67539c2 TL |
119 | ceph::buffer::ptr cur_ptr = p.get_current_ptr(); |
120 | ceph::buffer::ptr *ptr = &cur_ptr; | |
20effc67 | 121 | std::optional<ceph::buffer::ptr> data_holder; |
31f18b77 | 122 | if (compressed_len != cur_ptr.length()) { |
20effc67 | 123 | data_holder.emplace(compressed_len); |
31f18b77 | 124 | p.copy_deep(compressed_len, *data_holder); |
20effc67 | 125 | ptr = &*data_holder; |
31f18b77 FG |
126 | } |
127 | ||
128 | char *c_in = ptr->c_str(); | |
129 | char *c_out = dstptr.c_str(); | |
130 | for (unsigned i = 0; i < count; ++i) { | |
131 | int r = LZ4_decompress_safe_continue( | |
132 | &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); | |
133 | if (r == (int)compressed_pairs[i].first) { | |
134 | c_in += compressed_pairs[i].second; | |
135 | c_out += compressed_pairs[i].first; | |
136 | } else if (r < 0) { | |
137 | return -1; | |
11fdf7f2 | 138 | } else { |
31f18b77 FG |
139 | return -2; |
140 | } | |
141 | } | |
142 | dst.push_back(std::move(dstptr)); | |
143 | return 0; | |
144 | } | |
145 | }; | |
146 | ||
147 | #endif |