]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_compression.cc
82acf92bff225b2ade664ddd12fced63a4819bf6
[ceph.git] / ceph / src / rgw / rgw_compression.cc
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
4
5 #include "rgw_compression.h"
6
7 #define dout_subsys ceph_subsys_rgw
8
9 //------------RGWPutObj_Compress---------------
10
11 int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
12 {
13 bufferlist in_bl;
14 if (*again) {
15 return next->handle_data(in_bl, ofs, phandle, pobj, again);
16 }
17 if (bl.length() > 0) {
18 // compression stuff
19 if ((ofs > 0 && compressed) || // if previous part was compressed
20 (ofs == 0)) { // or it's the first part
21 ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
22 int cr = compressor->compress(bl, in_bl);
23 if (cr < 0) {
24 if (ofs > 0) {
25 lderr(cct) << "Compression failed with exit code " << cr
26 << " for next part, compression process failed" << dendl;
27 return -EIO;
28 }
29 compressed = false;
30 ldout(cct, 5) << "Compression failed with exit code " << cr
31 << " for first part, storing uncompressed" << dendl;
32 in_bl.claim(bl);
33 } else {
34 compressed = true;
35
36 compression_block newbl;
37 int bs = blocks.size();
38 newbl.old_ofs = ofs;
39 newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
40 newbl.len = in_bl.length();
41 blocks.push_back(newbl);
42 }
43 } else {
44 compressed = false;
45 in_bl.claim(bl);
46 }
47 // end of compression stuff
48 }
49 return next->handle_data(in_bl, ofs, phandle, pobj, again);
50 }
51
52 //----------------RGWGetObj_Decompress---------------------
53 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
54 RGWCompressionInfo* cs_info_,
55 bool partial_content_,
56 RGWGetDataCB* next): RGWGetObj_Filter(next),
57 cct(cct_),
58 cs_info(cs_info_),
59 partial_content(partial_content_),
60 q_ofs(0),
61 q_len(0),
62 first_data(true),
63 cur_ofs(0)
64 {
65 compressor = Compressor::create(cct, cs_info->compression_type);
66 if (!compressor.get())
67 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
68 }
69
70 int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
71 {
72 ldout(cct, 10) << "Compression for rgw is enabled, decompress part " << bl_len << dendl;
73
74 if (!compressor.get()) {
75 // if compressor isn't available - error, because cannot return decompressed data?
76 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
77 return -EIO;
78 }
79 bufferlist out_bl, in_bl, temp_in_bl;
80 bl.copy(bl_ofs, bl_len, temp_in_bl);
81 bl_ofs = 0;
82 if (waiting.length() != 0) {
83 in_bl.append(waiting);
84 in_bl.append(temp_in_bl);
85 waiting.clear();
86 } else {
87 in_bl.claim(temp_in_bl);
88 }
89 bl_len = in_bl.length();
90
91 while (first_block <= last_block) {
92 bufferlist tmp, tmp_out;
93 int ofs_in_bl = first_block->new_ofs - cur_ofs;
94 if (ofs_in_bl + (unsigned)first_block->len > bl_len) {
95 // not complete block, put it to waiting
96 int tail = bl_len - ofs_in_bl;
97 in_bl.copy(ofs_in_bl, tail, waiting);
98 cur_ofs -= tail;
99 break;
100 }
101 in_bl.copy(ofs_in_bl, first_block->len, tmp);
102 int cr = compressor->decompress(tmp, tmp_out);
103 if (cr < 0) {
104 lderr(cct) << "Compression failed with exit code " << cr << dendl;
105 return cr;
106 }
107 if (first_block == last_block && partial_content)
108 tmp_out.copy(0, q_len, out_bl);
109 else
110 out_bl.append(tmp_out);
111 ++first_block;
112 }
113
114 if (first_data && partial_content && out_bl.length() != 0)
115 bl_ofs = q_ofs;
116
117 if (first_data && out_bl.length() != 0)
118 first_data = false;
119
120 cur_ofs += bl_len;
121
122 return next->handle_data(out_bl, bl_ofs, out_bl.length() - bl_ofs);
123 }
124
125 int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
126 {
127 if (partial_content) {
128 // if user set range, we need to calculate it in decompressed data
129 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
130 if (cs_info->blocks.size() > 1) {
131 vector<compression_block>::iterator fb, lb;
132 // not bad to use auto for lambda, I think
133 auto cmp_u = [] (off_t ofs, const compression_block& e) { return (unsigned)ofs < e.old_ofs; };
134 auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (unsigned)ofs; };
135 fb = upper_bound(cs_info->blocks.begin()+1,
136 cs_info->blocks.end(),
137 ofs,
138 cmp_u);
139 first_block = fb - 1;
140 lb = lower_bound(fb,
141 cs_info->blocks.end(),
142 end,
143 cmp_l);
144 last_block = lb - 1;
145 }
146 } else {
147 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
148 }
149
150 q_ofs = ofs - first_block->old_ofs;
151 q_len = end - last_block->old_ofs + 1;
152
153 ofs = first_block->new_ofs;
154 end = last_block->new_ofs + last_block->len - 1;
155
156 first_data = true;
157 cur_ofs = ofs;
158 waiting.clear();
159
160 return next->fixup_range(ofs, end);
161 }