]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_compression.h"
6 #define dout_subsys ceph_subsys_rgw
// Looks up the RGW_ATTR_COMPRESSION attribute in `attrs` and, when it is
// present, decodes its bufferlist into `cs_info`; a buffer::error raised by
// decode() is caught here.
// `need_decompress` is set to true only on the path where
// cs_info.compression_type differs from "none"; the other visible
// assignments set it to false.
// NOTE(review): this listing is missing several original lines (the embedded
// line numbers skip), so the return values, the declaration of
// `need_decompress` and the exact branch nesting are not visible here --
// confirm against the complete file before relying on this description.
8 int rgw_compression_info_from_attrset(map
<string
, bufferlist
>& attrs
,
10 RGWCompressionInfo
& cs_info
) {
// Find the compression attribute, if the object has one.
11 map
<string
, bufferlist
>::iterator value
= attrs
.find(RGW_ATTR_COMPRESSION
);
12 if (value
!= attrs
.end()) {
13 auto bliter
= value
->second
.cbegin();
15 decode(cs_info
, bliter
);
16 } catch (buffer::error
& err
) {
// An empty block list is treated as a special case; what happens inside this
// branch is not visible in this listing.
19 if (cs_info
.blocks
.size() == 0) {
22 if (cs_info
.compression_type
!= "none")
23 need_decompress
= true;
25 need_decompress
= false;
28 need_decompress
= false;
33 //------------RGWPutObj_Compress---------------
// Write-path filter step: optionally compresses the incoming data `in`
// (located at `logical_offset` in the object) into `out`, records a
// compression_block describing the part, and forwards `out` to
// Pipe::process() with the same logical offset.
// Compression is attempted only for non-empty input, and only when this is
// the first part (logical_offset == 0) or the `compressed` flag is set for a
// later part.
// NOTE(review): several original lines are missing from this listing (the
// embedded line numbers skip), including the checks on `cr`, the error
// returns and where `compressed`/`out` are updated -- confirm against the
// complete file.
35 int RGWPutObj_Compress::process(bufferlist
&& in
, uint64_t logical_offset
)
38 if (in
.length() > 0) {
40 if ((logical_offset
> 0 && compressed
) || // if previous part was compressed
41 (logical_offset
== 0)) { // or it's the first part
42 ldout(cct
, 10) << "Compression for rgw is enabled, compress part " << in
.length() << dendl
;
43 int cr
= compressor
->compress(in
, out
);
// Failure reporting differs by position: a later part logs at error level
// ("compression process failed"), the first part logs at level 5
// ("storing uncompressed").
45 if (logical_offset
> 0) {
46 lderr(cct
) << "Compression failed with exit code " << cr
47 << " for next part, compression process failed" << dendl
;
51 ldout(cct
, 5) << "Compression failed with exit code " << cr
52 << " for first part, storing uncompressed" << dendl
;
// Record the mapping for this part: old_ofs is the logical offset, new_ofs
// continues right after the previously recorded block (or 0 for the first
// block), and len is the length of `out`.
57 compression_block newbl
;
58 size_t bs
= blocks
.size();
59 newbl
.old_ofs
= logical_offset
;
60 newbl
.new_ofs
= bs
> 0 ? blocks
[bs
-1].len
+ blocks
[bs
-1].new_ofs
: 0;
61 newbl
.len
= out
.length();
62 blocks
.push_back(newbl
);
68 // end of compression stuff
70 return Pipe::process(std::move(out
), logical_offset
);
73 //----------------RGWGetObj_Decompress---------------------
// Constructs the read-path decompression filter in front of `next`.
// `partial_content_` is stored in the `partial_content` member. The body
// creates a compressor for cs_info->compression_type via Compressor::create()
// and logs an error if no compressor could be obtained; construction itself
// does not fail in the visible code.
// NOTE(review): part of the member-initializer list is missing from this
// listing (embedded line numbers 78-79 and 81-84 are absent) -- confirm the
// remaining initializers against the complete file.
74 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext
* cct_
,
75 RGWCompressionInfo
* cs_info_
,
76 bool partial_content_
,
77 RGWGetObj_Filter
* next
): RGWGetObj_Filter(next
),
80 partial_content(partial_content_
),
85 compressor
= Compressor::create(cct
, cs_info
->compression_type
);
86 if (!compressor
.get())
87 lderr(cct
) << "Cannot load compressor of type " << cs_info
->compression_type
<< dendl
;
// Read-path filter step: takes `bl_len` bytes of compressed data starting at
// `bl_ofs` in `bl`, decompresses every complete compression block found in
// it, and passes the decompressed bytes on to next->handle_data().
// Bytes belonging to a block that is not yet complete are kept in the
// `waiting` member and prepended to the data of the following call.
// NOTE(review): many original lines are missing from this listing (the
// embedded line numbers skip), including the return statements, the checks
// on `cr`/`r`, the declaration of `tmp`, and the updates of `first_block`,
// `cur_ofs`, `q_ofs` and `q_len` -- confirm against the complete file.
90 int RGWGetObj_Decompress::handle_data(bufferlist
& bl
, off_t bl_ofs
, off_t bl_len
)
// NOTE(review): bl_ofs and bl_len are streamed back to back with no separator
// or label for bl_len, so the two numbers run together in the log output.
92 ldout(cct
, 10) << "Compression for rgw is enabled, decompress part "
93 << "bl_ofs="<< bl_ofs
<< bl_len
<< dendl
;
95 if (!compressor
.get()) {
96 // if compressor isn't available - error, because cannot return decompressed data?
97 lderr(cct
) << "Cannot load compressor of type " << cs_info
->compression_type
<< dendl
;
// Copy the requested range of `bl` into temp_in_bl.
100 bufferlist out_bl
, in_bl
, temp_in_bl
;
101 bl
.begin(bl_ofs
).copy(bl_len
, temp_in_bl
);
// If a previous call left an incomplete block in `waiting`, the input becomes
// waiting + new data; the claim() below is presumably the branch taken when
// nothing is waiting (the `else` line is not visible here).
104 if (waiting
.length() != 0) {
105 in_bl
.append(waiting
);
106 in_bl
.append(temp_in_bl
);
109 in_bl
.claim(temp_in_bl
);
// From here on bl_len is the length of the assembled input.
111 bl_len
= in_bl
.length();
113 auto iter_in_bl
= in_bl
.cbegin();
114 while (first_block
<= last_block
) {
// Position of the current block inside in_bl, relative to cur_ofs.
116 off_t ofs_in_bl
= first_block
->new_ofs
- cur_ofs
;
117 if (ofs_in_bl
+ (off_t
)first_block
->len
> bl_len
) {
118 // not complete block, put it to waiting
119 unsigned tail
= bl_len
- ofs_in_bl
;
120 if (iter_in_bl
.get_off() != ofs_in_bl
) {
121 iter_in_bl
.seek(ofs_in_bl
);
123 iter_in_bl
.copy(tail
, waiting
);
// Complete block: extract exactly first_block->len bytes and decompress them
// into out_bl.
127 if (iter_in_bl
.get_off() != ofs_in_bl
) {
128 iter_in_bl
.seek(ofs_in_bl
);
130 iter_in_bl
.copy(first_block
->len
, tmp
);
131 int cr
= compressor
->decompress(tmp
, out_bl
);
133 lderr(cct
) << "Decompression failed with exit code " << cr
<< dendl
;
// Forward decompressed data downstream in pieces of at most
// rgw_max_chunk_size while at least that much is available past q_ofs; each
// forwarded piece (and the q_ofs bytes before it) is removed from out_bl.
137 while (out_bl
.length() - q_ofs
>= cct
->_conf
->rgw_max_chunk_size
)
139 off_t ch_len
= std::min
<off_t
>(cct
->_conf
->rgw_max_chunk_size
, q_len
);
141 r
= next
->handle_data(out_bl
, q_ofs
, ch_len
);
143 lderr(cct
) << "handle_data failed with exit code " << r
<< dendl
;
146 out_bl
.splice(0, q_ofs
+ ch_len
);
// Forward whatever remains in out_bl past q_ofs, limited to q_len bytes.
152 off_t ch_len
= std::min
<off_t
>(out_bl
.length() - q_ofs
, q_len
);
154 r
= next
->handle_data(out_bl
, q_ofs
, ch_len
);
156 lderr(cct
) << "handle_data failed with exit code " << r
<< dendl
;
159 out_bl
.splice(0, q_ofs
+ ch_len
);
// Rewrites the requested byte range [ofs, end] so that it covers whole
// compression blocks of the stored data, and remembers in q_ofs / q_len
// which part of the decompressed output was actually requested.
// On entry `ofs`/`end` are compared against the blocks' old_ofs values; on
// exit `ofs` is first_block->new_ofs and `end` is the last byte of
// last_block (new_ofs + len - 1). The adjusted range is then handed to
// next->fixup_range(), whose result is returned.
// NOTE(review): several original lines are missing from this listing (the
// embedded line numbers skip), including the remaining arguments of both
// searches, the assignment of last_block in the partial-content path and the
// handling of cur_ofs/waiting -- confirm against the complete file.
166 int RGWGetObj_Decompress::fixup_range(off_t
& ofs
, off_t
& end
)
168 if (partial_content
) {
169 // if user set range, we need to calculate it in decompressed data
170 first_block
= cs_info
->blocks
.begin(); last_block
= cs_info
->blocks
.begin();
// With a single block both iterators already point at it; a search is only
// needed when there is more than one block.
171 if (cs_info
->blocks
.size() > 1) {
172 vector
<compression_block
>::iterator fb
, lb
;
173 // not bad to use auto for lambda, I think
// cmp_u: "offset is before this block's old_ofs" (offset first, block second).
// cmp_l: "this block's old_ofs is at or before offset" (block first, offset second).
174 auto cmp_u
= [] (off_t ofs
, const compression_block
& e
) { return (uint64_t)ofs
< e
.old_ofs
; };
175 auto cmp_l
= [] (const compression_block
& e
, off_t ofs
) { return e
.old_ofs
<= (uint64_t)ofs
; };
// The search starts at the second block and first_block is the element just
// before the result.
176 fb
= upper_bound(cs_info
->blocks
.begin()+1,
177 cs_info
->blocks
.end(),
180 first_block
= fb
- 1;
182 cs_info
->blocks
.end(),
// Presumably the non-partial path: use the full block list (the `else` line
// is not visible here).
188 first_block
= cs_info
->blocks
.begin(); last_block
= cs_info
->blocks
.end() - 1;
// q_ofs: offset of the requested start inside the first block's decompressed
// data; q_len: number of requested bytes (inclusive range).
191 q_ofs
= ofs
- first_block
->old_ofs
;
192 q_len
= end
+ 1 - ofs
;
194 ofs
= first_block
->new_ofs
;
195 end
= last_block
->new_ofs
+ last_block
->len
- 1;
200 return next
->fixup_range(ofs
, end
);