]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_compression.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_compression.h"
5
6 #define dout_subsys ceph_subsys_rgw
7
8 //------------RGWPutObj_Compress---------------
9
10 int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
11 {
12 bufferlist out;
13 if (in.length() > 0) {
14 // compression stuff
15 if ((logical_offset > 0 && compressed) || // if previous part was compressed
16 (logical_offset == 0)) { // or it's the first part
17 ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
18 int cr = compressor->compress(in, out);
19 if (cr < 0) {
20 if (logical_offset > 0) {
21 lderr(cct) << "Compression failed with exit code " << cr
22 << " for next part, compression process failed" << dendl;
23 return -EIO;
24 }
25 compressed = false;
26 ldout(cct, 5) << "Compression failed with exit code " << cr
27 << " for first part, storing uncompressed" << dendl;
28 out.claim(in);
29 } else {
30 compressed = true;
31
32 compression_block newbl;
33 size_t bs = blocks.size();
34 newbl.old_ofs = logical_offset;
35 newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
36 newbl.len = out.length();
37 blocks.push_back(newbl);
38 }
39 } else {
40 compressed = false;
41 out.claim(in);
42 }
43 // end of compression stuff
44 }
45 return Pipe::process(std::move(out), logical_offset);
46 }
47
48 //----------------RGWGetObj_Decompress---------------------
49 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
50 RGWCompressionInfo* cs_info_,
51 bool partial_content_,
52 RGWGetObj_Filter* next): RGWGetObj_Filter(next),
53 cct(cct_),
54 cs_info(cs_info_),
55 partial_content(partial_content_),
56 q_ofs(0),
57 q_len(0),
58 cur_ofs(0)
59 {
60 compressor = Compressor::create(cct, cs_info->compression_type);
61 if (!compressor.get())
62 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
63 }
64
65 int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
66 {
67 ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
68 << "bl_ofs="<< bl_ofs << bl_len << dendl;
69
70 if (!compressor.get()) {
71 // if compressor isn't available - error, because cannot return decompressed data?
72 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
73 return -EIO;
74 }
75 bufferlist out_bl, in_bl, temp_in_bl;
76 bl.copy(bl_ofs, bl_len, temp_in_bl);
77 bl_ofs = 0;
78 int r = 0;
79 if (waiting.length() != 0) {
80 in_bl.append(waiting);
81 in_bl.append(temp_in_bl);
82 waiting.clear();
83 } else {
84 in_bl.claim(temp_in_bl);
85 }
86 bl_len = in_bl.length();
87
88 while (first_block <= last_block) {
89 bufferlist tmp;
90 off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
91 if (ofs_in_bl + (off_t)first_block->len > bl_len) {
92 // not complete block, put it to waiting
93 unsigned tail = bl_len - ofs_in_bl;
94 in_bl.copy(ofs_in_bl, tail, waiting);
95 cur_ofs -= tail;
96 break;
97 }
98 in_bl.copy(ofs_in_bl, first_block->len, tmp);
99 int cr = compressor->decompress(tmp, out_bl);
100 if (cr < 0) {
101 lderr(cct) << "Compression failed with exit code " << cr << dendl;
102 return cr;
103 }
104 ++first_block;
105 while (out_bl.length() - q_ofs >= cct->_conf->rgw_max_chunk_size)
106 {
107 off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
108 q_len -= ch_len;
109 r = next->handle_data(out_bl, q_ofs, ch_len);
110 if (r < 0) {
111 lderr(cct) << "handle_data failed with exit code " << r << dendl;
112 return r;
113 }
114 out_bl.splice(0, q_ofs + ch_len);
115 q_ofs = 0;
116 }
117 }
118
119 cur_ofs += bl_len;
120 off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
121 if (ch_len > 0) {
122 r = next->handle_data(out_bl, q_ofs, ch_len);
123 if (r < 0) {
124 lderr(cct) << "handle_data failed with exit code " << r << dendl;
125 return r;
126 }
127 out_bl.splice(0, q_ofs + ch_len);
128 q_len -= ch_len;
129 q_ofs = 0;
130 }
131 return r;
132 }
133
134 int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
135 {
136 if (partial_content) {
137 // if user set range, we need to calculate it in decompressed data
138 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
139 if (cs_info->blocks.size() > 1) {
140 vector<compression_block>::iterator fb, lb;
141 // not bad to use auto for lambda, I think
142 auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
143 auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
144 fb = upper_bound(cs_info->blocks.begin()+1,
145 cs_info->blocks.end(),
146 ofs,
147 cmp_u);
148 first_block = fb - 1;
149 lb = lower_bound(fb,
150 cs_info->blocks.end(),
151 end,
152 cmp_l);
153 last_block = lb - 1;
154 }
155 } else {
156 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
157 }
158
159 q_ofs = ofs - first_block->old_ofs;
160 q_len = end + 1 - ofs;
161
162 ofs = first_block->new_ofs;
163 end = last_block->new_ofs + last_block->len - 1;
164
165 cur_ofs = ofs;
166 waiting.clear();
167
168 return next->fixup_range(ofs, end);
169 }