]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/column_aware_encoding_util.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / column_aware_encoding_util.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6#ifndef ROCKSDB_LITE
7
8#include "utilities/column_aware_encoding_util.h"
9
10#ifndef __STDC_FORMAT_MACROS
11#define __STDC_FORMAT_MACROS
12#endif
13
14#include <stddef.h>
15#include <stdint.h>
16#include <stdio.h>
17#include <algorithm>
18#include <utility>
19#include <vector>
20#include "include/rocksdb/comparator.h"
21#include "include/rocksdb/slice.h"
22#include "rocksdb/env.h"
23#include "rocksdb/status.h"
24#include "table/block_based_table_builder.h"
25#include "table/block_based_table_factory.h"
26#include "table/format.h"
27#include "table/table_reader.h"
11fdf7f2 28#include "util/cast_util.h"
7c673cae
FG
29#include "util/coding.h"
30#include "utilities/col_buf_decoder.h"
31#include "utilities/col_buf_encoder.h"
32
33#include "port/port.h"
34
35namespace rocksdb {
36
37ColumnAwareEncodingReader::ColumnAwareEncodingReader(
38 const std::string& file_path)
39 : file_name_(file_path),
40 ioptions_(options_),
11fdf7f2 41 moptions_(options_),
7c673cae
FG
42 internal_comparator_(BytewiseComparator()) {
43 InitTableReader(file_name_);
44}
45
46void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
47 std::unique_ptr<RandomAccessFile> file;
48 uint64_t file_size;
49 options_.env->NewRandomAccessFile(file_path, &file, soptions_);
50 options_.env->GetFileSize(file_path, &file_size);
51
11fdf7f2 52 file_.reset(new RandomAccessFileReader(std::move(file), file_path));
7c673cae
FG
53
54 options_.comparator = &internal_comparator_;
55 options_.table_factory = std::make_shared<BlockBasedTableFactory>();
7c673cae
FG
56
57 std::unique_ptr<TableReader> table_reader;
11fdf7f2
TL
58 options_.table_factory->NewTableReader(
59 TableReaderOptions(ioptions_, moptions_.prefix_extractor.get(), soptions_,
60 internal_comparator_),
7c673cae
FG
61 std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
62
11fdf7f2
TL
63 table_reader_.reset(static_cast_with_check<BlockBasedTable, TableReader>(
64 table_reader.release()));
7c673cae
FG
65}
66
67void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
68 std::vector<KVPairBlock>* kv_pair_blocks) {
69 table_reader_->GetKVPairsFromDataBlocks(kv_pair_blocks);
70}
71
72void ColumnAwareEncodingReader::DecodeBlocks(
73 const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
74 const std::vector<std::string>* blocks) {
75 char* decoded_content_base = new char[16384];
76 Options options;
77 ImmutableCFOptions ioptions(options);
78 for (auto& block : *blocks) {
79 KVPairColBufDecoders kvp_col_bufs(kvp_col_declarations);
80 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
81 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
82 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
83
84 auto& slice_final_with_bit = block;
85 uint32_t format_version = 2;
7c673cae
FG
86 BlockContents contents;
87 const char* content_ptr;
88
89 CompressionType type =
90 (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
91 if (type != kNoCompression) {
11fdf7f2
TL
92 UncompressionContext uncompression_ctx(type);
93 UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
7c673cae 94 slice_final_with_bit.size() - 1, &contents,
11fdf7f2 95 format_version, ioptions);
7c673cae
FG
96 content_ptr = contents.data.data();
97 } else {
98 content_ptr = slice_final_with_bit.data();
99 }
100
101 size_t num_kv_pairs;
102 const char* header_content_ptr = content_ptr;
11fdf7f2 103 num_kv_pairs = static_cast<size_t>(DecodeFixed64(header_content_ptr));
7c673cae
FG
104
105 header_content_ptr += sizeof(size_t);
106 size_t num_key_columns = key_col_bufs.size();
107 size_t num_value_columns = value_col_bufs.size();
108 std::vector<const char*> key_content_ptr(num_key_columns);
109 std::vector<const char*> value_content_ptr(num_value_columns);
110 const char* checksum_content_ptr;
111
112 size_t num_columns = num_key_columns + num_value_columns;
113 const char* col_content_ptr =
114 header_content_ptr + sizeof(size_t) * num_columns;
115
116 // Read headers
117 for (size_t i = 0; i < num_key_columns; ++i) {
118 key_content_ptr[i] = col_content_ptr;
119 key_content_ptr[i] += key_col_bufs[i]->Init(key_content_ptr[i]);
120 size_t offset;
11fdf7f2 121 offset = static_cast<size_t>(DecodeFixed64(header_content_ptr));
7c673cae
FG
122 header_content_ptr += sizeof(size_t);
123 col_content_ptr += offset;
124 }
125 for (size_t i = 0; i < num_value_columns; ++i) {
126 value_content_ptr[i] = col_content_ptr;
127 value_content_ptr[i] += value_col_bufs[i]->Init(value_content_ptr[i]);
128 size_t offset;
11fdf7f2 129 offset = static_cast<size_t>(DecodeFixed64(header_content_ptr));
7c673cae
FG
130 header_content_ptr += sizeof(size_t);
131 col_content_ptr += offset;
132 }
133 checksum_content_ptr = col_content_ptr;
134 checksum_content_ptr += value_checksum_buf->Init(checksum_content_ptr);
135
136 // Decode block
137 char* decoded_content = decoded_content_base;
138 for (size_t j = 0; j < num_kv_pairs; ++j) {
139 for (size_t i = 0; i < num_key_columns; ++i) {
140 key_content_ptr[i] +=
141 key_col_bufs[i]->Decode(key_content_ptr[i], &decoded_content);
142 }
143 for (size_t i = 0; i < num_value_columns; ++i) {
144 value_content_ptr[i] +=
145 value_col_bufs[i]->Decode(value_content_ptr[i], &decoded_content);
146 }
147 checksum_content_ptr +=
148 value_checksum_buf->Decode(checksum_content_ptr, &decoded_content);
149 }
150
151 size_t offset = decoded_content - decoded_content_base;
152 Slice output_content(decoded_content, offset);
153
154 if (out_file != nullptr) {
155 out_file->Append(output_content);
156 }
157 }
158 delete[] decoded_content_base;
159}
160
161void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
162 WritableFile* out_file, const std::vector<std::string>* blocks) {
163 Options options;
164 ImmutableCFOptions ioptions(options);
165 for (auto& block : *blocks) {
166 auto& slice_final_with_bit = block;
167 uint32_t format_version = 2;
7c673cae
FG
168 BlockContents contents;
169 std::string decoded_content;
170
171 CompressionType type =
172 (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
173 if (type != kNoCompression) {
11fdf7f2
TL
174 UncompressionContext uncompression_ctx(type);
175 UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
7c673cae 176 slice_final_with_bit.size() - 1, &contents,
11fdf7f2 177 format_version, ioptions);
7c673cae
FG
178 decoded_content = std::string(contents.data.data(), contents.data.size());
179 } else {
180 decoded_content = std::move(slice_final_with_bit);
181 }
182
183 if (out_file != nullptr) {
184 out_file->Append(decoded_content);
185 }
186 }
187}
188
189void ColumnAwareEncodingReader::DumpDataColumns(
190 const std::string& filename,
191 const KVPairColDeclarations& kvp_col_declarations,
192 const std::vector<KVPairBlock>& kv_pair_blocks) {
193 KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
194 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
195 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
196 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
197
198 FILE* fp = fopen(filename.c_str(), "w");
199 size_t block_id = 1;
200 for (auto& kv_pairs : kv_pair_blocks) {
201 fprintf(fp, "---------------- Block: %-4" ROCKSDB_PRIszt " ----------------\n", block_id);
202 for (auto& kv_pair : kv_pairs) {
203 const auto& key = kv_pair.first;
204 const auto& value = kv_pair.second;
205 size_t value_offset = 0;
206
207 const char* key_ptr = key.data();
208 for (auto& buf : key_col_bufs) {
209 size_t col_size = buf->Append(key_ptr);
210 std::string tmp_buf(key_ptr, col_size);
211 Slice col(tmp_buf);
212 fprintf(fp, "%s ", col.ToString(true).c_str());
213 key_ptr += col_size;
214 }
215 fprintf(fp, "|");
216
217 const char* value_ptr = value.data();
218 for (auto& buf : value_col_bufs) {
219 size_t col_size = buf->Append(value_ptr);
220 std::string tmp_buf(value_ptr, col_size);
221 Slice col(tmp_buf);
222 fprintf(fp, " %s", col.ToString(true).c_str());
223 value_ptr += col_size;
224 value_offset += col_size;
225 }
226
227 if (value_offset < value.size()) {
228 size_t col_size = value_checksum_buf->Append(value_ptr);
229 std::string tmp_buf(value_ptr, col_size);
230 Slice col(tmp_buf);
231 fprintf(fp, "|%s", col.ToString(true).c_str());
232 } else {
233 value_checksum_buf->Append(nullptr);
234 }
235 fprintf(fp, "\n");
236 }
237 block_id++;
238 }
239 fclose(fp);
240}
241
242namespace {
243
244void CompressDataBlock(const std::string& output_content, Slice* slice_final,
245 CompressionType* type, std::string* compressed_output) {
11fdf7f2 246 CompressionContext compression_ctx(*type);
7c673cae 247 uint32_t format_version = 2; // hard-coded version
11fdf7f2
TL
248 *slice_final = CompressBlock(output_content, compression_ctx, type,
249 format_version, compressed_output);
7c673cae
FG
250}
251
252} // namespace
253
254void ColumnAwareEncodingReader::EncodeBlocksToRowFormat(
255 WritableFile* out_file, CompressionType compression_type,
256 const std::vector<KVPairBlock>& kv_pair_blocks,
257 std::vector<std::string>* blocks) {
258 std::string output_content;
259 for (auto& kv_pairs : kv_pair_blocks) {
260 output_content.clear();
261 std::string last_key;
262 size_t counter = 0;
263 const size_t block_restart_interval = 16;
264 for (auto& kv_pair : kv_pairs) {
265 const auto& key = kv_pair.first;
266 const auto& value = kv_pair.second;
267
268 Slice last_key_piece(last_key);
269 size_t shared = 0;
270 if (counter >= block_restart_interval) {
271 counter = 0;
272 } else {
273 const size_t min_length = std::min(last_key_piece.size(), key.size());
274 while ((shared < min_length) && last_key_piece[shared] == key[shared]) {
275 shared++;
276 }
277 }
278 const size_t non_shared = key.size() - shared;
279 output_content.append(key.c_str() + shared, non_shared);
280 output_content.append(value);
281
282 last_key.resize(shared);
283 last_key.append(key.data() + shared, non_shared);
284 counter++;
285 }
286 Slice slice_final;
287 auto type = compression_type;
288 std::string compressed_output;
289 CompressDataBlock(output_content, &slice_final, &type, &compressed_output);
290
291 if (out_file != nullptr) {
292 out_file->Append(slice_final);
293 }
294
295 // Add a bit in the end for decoding
296 std::string slice_final_with_bit(slice_final.data(), slice_final.size());
297 slice_final_with_bit.append(reinterpret_cast<char*>(&type), 1);
298 blocks->push_back(
299 std::string(slice_final_with_bit.data(), slice_final_with_bit.size()));
300 }
301}
302
303Status ColumnAwareEncodingReader::EncodeBlocks(
304 const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
305 CompressionType compression_type,
306 const std::vector<KVPairBlock>& kv_pair_blocks,
307 std::vector<std::string>* blocks, bool print_column_stat) {
308 std::vector<size_t> key_col_sizes(
309 kvp_col_declarations.key_col_declarations->size(), 0);
310 std::vector<size_t> value_col_sizes(
311 kvp_col_declarations.value_col_declarations->size(), 0);
312 size_t value_checksum_size = 0;
313
314 for (auto& kv_pairs : kv_pair_blocks) {
315 KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
316 auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
317 auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
318 auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
319
320 size_t num_kv_pairs = 0;
321 for (auto& kv_pair : kv_pairs) {
322 const auto& key = kv_pair.first;
323 const auto& value = kv_pair.second;
324 size_t value_offset = 0;
325 num_kv_pairs++;
326
327 const char* key_ptr = key.data();
328 for (auto& buf : key_col_bufs) {
329 size_t col_size = buf->Append(key_ptr);
330 key_ptr += col_size;
331 }
332
333 const char* value_ptr = value.data();
334 for (auto& buf : value_col_bufs) {
335 size_t col_size = buf->Append(value_ptr);
336 value_ptr += col_size;
337 value_offset += col_size;
338 }
339
340 if (value_offset < value.size()) {
341 value_checksum_buf->Append(value_ptr);
342 } else {
343 value_checksum_buf->Append(nullptr);
344 }
345 }
346
347 kvp_col_bufs.Finish();
348 // Get stats
349 // Compress and write a block
350 if (print_column_stat) {
351 for (size_t i = 0; i < key_col_bufs.size(); ++i) {
352 Slice slice_final;
353 auto type = compression_type;
354 std::string compressed_output;
355 CompressDataBlock(key_col_bufs[i]->GetData(), &slice_final, &type,
356 &compressed_output);
357 out_file->Append(slice_final);
358 key_col_sizes[i] += slice_final.size();
359 }
360 for (size_t i = 0; i < value_col_bufs.size(); ++i) {
361 Slice slice_final;
362 auto type = compression_type;
363 std::string compressed_output;
364 CompressDataBlock(value_col_bufs[i]->GetData(), &slice_final, &type,
365 &compressed_output);
366 out_file->Append(slice_final);
367 value_col_sizes[i] += slice_final.size();
368 }
369 Slice slice_final;
370 auto type = compression_type;
371 std::string compressed_output;
372 CompressDataBlock(value_checksum_buf->GetData(), &slice_final, &type,
373 &compressed_output);
374 out_file->Append(slice_final);
375 value_checksum_size += slice_final.size();
376 } else {
377 std::string output_content;
378 // Write column sizes
379 PutFixed64(&output_content, num_kv_pairs);
380 for (auto& buf : key_col_bufs) {
381 size_t size = buf->GetData().size();
382 PutFixed64(&output_content, size);
383 }
384 for (auto& buf : value_col_bufs) {
385 size_t size = buf->GetData().size();
386 PutFixed64(&output_content, size);
387 }
388 // Write data
389 for (auto& buf : key_col_bufs) {
390 output_content.append(buf->GetData());
391 }
392 for (auto& buf : value_col_bufs) {
393 output_content.append(buf->GetData());
394 }
395 output_content.append(value_checksum_buf->GetData());
396
397 Slice slice_final;
398 auto type = compression_type;
399 std::string compressed_output;
400 CompressDataBlock(output_content, &slice_final, &type,
401 &compressed_output);
402
403 if (out_file != nullptr) {
404 out_file->Append(slice_final);
405 }
406
407 // Add a bit in the end for decoding
408 std::string slice_final_with_bit(slice_final.data(),
409 slice_final.size() + 1);
410 slice_final_with_bit[slice_final.size()] = static_cast<char>(type);
411 blocks->push_back(std::string(slice_final_with_bit.data(),
412 slice_final_with_bit.size()));
413 }
414 }
415
416 if (print_column_stat) {
417 size_t total_size = 0;
418 for (size_t i = 0; i < key_col_sizes.size(); ++i)
419 total_size += key_col_sizes[i];
420 for (size_t i = 0; i < value_col_sizes.size(); ++i)
421 total_size += value_col_sizes[i];
422 total_size += value_checksum_size;
423
424 for (size_t i = 0; i < key_col_sizes.size(); ++i)
425 printf("Key col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i, key_col_sizes[i],
426 100.0 * key_col_sizes[i] / total_size);
427 for (size_t i = 0; i < value_col_sizes.size(); ++i)
428 printf("Value col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i,
429 value_col_sizes[i], 100.0 * value_col_sizes[i] / total_size);
430 printf("Value checksum size: %" ROCKSDB_PRIszt " percentage %lf%%\n", value_checksum_size,
431 100.0 * value_checksum_size / total_size);
432 }
433 return Status::OK();
434}
435
436void ColumnAwareEncodingReader::GetColDeclarationsPrimary(
437 std::vector<ColDeclaration>** key_col_declarations,
438 std::vector<ColDeclaration>** value_col_declarations,
439 ColDeclaration** value_checksum_declaration) {
440 *key_col_declarations = new std::vector<ColDeclaration>{
441 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
442 true),
443 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
444 false, true),
445 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
446 false, true),
447 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
448 false, true),
449 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
450
451 *value_col_declarations = new std::vector<ColDeclaration>{
452 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
453 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
454 ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
455 ColDeclaration("VariableLength"),
456 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4),
457 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
458 *value_checksum_declaration = new ColDeclaration(
459 "LongFixedLength", ColCompressionType::kColNoCompression, 9,
460 true /* nullable */);
461}
462
463void ColumnAwareEncodingReader::GetColDeclarationsSecondary(
464 std::vector<ColDeclaration>** key_col_declarations,
465 std::vector<ColDeclaration>** value_col_declarations,
466 ColDeclaration** value_checksum_declaration) {
467 *key_col_declarations = new std::vector<ColDeclaration>{
468 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
469 true),
470 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
471 false, true),
472 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
473 false, true),
474 ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
475 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4,
476 false, true),
477 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
478 false, true),
479 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8, false,
480 true),
481 ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression),
482 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
483 *value_col_declarations = new std::vector<ColDeclaration>();
484 *value_checksum_declaration = new ColDeclaration(
485 "LongFixedLength", ColCompressionType::kColNoCompression, 9,
486 true /* nullable */);
487}
488
489} // namespace rocksdb
490
491#endif // ROCKSDB_LITE