]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_compression.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_compression.h"
5
6 #define dout_subsys ceph_subsys_rgw
7
8 int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs,
9 bool& need_decompress,
10 RGWCompressionInfo& cs_info) {
11 map<string, bufferlist>::iterator value = attrs.find(RGW_ATTR_COMPRESSION);
12 if (value != attrs.end()) {
13 auto bliter = value->second.cbegin();
14 try {
15 decode(cs_info, bliter);
16 } catch (buffer::error& err) {
17 return -EIO;
18 }
19 if (cs_info.blocks.size() == 0) {
20 return -EIO;
21 }
22 if (cs_info.compression_type != "none")
23 need_decompress = true;
24 else
25 need_decompress = false;
26 return 0;
27 } else {
28 need_decompress = false;
29 return 0;
30 }
31 }
32
33 //------------RGWPutObj_Compress---------------
34
35 int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
36 {
37 bufferlist out;
38 if (in.length() > 0) {
39 // compression stuff
40 if ((logical_offset > 0 && compressed) || // if previous part was compressed
41 (logical_offset == 0)) { // or it's the first part
42 ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
43 int cr = compressor->compress(in, out);
44 if (cr < 0) {
45 if (logical_offset > 0) {
46 lderr(cct) << "Compression failed with exit code " << cr
47 << " for next part, compression process failed" << dendl;
48 return -EIO;
49 }
50 compressed = false;
51 ldout(cct, 5) << "Compression failed with exit code " << cr
52 << " for first part, storing uncompressed" << dendl;
53 out.claim(in);
54 } else {
55 compressed = true;
56
57 compression_block newbl;
58 size_t bs = blocks.size();
59 newbl.old_ofs = logical_offset;
60 newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
61 newbl.len = out.length();
62 blocks.push_back(newbl);
63 }
64 } else {
65 compressed = false;
66 out.claim(in);
67 }
68 // end of compression stuff
69 }
70 return Pipe::process(std::move(out), logical_offset);
71 }
72
73 //----------------RGWGetObj_Decompress---------------------
74 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
75 RGWCompressionInfo* cs_info_,
76 bool partial_content_,
77 RGWGetObj_Filter* next): RGWGetObj_Filter(next),
78 cct(cct_),
79 cs_info(cs_info_),
80 partial_content(partial_content_),
81 q_ofs(0),
82 q_len(0),
83 cur_ofs(0)
84 {
85 compressor = Compressor::create(cct, cs_info->compression_type);
86 if (!compressor.get())
87 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
88 }
89
90 int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
91 {
92 ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
93 << "bl_ofs="<< bl_ofs << bl_len << dendl;
94
95 if (!compressor.get()) {
96 // if compressor isn't available - error, because cannot return decompressed data?
97 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
98 return -EIO;
99 }
100 bufferlist out_bl, in_bl, temp_in_bl;
101 bl.begin(bl_ofs).copy(bl_len, temp_in_bl);
102 bl_ofs = 0;
103 int r = 0;
104 if (waiting.length() != 0) {
105 in_bl.append(waiting);
106 in_bl.append(temp_in_bl);
107 waiting.clear();
108 } else {
109 in_bl.claim(temp_in_bl);
110 }
111 bl_len = in_bl.length();
112
113 auto iter_in_bl = in_bl.cbegin();
114 while (first_block <= last_block) {
115 bufferlist tmp;
116 off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
117 if (ofs_in_bl + (off_t)first_block->len > bl_len) {
118 // not complete block, put it to waiting
119 unsigned tail = bl_len - ofs_in_bl;
120 if (iter_in_bl.get_off() != ofs_in_bl) {
121 iter_in_bl.seek(ofs_in_bl);
122 }
123 iter_in_bl.copy(tail, waiting);
124 cur_ofs -= tail;
125 break;
126 }
127 if (iter_in_bl.get_off() != ofs_in_bl) {
128 iter_in_bl.seek(ofs_in_bl);
129 }
130 iter_in_bl.copy(first_block->len, tmp);
131 int cr = compressor->decompress(tmp, out_bl);
132 if (cr < 0) {
133 lderr(cct) << "Decompression failed with exit code " << cr << dendl;
134 return cr;
135 }
136 ++first_block;
137 while (out_bl.length() - q_ofs >= cct->_conf->rgw_max_chunk_size)
138 {
139 off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
140 q_len -= ch_len;
141 r = next->handle_data(out_bl, q_ofs, ch_len);
142 if (r < 0) {
143 lderr(cct) << "handle_data failed with exit code " << r << dendl;
144 return r;
145 }
146 out_bl.splice(0, q_ofs + ch_len);
147 q_ofs = 0;
148 }
149 }
150
151 cur_ofs += bl_len;
152 off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
153 if (ch_len > 0) {
154 r = next->handle_data(out_bl, q_ofs, ch_len);
155 if (r < 0) {
156 lderr(cct) << "handle_data failed with exit code " << r << dendl;
157 return r;
158 }
159 out_bl.splice(0, q_ofs + ch_len);
160 q_len -= ch_len;
161 q_ofs = 0;
162 }
163 return r;
164 }
165
166 int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
167 {
168 if (partial_content) {
169 // if user set range, we need to calculate it in decompressed data
170 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
171 if (cs_info->blocks.size() > 1) {
172 vector<compression_block>::iterator fb, lb;
173 // not bad to use auto for lambda, I think
174 auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
175 auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
176 fb = upper_bound(cs_info->blocks.begin()+1,
177 cs_info->blocks.end(),
178 ofs,
179 cmp_u);
180 first_block = fb - 1;
181 lb = lower_bound(fb,
182 cs_info->blocks.end(),
183 end,
184 cmp_l);
185 last_block = lb - 1;
186 }
187 } else {
188 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
189 }
190
191 q_ofs = ofs - first_block->old_ofs;
192 q_len = end + 1 - ofs;
193
194 ofs = first_block->new_ofs;
195 end = last_block->new_ofs + last_block->len - 1;
196
197 cur_ofs = ofs;
198 waiting.clear();
199
200 return next->fixup_range(ofs, end);
201 }