]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_compression.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_compression.h"
5
6 #define dout_subsys ceph_subsys_rgw
7
8 using namespace std;
9
10 int rgw_compression_info_from_attr(const bufferlist& attr,
11 bool& need_decompress,
12 RGWCompressionInfo& cs_info)
13 {
14 auto bliter = attr.cbegin();
15 try {
16 decode(cs_info, bliter);
17 } catch (buffer::error& err) {
18 return -EIO;
19 }
20 if (cs_info.blocks.size() == 0) {
21 return -EIO;
22 }
23 if (cs_info.compression_type != "none")
24 need_decompress = true;
25 else
26 need_decompress = false;
27 return 0;
28 }
29
30 int rgw_compression_info_from_attrset(const map<string, bufferlist>& attrs,
31 bool& need_decompress,
32 RGWCompressionInfo& cs_info)
33 {
34 auto value = attrs.find(RGW_ATTR_COMPRESSION);
35 if (value == attrs.end()) {
36 need_decompress = false;
37 return 0;
38 }
39 return rgw_compression_info_from_attr(value->second, need_decompress, cs_info);
40 }
41
42 //------------RGWPutObj_Compress---------------
43
44 int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
45 {
46 bufferlist out;
47 compressed_ofs = logical_offset;
48
49 if (in.length() > 0) {
50 // compression stuff
51 if ((logical_offset > 0 && compressed) || // if previous part was compressed
52 (logical_offset == 0)) { // or it's the first part
53 ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
54 int cr = compressor->compress(in, out, compressor_message);
55 if (cr < 0) {
56 if (logical_offset > 0) {
57 lderr(cct) << "Compression failed with exit code " << cr
58 << " for next part, compression process failed" << dendl;
59 return -EIO;
60 }
61 compressed = false;
62 ldout(cct, 5) << "Compression failed with exit code " << cr
63 << " for first part, storing uncompressed" << dendl;
64 out = std::move(in);
65 } else {
66 compressed = true;
67
68 compression_block newbl;
69 size_t bs = blocks.size();
70 newbl.old_ofs = logical_offset;
71 newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
72 newbl.len = out.length();
73 blocks.push_back(newbl);
74
75 compressed_ofs = newbl.new_ofs;
76 }
77 } else {
78 compressed = false;
79 out = std::move(in);
80 }
81 // end of compression stuff
82 } else {
83 size_t bs = blocks.size();
84 compressed_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : logical_offset;
85 }
86
87 return Pipe::process(std::move(out), compressed_ofs);
88 }
89
90 //----------------RGWGetObj_Decompress---------------------
91 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
92 RGWCompressionInfo* cs_info_,
93 bool partial_content_,
94 RGWGetObj_Filter* next): RGWGetObj_Filter(next),
95 cct(cct_),
96 cs_info(cs_info_),
97 partial_content(partial_content_),
98 q_ofs(0),
99 q_len(0),
100 cur_ofs(0)
101 {
102 compressor = Compressor::create(cct, cs_info->compression_type);
103 if (!compressor.get())
104 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
105 }
106
107 int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
108 {
109 ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
110 << "bl_ofs=" << bl_ofs << ", bl_len=" << bl_len << dendl;
111
112 if (!compressor.get()) {
113 // if compressor isn't available - error, because cannot return decompressed data?
114 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
115 return -EIO;
116 }
117 bufferlist out_bl, in_bl, temp_in_bl;
118 bl.begin(bl_ofs).copy(bl_len, temp_in_bl);
119 bl_ofs = 0;
120 int r = 0;
121 if (waiting.length() != 0) {
122 in_bl.append(waiting);
123 in_bl.append(temp_in_bl);
124 waiting.clear();
125 } else {
126 in_bl = std::move(temp_in_bl);
127 }
128 bl_len = in_bl.length();
129
130 auto iter_in_bl = in_bl.cbegin();
131 while (first_block <= last_block) {
132 bufferlist tmp;
133 off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
134 if (ofs_in_bl + (off_t)first_block->len > bl_len) {
135 // not complete block, put it to waiting
136 unsigned tail = bl_len - ofs_in_bl;
137 if (iter_in_bl.get_off() != ofs_in_bl) {
138 iter_in_bl.seek(ofs_in_bl);
139 }
140 iter_in_bl.copy(tail, waiting);
141 cur_ofs -= tail;
142 break;
143 }
144 if (iter_in_bl.get_off() != ofs_in_bl) {
145 iter_in_bl.seek(ofs_in_bl);
146 }
147 iter_in_bl.copy(first_block->len, tmp);
148 int cr = compressor->decompress(tmp, out_bl, cs_info->compressor_message);
149 if (cr < 0) {
150 lderr(cct) << "Decompression failed with exit code " << cr << dendl;
151 return cr;
152 }
153 ++first_block;
154 while (out_bl.length() - q_ofs >=
155 static_cast<off_t>(cct->_conf->rgw_max_chunk_size)) {
156 off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
157 q_len -= ch_len;
158 r = next->handle_data(out_bl, q_ofs, ch_len);
159 if (r < 0) {
160 lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
161 return r;
162 }
163 out_bl.splice(0, q_ofs + ch_len);
164 q_ofs = 0;
165 }
166 }
167
168 cur_ofs += bl_len;
169 off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
170 if (ch_len > 0) {
171 r = next->handle_data(out_bl, q_ofs, ch_len);
172 if (r < 0) {
173 lsubdout(cct, rgw, 0) << "handle_data failed with exit code " << r << dendl;
174 return r;
175 }
176 out_bl.splice(0, q_ofs + ch_len);
177 q_len -= ch_len;
178 q_ofs = 0;
179 }
180 return r;
181 }
182
183 int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
184 {
185 if (partial_content) {
186 // if user set range, we need to calculate it in decompressed data
187 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
188 if (cs_info->blocks.size() > 1) {
189 vector<compression_block>::iterator fb, lb;
190 // not bad to use auto for lambda, I think
191 auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
192 auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
193 fb = upper_bound(cs_info->blocks.begin()+1,
194 cs_info->blocks.end(),
195 ofs,
196 cmp_u);
197 first_block = fb - 1;
198 lb = lower_bound(fb,
199 cs_info->blocks.end(),
200 end,
201 cmp_l);
202 last_block = lb - 1;
203 }
204 } else {
205 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
206 }
207
208 q_ofs = ofs - first_block->old_ofs;
209 q_len = end + 1 - ofs;
210
211 ofs = first_block->new_ofs;
212 end = last_block->new_ofs + last_block->len - 1;
213
214 cur_ofs = ofs;
215 waiting.clear();
216
217 return next->fixup_range(ofs, end);
218 }
219
220 void compression_block::dump(Formatter *f) const
221 {
222 f->dump_unsigned("old_ofs", old_ofs);
223 f->dump_unsigned("new_ofs", new_ofs);
224 f->dump_unsigned("len", len);
225 }
226
227 void RGWCompressionInfo::dump(Formatter *f) const
228 {
229 f->dump_string("compression_type", compression_type);
230 f->dump_unsigned("orig_size", orig_size);
231 if (compressor_message) {
232 f->dump_int("compressor_message", *compressor_message);
233 }
234 ::encode_json("blocks", blocks, f);
235 }
236