]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/col_buf_decoder.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / col_buf_decoder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5
6 #include "utilities/col_buf_decoder.h"
7 #include <cstring>
8 #include <string>
9 #include "port/port.h"
10
11 namespace rocksdb {
12
13 ColBufDecoder::~ColBufDecoder() {}
14
15 namespace {
16
17 inline uint64_t EncodeFixed64WithEndian(uint64_t val, bool big_endian,
18 size_t size) {
19 if (big_endian && port::kLittleEndian) {
20 val = EndianTransform(val, size);
21 } else if (!big_endian && !port::kLittleEndian) {
22 val = EndianTransform(val, size);
23 }
24 return val;
25 }
26
27 } // namespace
28
29 ColBufDecoder* ColBufDecoder::NewColBufDecoder(
30 const ColDeclaration& col_declaration) {
31 if (col_declaration.col_type == "FixedLength") {
32 return new FixedLengthColBufDecoder(
33 col_declaration.size, col_declaration.col_compression_type,
34 col_declaration.nullable, col_declaration.big_endian);
35 } else if (col_declaration.col_type == "VariableLength") {
36 return new VariableLengthColBufDecoder();
37 } else if (col_declaration.col_type == "VariableChunk") {
38 return new VariableChunkColBufDecoder(col_declaration.col_compression_type);
39 } else if (col_declaration.col_type == "LongFixedLength") {
40 return new LongFixedLengthColBufDecoder(col_declaration.size,
41 col_declaration.nullable);
42 }
43 // Unrecognized column type
44 return nullptr;
45 }
46
47 namespace {
48
49 void ReadVarint64(const char** src_ptr, uint64_t* val_ptr) {
50 const char* q = GetVarint64Ptr(*src_ptr, *src_ptr + 10, val_ptr);
51 assert(q != nullptr);
52 *src_ptr = q;
53 }
54 } // namespace
55
56 size_t FixedLengthColBufDecoder::Init(const char* src) {
57 remain_runs_ = 0;
58 last_val_ = 0;
59 // Dictionary initialization
60 dict_vec_.clear();
61 const char* orig_src = src;
62 if (col_compression_type_ == kColDict ||
63 col_compression_type_ == kColRleDict) {
64 const char* q;
65 uint64_t dict_size;
66 // Bypass limit
67 q = GetVarint64Ptr(src, src + 10, &dict_size);
68 assert(q != nullptr);
69 src = q;
70
71 uint64_t dict_key;
72 for (uint64_t i = 0; i < dict_size; ++i) {
73 // Bypass limit
74 ReadVarint64(&src, &dict_key);
75
76 dict_key = EncodeFixed64WithEndian(dict_key, big_endian_, size_);
77 dict_vec_.push_back(dict_key);
78 }
79 }
80 return src - orig_src;
81 }
82
83 size_t FixedLengthColBufDecoder::Decode(const char* src, char** dest) {
84 uint64_t read_val = 0;
85 const char* orig_src = src;
86 const char* src_limit = src + 20;
87 if (nullable_) {
88 bool not_null;
89 not_null = *src;
90 src += 1;
91 if (!not_null) {
92 return 1;
93 }
94 }
95 if (IsRunLength(col_compression_type_)) {
96 if (remain_runs_ == 0) {
97 const char* q;
98 run_val_ = 0;
99 if (col_compression_type_ == kColRle) {
100 memcpy(&run_val_, src, size_);
101 src += size_;
102 } else {
103 q = GetVarint64Ptr(src, src_limit, &run_val_);
104 assert(q != nullptr);
105 src = q;
106 }
107
108 q = GetVarint64Ptr(src, src_limit, &remain_runs_);
109 assert(q != nullptr);
110 src = q;
111
112 if (col_compression_type_ != kColRleDeltaVarint &&
113 col_compression_type_ != kColRleDict) {
114 run_val_ = EncodeFixed64WithEndian(run_val_, big_endian_, size_);
115 }
116 }
117 read_val = run_val_;
118 } else {
119 if (col_compression_type_ == kColNoCompression) {
120 memcpy(&read_val, src, size_);
121 src += size_;
122 } else {
123 // Assume a column does not exceed 8 bytes here
124 const char* q = GetVarint64Ptr(src, src_limit, &read_val);
125 assert(q != nullptr);
126 src = q;
127 }
128 if (col_compression_type_ != kColDeltaVarint &&
129 col_compression_type_ != kColDict) {
130 read_val = EncodeFixed64WithEndian(read_val, big_endian_, size_);
131 }
132 }
133
134 uint64_t write_val = read_val;
135 if (col_compression_type_ == kColDeltaVarint ||
136 col_compression_type_ == kColRleDeltaVarint) {
137 // does not support 64 bit
138
139 uint64_t mask = (write_val & 1) ? (~uint64_t(0)) : 0;
140 int64_t delta = (write_val >> 1) ^ mask;
141 write_val = last_val_ + delta;
142
143 uint64_t tmp = write_val;
144 write_val = EncodeFixed64WithEndian(write_val, big_endian_, size_);
145 last_val_ = tmp;
146 } else if (col_compression_type_ == kColRleDict ||
147 col_compression_type_ == kColDict) {
148 uint64_t dict_val = read_val;
149 assert(dict_val < dict_vec_.size());
150 write_val = dict_vec_[dict_val];
151 }
152
153 // dest->append(reinterpret_cast<char*>(&write_val), size_);
154 memcpy(*dest, reinterpret_cast<char*>(&write_val), size_);
155 *dest += size_;
156 if (IsRunLength(col_compression_type_)) {
157 --remain_runs_;
158 }
159 return src - orig_src;
160 }
161
162 size_t LongFixedLengthColBufDecoder::Decode(const char* src, char** dest) {
163 if (nullable_) {
164 bool not_null;
165 not_null = *src;
166 src += 1;
167 if (!not_null) {
168 return 1;
169 }
170 }
171 memcpy(*dest, src, size_);
172 *dest += size_;
173 return size_ + 1;
174 }
175
176 size_t VariableLengthColBufDecoder::Decode(const char* src, char** dest) {
177 uint8_t len;
178 len = *src;
179 memcpy(dest, reinterpret_cast<char*>(&len), 1);
180 *dest += 1;
181 src += 1;
182 memcpy(*dest, src, len);
183 *dest += len;
184 return len + 1;
185 }
186
187 size_t VariableChunkColBufDecoder::Init(const char* src) {
188 // Dictionary initialization
189 dict_vec_.clear();
190 const char* orig_src = src;
191 if (col_compression_type_ == kColDict) {
192 const char* q;
193 uint64_t dict_size;
194 // Bypass limit
195 q = GetVarint64Ptr(src, src + 10, &dict_size);
196 assert(q != nullptr);
197 src = q;
198
199 uint64_t dict_key;
200 for (uint64_t i = 0; i < dict_size; ++i) {
201 // Bypass limit
202 ReadVarint64(&src, &dict_key);
203 dict_vec_.push_back(dict_key);
204 }
205 }
206 return src - orig_src;
207 }
208
209 size_t VariableChunkColBufDecoder::Decode(const char* src, char** dest) {
210 const char* orig_src = src;
211 uint64_t size = 0;
212 ReadVarint64(&src, &size);
213 int64_t full_chunks = size / 8;
214 uint64_t chunk_buf;
215 size_t chunk_size = 8;
216 for (int64_t i = 0; i < full_chunks + 1; ++i) {
217 chunk_buf = 0;
218 if (i == full_chunks) {
219 chunk_size = size % 8;
220 }
221 if (col_compression_type_ == kColDict) {
222 uint64_t dict_val;
223 ReadVarint64(&src, &dict_val);
224 assert(dict_val < dict_vec_.size());
225 chunk_buf = dict_vec_[dict_val];
226 } else {
227 memcpy(&chunk_buf, src, chunk_size);
228 src += chunk_size;
229 }
230 memcpy(*dest, reinterpret_cast<char*>(&chunk_buf), 8);
231 *dest += 8;
232 uint8_t mask = ((0xFF - 8) + chunk_size) & 0xFF;
233 memcpy(*dest, reinterpret_cast<char*>(&mask), 1);
234 *dest += 1;
235 }
236
237 return src - orig_src;
238 }
239
240 } // namespace rocksdb