]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_compression.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_compression.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw_compression.h"
5
6#define dout_subsys ceph_subsys_rgw
7
8//------------RGWPutObj_Compress---------------
9
11fdf7f2 10int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
7c673cae 11{
11fdf7f2
TL
12 bufferlist out;
13 if (in.length() > 0) {
7c673cae 14 // compression stuff
11fdf7f2
TL
15 if ((logical_offset > 0 && compressed) || // if previous part was compressed
16 (logical_offset == 0)) { // or it's the first part
17 ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
18 int cr = compressor->compress(in, out);
7c673cae 19 if (cr < 0) {
11fdf7f2 20 if (logical_offset > 0) {
7c673cae
FG
21 lderr(cct) << "Compression failed with exit code " << cr
22 << " for next part, compression process failed" << dendl;
23 return -EIO;
24 }
25 compressed = false;
26 ldout(cct, 5) << "Compression failed with exit code " << cr
27 << " for first part, storing uncompressed" << dendl;
11fdf7f2 28 out.claim(in);
7c673cae
FG
29 } else {
30 compressed = true;
31
32 compression_block newbl;
224ce89b 33 size_t bs = blocks.size();
11fdf7f2 34 newbl.old_ofs = logical_offset;
7c673cae 35 newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
11fdf7f2 36 newbl.len = out.length();
7c673cae
FG
37 blocks.push_back(newbl);
38 }
39 } else {
40 compressed = false;
11fdf7f2 41 out.claim(in);
7c673cae
FG
42 }
43 // end of compression stuff
44 }
11fdf7f2 45 return Pipe::process(std::move(out), logical_offset);
7c673cae
FG
46}
47
48//----------------RGWGetObj_Decompress---------------------
49RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_,
50 RGWCompressionInfo* cs_info_,
51 bool partial_content_,
11fdf7f2 52 RGWGetObj_Filter* next): RGWGetObj_Filter(next),
7c673cae
FG
53 cct(cct_),
54 cs_info(cs_info_),
55 partial_content(partial_content_),
56 q_ofs(0),
57 q_len(0),
7c673cae
FG
58 cur_ofs(0)
59{
60 compressor = Compressor::create(cct, cs_info->compression_type);
61 if (!compressor.get())
62 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
63}
64
65int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
66{
224ce89b
WB
67 ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
68 << "bl_ofs="<< bl_ofs << bl_len << dendl;
7c673cae
FG
69
70 if (!compressor.get()) {
71 // if compressor isn't available - error, because cannot return decompressed data?
72 lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
73 return -EIO;
74 }
31f18b77
FG
75 bufferlist out_bl, in_bl, temp_in_bl;
76 bl.copy(bl_ofs, bl_len, temp_in_bl);
7c673cae 77 bl_ofs = 0;
94b18763 78 int r = 0;
7c673cae
FG
79 if (waiting.length() != 0) {
80 in_bl.append(waiting);
31f18b77 81 in_bl.append(temp_in_bl);
7c673cae
FG
82 waiting.clear();
83 } else {
31f18b77 84 in_bl.claim(temp_in_bl);
7c673cae
FG
85 }
86 bl_len = in_bl.length();
224ce89b 87
7c673cae 88 while (first_block <= last_block) {
224ce89b
WB
89 bufferlist tmp;
90 off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
91 if (ofs_in_bl + (off_t)first_block->len > bl_len) {
7c673cae 92 // not complete block, put it to waiting
224ce89b 93 unsigned tail = bl_len - ofs_in_bl;
7c673cae
FG
94 in_bl.copy(ofs_in_bl, tail, waiting);
95 cur_ofs -= tail;
96 break;
97 }
98 in_bl.copy(ofs_in_bl, first_block->len, tmp);
224ce89b 99 int cr = compressor->decompress(tmp, out_bl);
7c673cae
FG
100 if (cr < 0) {
101 lderr(cct) << "Compression failed with exit code " << cr << dendl;
102 return cr;
103 }
7c673cae 104 ++first_block;
224ce89b
WB
105 while (out_bl.length() - q_ofs >= cct->_conf->rgw_max_chunk_size)
106 {
107 off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
108 q_len -= ch_len;
109 r = next->handle_data(out_bl, q_ofs, ch_len);
110 if (r < 0) {
111 lderr(cct) << "handle_data failed with exit code " << r << dendl;
112 return r;
113 }
114 out_bl.splice(0, q_ofs + ch_len);
115 q_ofs = 0;
116 }
7c673cae
FG
117 }
118
7c673cae 119 cur_ofs += bl_len;
224ce89b 120 off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
94b18763
FG
121 if (ch_len > 0) {
122 r = next->handle_data(out_bl, q_ofs, ch_len);
123 if (r < 0) {
124 lderr(cct) << "handle_data failed with exit code " << r << dendl;
125 return r;
126 }
127 out_bl.splice(0, q_ofs + ch_len);
128 q_len -= ch_len;
129 q_ofs = 0;
224ce89b 130 }
224ce89b 131 return r;
7c673cae
FG
132}
133
134int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
135{
136 if (partial_content) {
137 // if user set range, we need to calculate it in decompressed data
138 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
139 if (cs_info->blocks.size() > 1) {
140 vector<compression_block>::iterator fb, lb;
141 // not bad to use auto for lambda, I think
224ce89b
WB
142 auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
143 auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
7c673cae
FG
144 fb = upper_bound(cs_info->blocks.begin()+1,
145 cs_info->blocks.end(),
146 ofs,
147 cmp_u);
148 first_block = fb - 1;
149 lb = lower_bound(fb,
150 cs_info->blocks.end(),
151 end,
152 cmp_l);
153 last_block = lb - 1;
154 }
155 } else {
156 first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
157 }
158
159 q_ofs = ofs - first_block->old_ofs;
224ce89b 160 q_len = end + 1 - ofs;
7c673cae
FG
161
162 ofs = first_block->new_ofs;
31f18b77 163 end = last_block->new_ofs + last_block->len - 1;
7c673cae 164
7c673cae
FG
165 cur_ofs = ofs;
166 waiting.clear();
167
168 return next->fixup_range(ofs, end);
169}