// Extracted from: git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
#include "rgw_compression.h"

#include <cerrno>

#define dout_subsys ceph_subsys_rgw
10 int rgw_compression_info_from_attr(const bufferlist
& attr
,
11 bool& need_decompress
,
12 RGWCompressionInfo
& cs_info
)
14 auto bliter
= attr
.cbegin();
16 decode(cs_info
, bliter
);
17 } catch (buffer::error
& err
) {
20 if (cs_info
.blocks
.size() == 0) {
23 if (cs_info
.compression_type
!= "none")
24 need_decompress
= true;
26 need_decompress
= false;
30 int rgw_compression_info_from_attrset(const map
<string
, bufferlist
>& attrs
,
31 bool& need_decompress
,
32 RGWCompressionInfo
& cs_info
)
34 auto value
= attrs
.find(RGW_ATTR_COMPRESSION
);
35 if (value
== attrs
.end()) {
36 need_decompress
= false;
39 return rgw_compression_info_from_attr(value
->second
, need_decompress
, cs_info
);
//------------RGWPutObj_Compress---------------
44 int RGWPutObj_Compress::process(bufferlist
&& in
, uint64_t logical_offset
)
47 compressed_ofs
= logical_offset
;
49 if (in
.length() > 0) {
51 if ((logical_offset
> 0 && compressed
) || // if previous part was compressed
52 (logical_offset
== 0)) { // or it's the first part
53 ldout(cct
, 10) << "Compression for rgw is enabled, compress part " << in
.length() << dendl
;
54 int cr
= compressor
->compress(in
, out
, compressor_message
);
56 if (logical_offset
> 0) {
57 lderr(cct
) << "Compression failed with exit code " << cr
58 << " for next part, compression process failed" << dendl
;
62 ldout(cct
, 5) << "Compression failed with exit code " << cr
63 << " for first part, storing uncompressed" << dendl
;
68 compression_block newbl
;
69 size_t bs
= blocks
.size();
70 newbl
.old_ofs
= logical_offset
;
71 newbl
.new_ofs
= bs
> 0 ? blocks
[bs
-1].len
+ blocks
[bs
-1].new_ofs
: 0;
72 newbl
.len
= out
.length();
73 blocks
.push_back(newbl
);
75 compressed_ofs
= newbl
.new_ofs
;
81 // end of compression stuff
83 size_t bs
= blocks
.size();
84 compressed_ofs
= bs
> 0 ? blocks
[bs
-1].len
+ blocks
[bs
-1].new_ofs
: logical_offset
;
87 return Pipe::process(std::move(out
), compressed_ofs
);
//----------------RGWGetObj_Decompress---------------------
91 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext
* cct_
,
92 RGWCompressionInfo
* cs_info_
,
93 bool partial_content_
,
94 RGWGetObj_Filter
* next
): RGWGetObj_Filter(next
),
97 partial_content(partial_content_
),
102 compressor
= Compressor::create(cct
, cs_info
->compression_type
);
103 if (!compressor
.get())
104 lderr(cct
) << "Cannot load compressor of type " << cs_info
->compression_type
<< dendl
;
107 int RGWGetObj_Decompress::handle_data(bufferlist
& bl
, off_t bl_ofs
, off_t bl_len
)
109 ldout(cct
, 10) << "Compression for rgw is enabled, decompress part "
110 << "bl_ofs=" << bl_ofs
<< ", bl_len=" << bl_len
<< dendl
;
112 if (!compressor
.get()) {
113 // if compressor isn't available - error, because cannot return decompressed data?
114 lderr(cct
) << "Cannot load compressor of type " << cs_info
->compression_type
<< dendl
;
117 bufferlist out_bl
, in_bl
, temp_in_bl
;
118 bl
.begin(bl_ofs
).copy(bl_len
, temp_in_bl
);
121 if (waiting
.length() != 0) {
122 in_bl
.append(waiting
);
123 in_bl
.append(temp_in_bl
);
126 in_bl
= std::move(temp_in_bl
);
128 bl_len
= in_bl
.length();
130 auto iter_in_bl
= in_bl
.cbegin();
131 while (first_block
<= last_block
) {
133 off_t ofs_in_bl
= first_block
->new_ofs
- cur_ofs
;
134 if (ofs_in_bl
+ (off_t
)first_block
->len
> bl_len
) {
135 // not complete block, put it to waiting
136 unsigned tail
= bl_len
- ofs_in_bl
;
137 if (iter_in_bl
.get_off() != ofs_in_bl
) {
138 iter_in_bl
.seek(ofs_in_bl
);
140 iter_in_bl
.copy(tail
, waiting
);
144 if (iter_in_bl
.get_off() != ofs_in_bl
) {
145 iter_in_bl
.seek(ofs_in_bl
);
147 iter_in_bl
.copy(first_block
->len
, tmp
);
148 int cr
= compressor
->decompress(tmp
, out_bl
, cs_info
->compressor_message
);
150 lderr(cct
) << "Decompression failed with exit code " << cr
<< dendl
;
154 while (out_bl
.length() - q_ofs
>=
155 static_cast<off_t
>(cct
->_conf
->rgw_max_chunk_size
)) {
156 off_t ch_len
= std::min
<off_t
>(cct
->_conf
->rgw_max_chunk_size
, q_len
);
158 r
= next
->handle_data(out_bl
, q_ofs
, ch_len
);
160 lsubdout(cct
, rgw
, 0) << "handle_data failed with exit code " << r
<< dendl
;
163 out_bl
.splice(0, q_ofs
+ ch_len
);
169 off_t ch_len
= std::min
<off_t
>(out_bl
.length() - q_ofs
, q_len
);
171 r
= next
->handle_data(out_bl
, q_ofs
, ch_len
);
173 lsubdout(cct
, rgw
, 0) << "handle_data failed with exit code " << r
<< dendl
;
176 out_bl
.splice(0, q_ofs
+ ch_len
);
183 int RGWGetObj_Decompress::fixup_range(off_t
& ofs
, off_t
& end
)
185 if (partial_content
) {
186 // if user set range, we need to calculate it in decompressed data
187 first_block
= cs_info
->blocks
.begin(); last_block
= cs_info
->blocks
.begin();
188 if (cs_info
->blocks
.size() > 1) {
189 vector
<compression_block
>::iterator fb
, lb
;
190 // not bad to use auto for lambda, I think
191 auto cmp_u
= [] (off_t ofs
, const compression_block
& e
) { return (uint64_t)ofs
< e
.old_ofs
; };
192 auto cmp_l
= [] (const compression_block
& e
, off_t ofs
) { return e
.old_ofs
<= (uint64_t)ofs
; };
193 fb
= upper_bound(cs_info
->blocks
.begin()+1,
194 cs_info
->blocks
.end(),
197 first_block
= fb
- 1;
199 cs_info
->blocks
.end(),
205 first_block
= cs_info
->blocks
.begin(); last_block
= cs_info
->blocks
.end() - 1;
208 q_ofs
= ofs
- first_block
->old_ofs
;
209 q_len
= end
+ 1 - ofs
;
211 ofs
= first_block
->new_ofs
;
212 end
= last_block
->new_ofs
+ last_block
->len
- 1;
217 return next
->fixup_range(ofs
, end
);
220 void compression_block::dump(Formatter
*f
) const
222 f
->dump_unsigned("old_ofs", old_ofs
);
223 f
->dump_unsigned("new_ofs", new_ofs
);
224 f
->dump_unsigned("len", len
);
227 void RGWCompressionInfo::dump(Formatter
*f
) const
229 f
->dump_string("compression_type", compression_type
);
230 f
->dump_unsigned("orig_size", orig_size
);
231 if (compressor_message
) {
232 f
->dump_int("compressor_message", *compressor_message
);
234 ::encode_json("blocks", blocks
, f
);