]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_compression.h" | |
5 | ||
6 | #define dout_subsys ceph_subsys_rgw | |
7 | ||
8 | //------------RGWPutObj_Compress--------------- | |
9 | ||
11fdf7f2 | 10 | int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset) |
7c673cae | 11 | { |
11fdf7f2 TL |
12 | bufferlist out; |
13 | if (in.length() > 0) { | |
7c673cae | 14 | // compression stuff |
11fdf7f2 TL |
15 | if ((logical_offset > 0 && compressed) || // if previous part was compressed |
16 | (logical_offset == 0)) { // or it's the first part | |
17 | ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl; | |
18 | int cr = compressor->compress(in, out); | |
7c673cae | 19 | if (cr < 0) { |
11fdf7f2 | 20 | if (logical_offset > 0) { |
7c673cae FG |
21 | lderr(cct) << "Compression failed with exit code " << cr |
22 | << " for next part, compression process failed" << dendl; | |
23 | return -EIO; | |
24 | } | |
25 | compressed = false; | |
26 | ldout(cct, 5) << "Compression failed with exit code " << cr | |
27 | << " for first part, storing uncompressed" << dendl; | |
11fdf7f2 | 28 | out.claim(in); |
7c673cae FG |
29 | } else { |
30 | compressed = true; | |
31 | ||
32 | compression_block newbl; | |
224ce89b | 33 | size_t bs = blocks.size(); |
11fdf7f2 | 34 | newbl.old_ofs = logical_offset; |
7c673cae | 35 | newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; |
11fdf7f2 | 36 | newbl.len = out.length(); |
7c673cae FG |
37 | blocks.push_back(newbl); |
38 | } | |
39 | } else { | |
40 | compressed = false; | |
11fdf7f2 | 41 | out.claim(in); |
7c673cae FG |
42 | } |
43 | // end of compression stuff | |
44 | } | |
11fdf7f2 | 45 | return Pipe::process(std::move(out), logical_offset); |
7c673cae FG |
46 | } |
47 | ||
48 | //----------------RGWGetObj_Decompress--------------------- | |
49 | RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, | |
50 | RGWCompressionInfo* cs_info_, | |
51 | bool partial_content_, | |
11fdf7f2 | 52 | RGWGetObj_Filter* next): RGWGetObj_Filter(next), |
7c673cae FG |
53 | cct(cct_), |
54 | cs_info(cs_info_), | |
55 | partial_content(partial_content_), | |
56 | q_ofs(0), | |
57 | q_len(0), | |
7c673cae FG |
58 | cur_ofs(0) |
59 | { | |
60 | compressor = Compressor::create(cct, cs_info->compression_type); | |
61 | if (!compressor.get()) | |
62 | lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl; | |
63 | } | |
64 | ||
65 | int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) | |
66 | { | |
224ce89b WB |
67 | ldout(cct, 10) << "Compression for rgw is enabled, decompress part " |
68 | << "bl_ofs="<< bl_ofs << bl_len << dendl; | |
7c673cae FG |
69 | |
70 | if (!compressor.get()) { | |
71 | // if compressor isn't available - error, because cannot return decompressed data? | |
72 | lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl; | |
73 | return -EIO; | |
74 | } | |
31f18b77 FG |
75 | bufferlist out_bl, in_bl, temp_in_bl; |
76 | bl.copy(bl_ofs, bl_len, temp_in_bl); | |
7c673cae | 77 | bl_ofs = 0; |
94b18763 | 78 | int r = 0; |
7c673cae FG |
79 | if (waiting.length() != 0) { |
80 | in_bl.append(waiting); | |
31f18b77 | 81 | in_bl.append(temp_in_bl); |
7c673cae FG |
82 | waiting.clear(); |
83 | } else { | |
31f18b77 | 84 | in_bl.claim(temp_in_bl); |
7c673cae FG |
85 | } |
86 | bl_len = in_bl.length(); | |
224ce89b | 87 | |
7c673cae | 88 | while (first_block <= last_block) { |
224ce89b WB |
89 | bufferlist tmp; |
90 | off_t ofs_in_bl = first_block->new_ofs - cur_ofs; | |
91 | if (ofs_in_bl + (off_t)first_block->len > bl_len) { | |
7c673cae | 92 | // not complete block, put it to waiting |
224ce89b | 93 | unsigned tail = bl_len - ofs_in_bl; |
7c673cae FG |
94 | in_bl.copy(ofs_in_bl, tail, waiting); |
95 | cur_ofs -= tail; | |
96 | break; | |
97 | } | |
98 | in_bl.copy(ofs_in_bl, first_block->len, tmp); | |
224ce89b | 99 | int cr = compressor->decompress(tmp, out_bl); |
7c673cae FG |
100 | if (cr < 0) { |
101 | lderr(cct) << "Compression failed with exit code " << cr << dendl; | |
102 | return cr; | |
103 | } | |
7c673cae | 104 | ++first_block; |
224ce89b WB |
105 | while (out_bl.length() - q_ofs >= cct->_conf->rgw_max_chunk_size) |
106 | { | |
107 | off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len); | |
108 | q_len -= ch_len; | |
109 | r = next->handle_data(out_bl, q_ofs, ch_len); | |
110 | if (r < 0) { | |
111 | lderr(cct) << "handle_data failed with exit code " << r << dendl; | |
112 | return r; | |
113 | } | |
114 | out_bl.splice(0, q_ofs + ch_len); | |
115 | q_ofs = 0; | |
116 | } | |
7c673cae FG |
117 | } |
118 | ||
7c673cae | 119 | cur_ofs += bl_len; |
224ce89b | 120 | off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len); |
94b18763 FG |
121 | if (ch_len > 0) { |
122 | r = next->handle_data(out_bl, q_ofs, ch_len); | |
123 | if (r < 0) { | |
124 | lderr(cct) << "handle_data failed with exit code " << r << dendl; | |
125 | return r; | |
126 | } | |
127 | out_bl.splice(0, q_ofs + ch_len); | |
128 | q_len -= ch_len; | |
129 | q_ofs = 0; | |
224ce89b | 130 | } |
224ce89b | 131 | return r; |
7c673cae FG |
132 | } |
133 | ||
134 | int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end) | |
135 | { | |
136 | if (partial_content) { | |
137 | // if user set range, we need to calculate it in decompressed data | |
138 | first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin(); | |
139 | if (cs_info->blocks.size() > 1) { | |
140 | vector<compression_block>::iterator fb, lb; | |
141 | // not bad to use auto for lambda, I think | |
224ce89b WB |
142 | auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; }; |
143 | auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; }; | |
7c673cae FG |
144 | fb = upper_bound(cs_info->blocks.begin()+1, |
145 | cs_info->blocks.end(), | |
146 | ofs, | |
147 | cmp_u); | |
148 | first_block = fb - 1; | |
149 | lb = lower_bound(fb, | |
150 | cs_info->blocks.end(), | |
151 | end, | |
152 | cmp_l); | |
153 | last_block = lb - 1; | |
154 | } | |
155 | } else { | |
156 | first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1; | |
157 | } | |
158 | ||
159 | q_ofs = ofs - first_block->old_ofs; | |
224ce89b | 160 | q_len = end + 1 - ofs; |
7c673cae FG |
161 | |
162 | ofs = first_block->new_ofs; | |
31f18b77 | 163 | end = last_block->new_ofs + last_block->len - 1; |
7c673cae | 164 | |
7c673cae FG |
165 | cur_ofs = ofs; |
166 | waiting.clear(); | |
167 | ||
168 | return next->fixup_range(ofs, end); | |
169 | } |