1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
8 #include "utilities/column_aware_encoding_util.h"
10 #ifndef __STDC_FORMAT_MACROS
11 #define __STDC_FORMAT_MACROS
20 #include "include/rocksdb/comparator.h"
21 #include "include/rocksdb/slice.h"
22 #include "rocksdb/env.h"
23 #include "rocksdb/status.h"
24 #include "table/block_based_table_builder.h"
25 #include "table/block_based_table_factory.h"
26 #include "table/format.h"
27 #include "table/table_reader.h"
28 #include "util/coding.h"
29 #include "utilities/col_buf_decoder.h"
30 #include "utilities/col_buf_encoder.h"
32 #include "port/port.h"
36 ColumnAwareEncodingReader::ColumnAwareEncodingReader(
37 const std::string
& file_path
)
38 : file_name_(file_path
),
40 internal_comparator_(BytewiseComparator()) {
41 InitTableReader(file_name_
);
44 void ColumnAwareEncodingReader::InitTableReader(const std::string
& file_path
) {
45 std::unique_ptr
<RandomAccessFile
> file
;
47 options_
.env
->NewRandomAccessFile(file_path
, &file
, soptions_
);
48 options_
.env
->GetFileSize(file_path
, &file_size
);
50 file_
.reset(new RandomAccessFileReader(std::move(file
)));
52 options_
.comparator
= &internal_comparator_
;
53 options_
.table_factory
= std::make_shared
<BlockBasedTableFactory
>();
54 shared_ptr
<BlockBasedTableFactory
> block_table_factory
=
55 std::dynamic_pointer_cast
<BlockBasedTableFactory
>(options_
.table_factory
);
57 std::unique_ptr
<TableReader
> table_reader
;
58 block_table_factory
->NewTableReader(
59 TableReaderOptions(ioptions_
, soptions_
, internal_comparator_
,
60 /*skip_filters=*/false),
61 std::move(file_
), file_size
, &table_reader
, /*enable_prefetch=*/false);
63 table_reader_
.reset(dynamic_cast<BlockBasedTable
*>(table_reader
.release()));
66 void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
67 std::vector
<KVPairBlock
>* kv_pair_blocks
) {
68 table_reader_
->GetKVPairsFromDataBlocks(kv_pair_blocks
);
71 void ColumnAwareEncodingReader::DecodeBlocks(
72 const KVPairColDeclarations
& kvp_col_declarations
, WritableFile
* out_file
,
73 const std::vector
<std::string
>* blocks
) {
74 char* decoded_content_base
= new char[16384];
76 ImmutableCFOptions
ioptions(options
);
77 for (auto& block
: *blocks
) {
78 KVPairColBufDecoders
kvp_col_bufs(kvp_col_declarations
);
79 auto& key_col_bufs
= kvp_col_bufs
.key_col_bufs
;
80 auto& value_col_bufs
= kvp_col_bufs
.value_col_bufs
;
81 auto& value_checksum_buf
= kvp_col_bufs
.value_checksum_buf
;
83 auto& slice_final_with_bit
= block
;
84 uint32_t format_version
= 2;
85 Slice compression_dict
;
86 BlockContents contents
;
87 const char* content_ptr
;
89 CompressionType type
=
90 (CompressionType
)slice_final_with_bit
[slice_final_with_bit
.size() - 1];
91 if (type
!= kNoCompression
) {
92 UncompressBlockContents(slice_final_with_bit
.c_str(),
93 slice_final_with_bit
.size() - 1, &contents
,
94 format_version
, compression_dict
, ioptions
);
95 content_ptr
= contents
.data
.data();
97 content_ptr
= slice_final_with_bit
.data();
101 const char* header_content_ptr
= content_ptr
;
102 num_kv_pairs
= DecodeFixed64(header_content_ptr
);
104 header_content_ptr
+= sizeof(size_t);
105 size_t num_key_columns
= key_col_bufs
.size();
106 size_t num_value_columns
= value_col_bufs
.size();
107 std::vector
<const char*> key_content_ptr(num_key_columns
);
108 std::vector
<const char*> value_content_ptr(num_value_columns
);
109 const char* checksum_content_ptr
;
111 size_t num_columns
= num_key_columns
+ num_value_columns
;
112 const char* col_content_ptr
=
113 header_content_ptr
+ sizeof(size_t) * num_columns
;
116 for (size_t i
= 0; i
< num_key_columns
; ++i
) {
117 key_content_ptr
[i
] = col_content_ptr
;
118 key_content_ptr
[i
] += key_col_bufs
[i
]->Init(key_content_ptr
[i
]);
120 offset
= DecodeFixed64(header_content_ptr
);
121 header_content_ptr
+= sizeof(size_t);
122 col_content_ptr
+= offset
;
124 for (size_t i
= 0; i
< num_value_columns
; ++i
) {
125 value_content_ptr
[i
] = col_content_ptr
;
126 value_content_ptr
[i
] += value_col_bufs
[i
]->Init(value_content_ptr
[i
]);
128 offset
= DecodeFixed64(header_content_ptr
);
129 header_content_ptr
+= sizeof(size_t);
130 col_content_ptr
+= offset
;
132 checksum_content_ptr
= col_content_ptr
;
133 checksum_content_ptr
+= value_checksum_buf
->Init(checksum_content_ptr
);
136 char* decoded_content
= decoded_content_base
;
137 for (size_t j
= 0; j
< num_kv_pairs
; ++j
) {
138 for (size_t i
= 0; i
< num_key_columns
; ++i
) {
139 key_content_ptr
[i
] +=
140 key_col_bufs
[i
]->Decode(key_content_ptr
[i
], &decoded_content
);
142 for (size_t i
= 0; i
< num_value_columns
; ++i
) {
143 value_content_ptr
[i
] +=
144 value_col_bufs
[i
]->Decode(value_content_ptr
[i
], &decoded_content
);
146 checksum_content_ptr
+=
147 value_checksum_buf
->Decode(checksum_content_ptr
, &decoded_content
);
150 size_t offset
= decoded_content
- decoded_content_base
;
151 Slice
output_content(decoded_content
, offset
);
153 if (out_file
!= nullptr) {
154 out_file
->Append(output_content
);
157 delete[] decoded_content_base
;
160 void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
161 WritableFile
* out_file
, const std::vector
<std::string
>* blocks
) {
163 ImmutableCFOptions
ioptions(options
);
164 for (auto& block
: *blocks
) {
165 auto& slice_final_with_bit
= block
;
166 uint32_t format_version
= 2;
167 Slice compression_dict
;
168 BlockContents contents
;
169 std::string decoded_content
;
171 CompressionType type
=
172 (CompressionType
)slice_final_with_bit
[slice_final_with_bit
.size() - 1];
173 if (type
!= kNoCompression
) {
174 UncompressBlockContents(slice_final_with_bit
.c_str(),
175 slice_final_with_bit
.size() - 1, &contents
,
176 format_version
, compression_dict
, ioptions
);
177 decoded_content
= std::string(contents
.data
.data(), contents
.data
.size());
179 decoded_content
= std::move(slice_final_with_bit
);
182 if (out_file
!= nullptr) {
183 out_file
->Append(decoded_content
);
188 void ColumnAwareEncodingReader::DumpDataColumns(
189 const std::string
& filename
,
190 const KVPairColDeclarations
& kvp_col_declarations
,
191 const std::vector
<KVPairBlock
>& kv_pair_blocks
) {
192 KVPairColBufEncoders
kvp_col_bufs(kvp_col_declarations
);
193 auto& key_col_bufs
= kvp_col_bufs
.key_col_bufs
;
194 auto& value_col_bufs
= kvp_col_bufs
.value_col_bufs
;
195 auto& value_checksum_buf
= kvp_col_bufs
.value_checksum_buf
;
197 FILE* fp
= fopen(filename
.c_str(), "w");
199 for (auto& kv_pairs
: kv_pair_blocks
) {
200 fprintf(fp
, "---------------- Block: %-4" ROCKSDB_PRIszt
" ----------------\n", block_id
);
201 for (auto& kv_pair
: kv_pairs
) {
202 const auto& key
= kv_pair
.first
;
203 const auto& value
= kv_pair
.second
;
204 size_t value_offset
= 0;
206 const char* key_ptr
= key
.data();
207 for (auto& buf
: key_col_bufs
) {
208 size_t col_size
= buf
->Append(key_ptr
);
209 std::string
tmp_buf(key_ptr
, col_size
);
211 fprintf(fp
, "%s ", col
.ToString(true).c_str());
216 const char* value_ptr
= value
.data();
217 for (auto& buf
: value_col_bufs
) {
218 size_t col_size
= buf
->Append(value_ptr
);
219 std::string
tmp_buf(value_ptr
, col_size
);
221 fprintf(fp
, " %s", col
.ToString(true).c_str());
222 value_ptr
+= col_size
;
223 value_offset
+= col_size
;
226 if (value_offset
< value
.size()) {
227 size_t col_size
= value_checksum_buf
->Append(value_ptr
);
228 std::string
tmp_buf(value_ptr
, col_size
);
230 fprintf(fp
, "|%s", col
.ToString(true).c_str());
232 value_checksum_buf
->Append(nullptr);
243 void CompressDataBlock(const std::string
& output_content
, Slice
* slice_final
,
244 CompressionType
* type
, std::string
* compressed_output
) {
245 CompressionOptions compression_opts
;
246 uint32_t format_version
= 2; // hard-coded version
247 Slice compression_dict
;
249 CompressBlock(output_content
, compression_opts
, type
, format_version
,
250 compression_dict
, compressed_output
);
255 void ColumnAwareEncodingReader::EncodeBlocksToRowFormat(
256 WritableFile
* out_file
, CompressionType compression_type
,
257 const std::vector
<KVPairBlock
>& kv_pair_blocks
,
258 std::vector
<std::string
>* blocks
) {
259 std::string output_content
;
260 for (auto& kv_pairs
: kv_pair_blocks
) {
261 output_content
.clear();
262 std::string last_key
;
264 const size_t block_restart_interval
= 16;
265 for (auto& kv_pair
: kv_pairs
) {
266 const auto& key
= kv_pair
.first
;
267 const auto& value
= kv_pair
.second
;
269 Slice
last_key_piece(last_key
);
271 if (counter
>= block_restart_interval
) {
274 const size_t min_length
= std::min(last_key_piece
.size(), key
.size());
275 while ((shared
< min_length
) && last_key_piece
[shared
] == key
[shared
]) {
279 const size_t non_shared
= key
.size() - shared
;
280 output_content
.append(key
.c_str() + shared
, non_shared
);
281 output_content
.append(value
);
283 last_key
.resize(shared
);
284 last_key
.append(key
.data() + shared
, non_shared
);
288 auto type
= compression_type
;
289 std::string compressed_output
;
290 CompressDataBlock(output_content
, &slice_final
, &type
, &compressed_output
);
292 if (out_file
!= nullptr) {
293 out_file
->Append(slice_final
);
296 // Add a bit in the end for decoding
297 std::string
slice_final_with_bit(slice_final
.data(), slice_final
.size());
298 slice_final_with_bit
.append(reinterpret_cast<char*>(&type
), 1);
300 std::string(slice_final_with_bit
.data(), slice_final_with_bit
.size()));
304 Status
ColumnAwareEncodingReader::EncodeBlocks(
305 const KVPairColDeclarations
& kvp_col_declarations
, WritableFile
* out_file
,
306 CompressionType compression_type
,
307 const std::vector
<KVPairBlock
>& kv_pair_blocks
,
308 std::vector
<std::string
>* blocks
, bool print_column_stat
) {
309 std::vector
<size_t> key_col_sizes(
310 kvp_col_declarations
.key_col_declarations
->size(), 0);
311 std::vector
<size_t> value_col_sizes(
312 kvp_col_declarations
.value_col_declarations
->size(), 0);
313 size_t value_checksum_size
= 0;
315 for (auto& kv_pairs
: kv_pair_blocks
) {
316 KVPairColBufEncoders
kvp_col_bufs(kvp_col_declarations
);
317 auto& key_col_bufs
= kvp_col_bufs
.key_col_bufs
;
318 auto& value_col_bufs
= kvp_col_bufs
.value_col_bufs
;
319 auto& value_checksum_buf
= kvp_col_bufs
.value_checksum_buf
;
321 size_t num_kv_pairs
= 0;
322 for (auto& kv_pair
: kv_pairs
) {
323 const auto& key
= kv_pair
.first
;
324 const auto& value
= kv_pair
.second
;
325 size_t value_offset
= 0;
328 const char* key_ptr
= key
.data();
329 for (auto& buf
: key_col_bufs
) {
330 size_t col_size
= buf
->Append(key_ptr
);
334 const char* value_ptr
= value
.data();
335 for (auto& buf
: value_col_bufs
) {
336 size_t col_size
= buf
->Append(value_ptr
);
337 value_ptr
+= col_size
;
338 value_offset
+= col_size
;
341 if (value_offset
< value
.size()) {
342 value_checksum_buf
->Append(value_ptr
);
344 value_checksum_buf
->Append(nullptr);
348 kvp_col_bufs
.Finish();
350 // Compress and write a block
351 if (print_column_stat
) {
352 for (size_t i
= 0; i
< key_col_bufs
.size(); ++i
) {
354 auto type
= compression_type
;
355 std::string compressed_output
;
356 CompressDataBlock(key_col_bufs
[i
]->GetData(), &slice_final
, &type
,
358 out_file
->Append(slice_final
);
359 key_col_sizes
[i
] += slice_final
.size();
361 for (size_t i
= 0; i
< value_col_bufs
.size(); ++i
) {
363 auto type
= compression_type
;
364 std::string compressed_output
;
365 CompressDataBlock(value_col_bufs
[i
]->GetData(), &slice_final
, &type
,
367 out_file
->Append(slice_final
);
368 value_col_sizes
[i
] += slice_final
.size();
371 auto type
= compression_type
;
372 std::string compressed_output
;
373 CompressDataBlock(value_checksum_buf
->GetData(), &slice_final
, &type
,
375 out_file
->Append(slice_final
);
376 value_checksum_size
+= slice_final
.size();
378 std::string output_content
;
379 // Write column sizes
380 PutFixed64(&output_content
, num_kv_pairs
);
381 for (auto& buf
: key_col_bufs
) {
382 size_t size
= buf
->GetData().size();
383 PutFixed64(&output_content
, size
);
385 for (auto& buf
: value_col_bufs
) {
386 size_t size
= buf
->GetData().size();
387 PutFixed64(&output_content
, size
);
390 for (auto& buf
: key_col_bufs
) {
391 output_content
.append(buf
->GetData());
393 for (auto& buf
: value_col_bufs
) {
394 output_content
.append(buf
->GetData());
396 output_content
.append(value_checksum_buf
->GetData());
399 auto type
= compression_type
;
400 std::string compressed_output
;
401 CompressDataBlock(output_content
, &slice_final
, &type
,
404 if (out_file
!= nullptr) {
405 out_file
->Append(slice_final
);
408 // Add a bit in the end for decoding
409 std::string
slice_final_with_bit(slice_final
.data(),
410 slice_final
.size() + 1);
411 slice_final_with_bit
[slice_final
.size()] = static_cast<char>(type
);
412 blocks
->push_back(std::string(slice_final_with_bit
.data(),
413 slice_final_with_bit
.size()));
417 if (print_column_stat
) {
418 size_t total_size
= 0;
419 for (size_t i
= 0; i
< key_col_sizes
.size(); ++i
)
420 total_size
+= key_col_sizes
[i
];
421 for (size_t i
= 0; i
< value_col_sizes
.size(); ++i
)
422 total_size
+= value_col_sizes
[i
];
423 total_size
+= value_checksum_size
;
425 for (size_t i
= 0; i
< key_col_sizes
.size(); ++i
)
426 printf("Key col %" ROCKSDB_PRIszt
" size: %" ROCKSDB_PRIszt
" percentage %lf%%\n", i
, key_col_sizes
[i
],
427 100.0 * key_col_sizes
[i
] / total_size
);
428 for (size_t i
= 0; i
< value_col_sizes
.size(); ++i
)
429 printf("Value col %" ROCKSDB_PRIszt
" size: %" ROCKSDB_PRIszt
" percentage %lf%%\n", i
,
430 value_col_sizes
[i
], 100.0 * value_col_sizes
[i
] / total_size
);
431 printf("Value checksum size: %" ROCKSDB_PRIszt
" percentage %lf%%\n", value_checksum_size
,
432 100.0 * value_checksum_size
/ total_size
);
437 void ColumnAwareEncodingReader::GetColDeclarationsPrimary(
438 std::vector
<ColDeclaration
>** key_col_declarations
,
439 std::vector
<ColDeclaration
>** value_col_declarations
,
440 ColDeclaration
** value_checksum_declaration
) {
441 *key_col_declarations
= new std::vector
<ColDeclaration
>{
442 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 4, false,
444 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint
, 8,
446 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 8,
448 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 8,
450 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 8)};
452 *value_col_declarations
= new std::vector
<ColDeclaration
>{
453 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 4),
454 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 4),
455 ColDeclaration("FixedLength", ColCompressionType::kColRle
, 1),
456 ColDeclaration("VariableLength"),
457 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 4),
458 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 8)};
459 *value_checksum_declaration
= new ColDeclaration(
460 "LongFixedLength", ColCompressionType::kColNoCompression
, 9,
461 true /* nullable */);
464 void ColumnAwareEncodingReader::GetColDeclarationsSecondary(
465 std::vector
<ColDeclaration
>** key_col_declarations
,
466 std::vector
<ColDeclaration
>** value_col_declarations
,
467 ColDeclaration
** value_checksum_declaration
) {
468 *key_col_declarations
= new std::vector
<ColDeclaration
>{
469 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 4, false,
471 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 8,
473 ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint
, 8,
475 ColDeclaration("FixedLength", ColCompressionType::kColRle
, 1),
476 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 4,
478 ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint
, 8,
480 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 8, false,
482 ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression
),
483 ColDeclaration("FixedLength", ColCompressionType::kColRleVarint
, 8)};
484 *value_col_declarations
= new std::vector
<ColDeclaration
>();
485 *value_checksum_declaration
= new ColDeclaration(
486 "LongFixedLength", ColCompressionType::kColNoCompression
, 9,
487 true /* nullable */);
490 } // namespace rocksdb
492 #endif // ROCKSDB_LITE