]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/column_aware_encoding_util.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / column_aware_encoding_util.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/column_aware_encoding_util.h"
9
10 #ifndef __STDC_FORMAT_MACROS
11 #define __STDC_FORMAT_MACROS
12 #endif
13
14 #include <stddef.h>
15 #include <stdint.h>
16 #include <stdio.h>
17 #include <algorithm>
18 #include <utility>
19 #include <vector>
20 #include "include/rocksdb/comparator.h"
21 #include "include/rocksdb/slice.h"
22 #include "rocksdb/env.h"
23 #include "rocksdb/status.h"
24 #include "table/block_based_table_builder.h"
25 #include "table/block_based_table_factory.h"
26 #include "table/format.h"
27 #include "table/table_reader.h"
28 #include "util/coding.h"
29 #include "utilities/col_buf_decoder.h"
30 #include "utilities/col_buf_encoder.h"
31
32 #include "port/port.h"
33
34 namespace rocksdb {
35
36 ColumnAwareEncodingReader::ColumnAwareEncodingReader(
37 const std::string& file_path)
38 : file_name_(file_path),
39 ioptions_(options_),
40 internal_comparator_(BytewiseComparator()) {
41 InitTableReader(file_name_);
42 }
43
44 void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
45 std::unique_ptr<RandomAccessFile> file;
46 uint64_t file_size;
47 options_.env->NewRandomAccessFile(file_path, &file, soptions_);
48 options_.env->GetFileSize(file_path, &file_size);
49
50 file_.reset(new RandomAccessFileReader(std::move(file)));
51
52 options_.comparator = &internal_comparator_;
53 options_.table_factory = std::make_shared<BlockBasedTableFactory>();
54 shared_ptr<BlockBasedTableFactory> block_table_factory =
55 std::dynamic_pointer_cast<BlockBasedTableFactory>(options_.table_factory);
56
57 std::unique_ptr<TableReader> table_reader;
58 block_table_factory->NewTableReader(
59 TableReaderOptions(ioptions_, soptions_, internal_comparator_,
60 /*skip_filters=*/false),
61 std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
62
63 table_reader_.reset(dynamic_cast<BlockBasedTable*>(table_reader.release()));
64 }
65
66 void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
67 std::vector<KVPairBlock>* kv_pair_blocks) {
68 table_reader_->GetKVPairsFromDataBlocks(kv_pair_blocks);
69 }
70
71 void ColumnAwareEncodingReader::DecodeBlocks(
72 const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
73 const std::vector<std::string>* blocks) {
74 char* decoded_content_base = new char[16384];
75 Options options;
76 ImmutableCFOptions ioptions(options);
77 for (auto& block : *blocks) {
78 KVPairColBufDecoders kvp_col_bufs(kvp_col_declarations);
79 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
80 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
81 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
82
83 auto& slice_final_with_bit = block;
84 uint32_t format_version = 2;
85 Slice compression_dict;
86 BlockContents contents;
87 const char* content_ptr;
88
89 CompressionType type =
90 (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
91 if (type != kNoCompression) {
92 UncompressBlockContents(slice_final_with_bit.c_str(),
93 slice_final_with_bit.size() - 1, &contents,
94 format_version, compression_dict, ioptions);
95 content_ptr = contents.data.data();
96 } else {
97 content_ptr = slice_final_with_bit.data();
98 }
99
100 size_t num_kv_pairs;
101 const char* header_content_ptr = content_ptr;
102 num_kv_pairs = DecodeFixed64(header_content_ptr);
103
104 header_content_ptr += sizeof(size_t);
105 size_t num_key_columns = key_col_bufs.size();
106 size_t num_value_columns = value_col_bufs.size();
107 std::vector<const char*> key_content_ptr(num_key_columns);
108 std::vector<const char*> value_content_ptr(num_value_columns);
109 const char* checksum_content_ptr;
110
111 size_t num_columns = num_key_columns + num_value_columns;
112 const char* col_content_ptr =
113 header_content_ptr + sizeof(size_t) * num_columns;
114
115 // Read headers
116 for (size_t i = 0; i < num_key_columns; ++i) {
117 key_content_ptr[i] = col_content_ptr;
118 key_content_ptr[i] += key_col_bufs[i]->Init(key_content_ptr[i]);
119 size_t offset;
120 offset = DecodeFixed64(header_content_ptr);
121 header_content_ptr += sizeof(size_t);
122 col_content_ptr += offset;
123 }
124 for (size_t i = 0; i < num_value_columns; ++i) {
125 value_content_ptr[i] = col_content_ptr;
126 value_content_ptr[i] += value_col_bufs[i]->Init(value_content_ptr[i]);
127 size_t offset;
128 offset = DecodeFixed64(header_content_ptr);
129 header_content_ptr += sizeof(size_t);
130 col_content_ptr += offset;
131 }
132 checksum_content_ptr = col_content_ptr;
133 checksum_content_ptr += value_checksum_buf->Init(checksum_content_ptr);
134
135 // Decode block
136 char* decoded_content = decoded_content_base;
137 for (size_t j = 0; j < num_kv_pairs; ++j) {
138 for (size_t i = 0; i < num_key_columns; ++i) {
139 key_content_ptr[i] +=
140 key_col_bufs[i]->Decode(key_content_ptr[i], &decoded_content);
141 }
142 for (size_t i = 0; i < num_value_columns; ++i) {
143 value_content_ptr[i] +=
144 value_col_bufs[i]->Decode(value_content_ptr[i], &decoded_content);
145 }
146 checksum_content_ptr +=
147 value_checksum_buf->Decode(checksum_content_ptr, &decoded_content);
148 }
149
150 size_t offset = decoded_content - decoded_content_base;
151 Slice output_content(decoded_content, offset);
152
153 if (out_file != nullptr) {
154 out_file->Append(output_content);
155 }
156 }
157 delete[] decoded_content_base;
158 }
159
160 void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
161 WritableFile* out_file, const std::vector<std::string>* blocks) {
162 Options options;
163 ImmutableCFOptions ioptions(options);
164 for (auto& block : *blocks) {
165 auto& slice_final_with_bit = block;
166 uint32_t format_version = 2;
167 Slice compression_dict;
168 BlockContents contents;
169 std::string decoded_content;
170
171 CompressionType type =
172 (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
173 if (type != kNoCompression) {
174 UncompressBlockContents(slice_final_with_bit.c_str(),
175 slice_final_with_bit.size() - 1, &contents,
176 format_version, compression_dict, ioptions);
177 decoded_content = std::string(contents.data.data(), contents.data.size());
178 } else {
179 decoded_content = std::move(slice_final_with_bit);
180 }
181
182 if (out_file != nullptr) {
183 out_file->Append(decoded_content);
184 }
185 }
186 }
187
188 void ColumnAwareEncodingReader::DumpDataColumns(
189 const std::string& filename,
190 const KVPairColDeclarations& kvp_col_declarations,
191 const std::vector<KVPairBlock>& kv_pair_blocks) {
192 KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
193 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
194 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
195 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
196
197 FILE* fp = fopen(filename.c_str(), "w");
198 size_t block_id = 1;
199 for (auto& kv_pairs : kv_pair_blocks) {
200 fprintf(fp, "---------------- Block: %-4" ROCKSDB_PRIszt " ----------------\n", block_id);
201 for (auto& kv_pair : kv_pairs) {
202 const auto& key = kv_pair.first;
203 const auto& value = kv_pair.second;
204 size_t value_offset = 0;
205
206 const char* key_ptr = key.data();
207 for (auto& buf : key_col_bufs) {
208 size_t col_size = buf->Append(key_ptr);
209 std::string tmp_buf(key_ptr, col_size);
210 Slice col(tmp_buf);
211 fprintf(fp, "%s ", col.ToString(true).c_str());
212 key_ptr += col_size;
213 }
214 fprintf(fp, "|");
215
216 const char* value_ptr = value.data();
217 for (auto& buf : value_col_bufs) {
218 size_t col_size = buf->Append(value_ptr);
219 std::string tmp_buf(value_ptr, col_size);
220 Slice col(tmp_buf);
221 fprintf(fp, " %s", col.ToString(true).c_str());
222 value_ptr += col_size;
223 value_offset += col_size;
224 }
225
226 if (value_offset < value.size()) {
227 size_t col_size = value_checksum_buf->Append(value_ptr);
228 std::string tmp_buf(value_ptr, col_size);
229 Slice col(tmp_buf);
230 fprintf(fp, "|%s", col.ToString(true).c_str());
231 } else {
232 value_checksum_buf->Append(nullptr);
233 }
234 fprintf(fp, "\n");
235 }
236 block_id++;
237 }
238 fclose(fp);
239 }
240
241 namespace {
242
243 void CompressDataBlock(const std::string& output_content, Slice* slice_final,
244 CompressionType* type, std::string* compressed_output) {
245 CompressionOptions compression_opts;
246 uint32_t format_version = 2; // hard-coded version
247 Slice compression_dict;
248 *slice_final =
249 CompressBlock(output_content, compression_opts, type, format_version,
250 compression_dict, compressed_output);
251 }
252
253 } // namespace
254
255 void ColumnAwareEncodingReader::EncodeBlocksToRowFormat(
256 WritableFile* out_file, CompressionType compression_type,
257 const std::vector<KVPairBlock>& kv_pair_blocks,
258 std::vector<std::string>* blocks) {
259 std::string output_content;
260 for (auto& kv_pairs : kv_pair_blocks) {
261 output_content.clear();
262 std::string last_key;
263 size_t counter = 0;
264 const size_t block_restart_interval = 16;
265 for (auto& kv_pair : kv_pairs) {
266 const auto& key = kv_pair.first;
267 const auto& value = kv_pair.second;
268
269 Slice last_key_piece(last_key);
270 size_t shared = 0;
271 if (counter >= block_restart_interval) {
272 counter = 0;
273 } else {
274 const size_t min_length = std::min(last_key_piece.size(), key.size());
275 while ((shared < min_length) && last_key_piece[shared] == key[shared]) {
276 shared++;
277 }
278 }
279 const size_t non_shared = key.size() - shared;
280 output_content.append(key.c_str() + shared, non_shared);
281 output_content.append(value);
282
283 last_key.resize(shared);
284 last_key.append(key.data() + shared, non_shared);
285 counter++;
286 }
287 Slice slice_final;
288 auto type = compression_type;
289 std::string compressed_output;
290 CompressDataBlock(output_content, &slice_final, &type, &compressed_output);
291
292 if (out_file != nullptr) {
293 out_file->Append(slice_final);
294 }
295
296 // Add a bit in the end for decoding
297 std::string slice_final_with_bit(slice_final.data(), slice_final.size());
298 slice_final_with_bit.append(reinterpret_cast<char*>(&type), 1);
299 blocks->push_back(
300 std::string(slice_final_with_bit.data(), slice_final_with_bit.size()));
301 }
302 }
303
304 Status ColumnAwareEncodingReader::EncodeBlocks(
305 const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
306 CompressionType compression_type,
307 const std::vector<KVPairBlock>& kv_pair_blocks,
308 std::vector<std::string>* blocks, bool print_column_stat) {
309 std::vector<size_t> key_col_sizes(
310 kvp_col_declarations.key_col_declarations->size(), 0);
311 std::vector<size_t> value_col_sizes(
312 kvp_col_declarations.value_col_declarations->size(), 0);
313 size_t value_checksum_size = 0;
314
315 for (auto& kv_pairs : kv_pair_blocks) {
316 KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
317 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
318 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
319 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
320
321 size_t num_kv_pairs = 0;
322 for (auto& kv_pair : kv_pairs) {
323 const auto& key = kv_pair.first;
324 const auto& value = kv_pair.second;
325 size_t value_offset = 0;
326 num_kv_pairs++;
327
328 const char* key_ptr = key.data();
329 for (auto& buf : key_col_bufs) {
330 size_t col_size = buf->Append(key_ptr);
331 key_ptr += col_size;
332 }
333
334 const char* value_ptr = value.data();
335 for (auto& buf : value_col_bufs) {
336 size_t col_size = buf->Append(value_ptr);
337 value_ptr += col_size;
338 value_offset += col_size;
339 }
340
341 if (value_offset < value.size()) {
342 value_checksum_buf->Append(value_ptr);
343 } else {
344 value_checksum_buf->Append(nullptr);
345 }
346 }
347
348 kvp_col_bufs.Finish();
349 // Get stats
350 // Compress and write a block
351 if (print_column_stat) {
352 for (size_t i = 0; i < key_col_bufs.size(); ++i) {
353 Slice slice_final;
354 auto type = compression_type;
355 std::string compressed_output;
356 CompressDataBlock(key_col_bufs[i]->GetData(), &slice_final, &type,
357 &compressed_output);
358 out_file->Append(slice_final);
359 key_col_sizes[i] += slice_final.size();
360 }
361 for (size_t i = 0; i < value_col_bufs.size(); ++i) {
362 Slice slice_final;
363 auto type = compression_type;
364 std::string compressed_output;
365 CompressDataBlock(value_col_bufs[i]->GetData(), &slice_final, &type,
366 &compressed_output);
367 out_file->Append(slice_final);
368 value_col_sizes[i] += slice_final.size();
369 }
370 Slice slice_final;
371 auto type = compression_type;
372 std::string compressed_output;
373 CompressDataBlock(value_checksum_buf->GetData(), &slice_final, &type,
374 &compressed_output);
375 out_file->Append(slice_final);
376 value_checksum_size += slice_final.size();
377 } else {
378 std::string output_content;
379 // Write column sizes
380 PutFixed64(&output_content, num_kv_pairs);
381 for (auto& buf : key_col_bufs) {
382 size_t size = buf->GetData().size();
383 PutFixed64(&output_content, size);
384 }
385 for (auto& buf : value_col_bufs) {
386 size_t size = buf->GetData().size();
387 PutFixed64(&output_content, size);
388 }
389 // Write data
390 for (auto& buf : key_col_bufs) {
391 output_content.append(buf->GetData());
392 }
393 for (auto& buf : value_col_bufs) {
394 output_content.append(buf->GetData());
395 }
396 output_content.append(value_checksum_buf->GetData());
397
398 Slice slice_final;
399 auto type = compression_type;
400 std::string compressed_output;
401 CompressDataBlock(output_content, &slice_final, &type,
402 &compressed_output);
403
404 if (out_file != nullptr) {
405 out_file->Append(slice_final);
406 }
407
408 // Add a bit in the end for decoding
409 std::string slice_final_with_bit(slice_final.data(),
410 slice_final.size() + 1);
411 slice_final_with_bit[slice_final.size()] = static_cast<char>(type);
412 blocks->push_back(std::string(slice_final_with_bit.data(),
413 slice_final_with_bit.size()));
414 }
415 }
416
417 if (print_column_stat) {
418 size_t total_size = 0;
419 for (size_t i = 0; i < key_col_sizes.size(); ++i)
420 total_size += key_col_sizes[i];
421 for (size_t i = 0; i < value_col_sizes.size(); ++i)
422 total_size += value_col_sizes[i];
423 total_size += value_checksum_size;
424
425 for (size_t i = 0; i < key_col_sizes.size(); ++i)
426 printf("Key col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i, key_col_sizes[i],
427 100.0 * key_col_sizes[i] / total_size);
428 for (size_t i = 0; i < value_col_sizes.size(); ++i)
429 printf("Value col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i,
430 value_col_sizes[i], 100.0 * value_col_sizes[i] / total_size);
431 printf("Value checksum size: %" ROCKSDB_PRIszt " percentage %lf%%\n", value_checksum_size,
432 100.0 * value_checksum_size / total_size);
433 }
434 return Status::OK();
435 }
436
437 void ColumnAwareEncodingReader::GetColDeclarationsPrimary(
438 std::vector<ColDeclaration>** key_col_declarations,
439 std::vector<ColDeclaration>** value_col_declarations,
440 ColDeclaration** value_checksum_declaration) {
441 *key_col_declarations = new std::vector<ColDeclaration>{
442 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
443 true),
444 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
445 false, true),
446 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
447 false, true),
448 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
449 false, true),
450 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
451
452 *value_col_declarations = new std::vector<ColDeclaration>{
453 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
454 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
455 ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
456 ColDeclaration("VariableLength"),
457 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4),
458 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
459 *value_checksum_declaration = new ColDeclaration(
460 "LongFixedLength", ColCompressionType::kColNoCompression, 9,
461 true /* nullable */);
462 }
463
464 void ColumnAwareEncodingReader::GetColDeclarationsSecondary(
465 std::vector<ColDeclaration>** key_col_declarations,
466 std::vector<ColDeclaration>** value_col_declarations,
467 ColDeclaration** value_checksum_declaration) {
468 *key_col_declarations = new std::vector<ColDeclaration>{
469 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
470 true),
471 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
472 false, true),
473 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
474 false, true),
475 ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
476 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4,
477 false, true),
478 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
479 false, true),
480 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8, false,
481 true),
482 ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression),
483 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
484 *value_col_declarations = new std::vector<ColDeclaration>();
485 *value_checksum_declaration = new ColDeclaration(
486 "LongFixedLength", ColCompressionType::kColNoCompression, 9,
487 true /* nullable */);
488 }
489
490 } // namespace rocksdb
491
492 #endif // ROCKSDB_LITE