]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
3 | // vim: ts=8 sw=2 smarttab | |
4 | ||
5 | #include "rgw_compression.h" | |
6 | ||
7 | #define dout_subsys ceph_subsys_rgw | |
8 | ||
9 | //------------RGWPutObj_Compress--------------- | |
10 | ||
11 | int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) | |
12 | { | |
13 | bufferlist in_bl; | |
14 | if (*again) { | |
15 | return next->handle_data(in_bl, ofs, phandle, pobj, again); | |
16 | } | |
17 | if (bl.length() > 0) { | |
18 | // compression stuff | |
19 | if ((ofs > 0 && compressed) || // if previous part was compressed | |
20 | (ofs == 0)) { // or it's the first part | |
21 | ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl; | |
22 | int cr = compressor->compress(bl, in_bl); | |
23 | if (cr < 0) { | |
24 | if (ofs > 0) { | |
25 | lderr(cct) << "Compression failed with exit code " << cr | |
26 | << " for next part, compression process failed" << dendl; | |
27 | return -EIO; | |
28 | } | |
29 | compressed = false; | |
30 | ldout(cct, 5) << "Compression failed with exit code " << cr | |
31 | << " for first part, storing uncompressed" << dendl; | |
32 | in_bl.claim(bl); | |
33 | } else { | |
34 | compressed = true; | |
35 | ||
36 | compression_block newbl; | |
37 | int bs = blocks.size(); | |
38 | newbl.old_ofs = ofs; | |
39 | newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0; | |
40 | newbl.len = in_bl.length(); | |
41 | blocks.push_back(newbl); | |
42 | } | |
43 | } else { | |
44 | compressed = false; | |
45 | in_bl.claim(bl); | |
46 | } | |
47 | // end of compression stuff | |
48 | } | |
49 | return next->handle_data(in_bl, ofs, phandle, pobj, again); | |
50 | } | |
51 | ||
52 | //----------------RGWGetObj_Decompress--------------------- | |
53 | RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, | |
54 | RGWCompressionInfo* cs_info_, | |
55 | bool partial_content_, | |
56 | RGWGetDataCB* next): RGWGetObj_Filter(next), | |
57 | cct(cct_), | |
58 | cs_info(cs_info_), | |
59 | partial_content(partial_content_), | |
60 | q_ofs(0), | |
61 | q_len(0), | |
62 | first_data(true), | |
63 | cur_ofs(0) | |
64 | { | |
65 | compressor = Compressor::create(cct, cs_info->compression_type); | |
66 | if (!compressor.get()) | |
67 | lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl; | |
68 | } | |
69 | ||
70 | int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) | |
71 | { | |
72 | ldout(cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl; | |
73 | ||
74 | if (!compressor.get()) { | |
75 | // if compressor isn't available - error, because cannot return decompressed data? | |
76 | lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl; | |
77 | return -EIO; | |
78 | } | |
31f18b77 FG |
79 | bufferlist out_bl, in_bl, temp_in_bl; |
80 | bl.copy(bl_ofs, bl_len, temp_in_bl); | |
7c673cae FG |
81 | bl_ofs = 0; |
82 | if (waiting.length() != 0) { | |
83 | in_bl.append(waiting); | |
31f18b77 | 84 | in_bl.append(temp_in_bl); |
7c673cae FG |
85 | waiting.clear(); |
86 | } else { | |
31f18b77 | 87 | in_bl.claim(temp_in_bl); |
7c673cae FG |
88 | } |
89 | bl_len = in_bl.length(); | |
31f18b77 | 90 | |
7c673cae FG |
91 | while (first_block <= last_block) { |
92 | bufferlist tmp, tmp_out; | |
93 | int ofs_in_bl = first_block->new_ofs - cur_ofs; | |
94 | if (ofs_in_bl + (unsigned)first_block->len > bl_len) { | |
95 | // not complete block, put it to waiting | |
96 | int tail = bl_len - ofs_in_bl; | |
97 | in_bl.copy(ofs_in_bl, tail, waiting); | |
98 | cur_ofs -= tail; | |
99 | break; | |
100 | } | |
101 | in_bl.copy(ofs_in_bl, first_block->len, tmp); | |
102 | int cr = compressor->decompress(tmp, tmp_out); | |
103 | if (cr < 0) { | |
104 | lderr(cct) << "Compression failed with exit code " << cr << dendl; | |
105 | return cr; | |
106 | } | |
107 | if (first_block == last_block && partial_content) | |
108 | tmp_out.copy(0, q_len, out_bl); | |
109 | else | |
110 | out_bl.append(tmp_out); | |
111 | ++first_block; | |
112 | } | |
113 | ||
114 | if (first_data && partial_content && out_bl.length() != 0) | |
115 | bl_ofs = q_ofs; | |
116 | ||
117 | if (first_data && out_bl.length() != 0) | |
118 | first_data = false; | |
119 | ||
120 | cur_ofs += bl_len; | |
121 | ||
122 | return next->handle_data(out_bl, bl_ofs, out_bl.length() - bl_ofs); | |
123 | } | |
124 | ||
125 | int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end) | |
126 | { | |
127 | if (partial_content) { | |
128 | // if user set range, we need to calculate it in decompressed data | |
129 | first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin(); | |
130 | if (cs_info->blocks.size() > 1) { | |
131 | vector<compression_block>::iterator fb, lb; | |
132 | // not bad to use auto for lambda, I think | |
133 | auto cmp_u = [] (off_t ofs, const compression_block& e) { return (unsigned)ofs < e.old_ofs; }; | |
31f18b77 | 134 | auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (unsigned)ofs; }; |
7c673cae FG |
135 | fb = upper_bound(cs_info->blocks.begin()+1, |
136 | cs_info->blocks.end(), | |
137 | ofs, | |
138 | cmp_u); | |
139 | first_block = fb - 1; | |
140 | lb = lower_bound(fb, | |
141 | cs_info->blocks.end(), | |
142 | end, | |
143 | cmp_l); | |
144 | last_block = lb - 1; | |
145 | } | |
146 | } else { | |
147 | first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1; | |
148 | } | |
149 | ||
150 | q_ofs = ofs - first_block->old_ofs; | |
151 | q_len = end - last_block->old_ofs + 1; | |
152 | ||
153 | ofs = first_block->new_ofs; | |
31f18b77 | 154 | end = last_block->new_ofs + last_block->len - 1; |
7c673cae FG |
155 | |
156 | first_data = true; | |
157 | cur_ofs = ofs; | |
158 | waiting.clear(); | |
159 | ||
160 | return next->fixup_range(ofs, end); | |
161 | } |