]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "table/block_based_table_builder.h" | |
11 | ||
12 | #include <assert.h> | |
7c673cae FG |
13 | #include <stdio.h> |
14 | ||
15 | #include <list> | |
16 | #include <map> | |
17 | #include <memory> | |
18 | #include <string> | |
19 | #include <unordered_map> | |
20 | #include <utility> | |
21 | ||
22 | #include "db/dbformat.h" | |
23 | ||
24 | #include "rocksdb/cache.h" | |
25 | #include "rocksdb/comparator.h" | |
26 | #include "rocksdb/env.h" | |
27 | #include "rocksdb/filter_policy.h" | |
28 | #include "rocksdb/flush_block_policy.h" | |
29 | #include "rocksdb/merge_operator.h" | |
30 | #include "rocksdb/table.h" | |
31 | ||
32 | #include "table/block.h" | |
33 | #include "table/block_based_filter_block.h" | |
34 | #include "table/block_based_table_factory.h" | |
35 | #include "table/block_based_table_reader.h" | |
36 | #include "table/block_builder.h" | |
37 | #include "table/filter_block.h" | |
38 | #include "table/format.h" | |
39 | #include "table/full_filter_block.h" | |
7c673cae FG |
40 | #include "table/table_builder.h" |
41 | ||
7c673cae FG |
42 | #include "util/coding.h" |
43 | #include "util/compression.h" | |
44 | #include "util/crc32c.h" | |
494da23a | 45 | #include "util/memory_allocator.h" |
7c673cae | 46 | #include "util/stop_watch.h" |
11fdf7f2 | 47 | #include "util/string_util.h" |
7c673cae FG |
48 | #include "util/xxhash.h" |
49 | ||
50 | #include "table/index_builder.h" | |
51 | #include "table/partitioned_filter_block.h" | |
52 | ||
53 | namespace rocksdb { | |
54 | ||
55 | extern const std::string kHashIndexPrefixesBlock; | |
56 | extern const std::string kHashIndexPrefixesMetadataBlock; | |
57 | ||
58 | typedef BlockBasedTableOptions::IndexType IndexType; | |
59 | ||
60 | // Without anonymous namespace here, we fail the warning -Wmissing-prototypes | |
61 | namespace { | |
62 | ||
63 | // Create a filter block builder based on its type. | |
64 | FilterBlockBuilder* CreateFilterBlockBuilder( | |
11fdf7f2 TL |
65 | const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt, |
66 | const BlockBasedTableOptions& table_opt, | |
67 | const bool use_delta_encoding_for_index_values, | |
7c673cae FG |
68 | PartitionedIndexBuilder* const p_index_builder) { |
69 | if (table_opt.filter_policy == nullptr) return nullptr; | |
70 | ||
71 | FilterBitsBuilder* filter_bits_builder = | |
72 | table_opt.filter_policy->GetFilterBitsBuilder(); | |
73 | if (filter_bits_builder == nullptr) { | |
11fdf7f2 TL |
74 | return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(), |
75 | table_opt); | |
7c673cae FG |
76 | } else { |
77 | if (table_opt.partition_filters) { | |
78 | assert(p_index_builder != nullptr); | |
11fdf7f2 TL |
79 | // Since after partition cut request from filter builder it takes time |
80 | // until index builder actully cuts the partition, we take the lower bound | |
81 | // as partition size. | |
82 | assert(table_opt.block_size_deviation <= 100); | |
494da23a TL |
83 | auto partition_size = |
84 | static_cast<uint32_t>(((table_opt.metadata_block_size * | |
85 | (100 - table_opt.block_size_deviation)) + | |
86 | 99) / | |
87 | 100); | |
11fdf7f2 | 88 | partition_size = std::max(partition_size, static_cast<uint32_t>(1)); |
7c673cae | 89 | return new PartitionedFilterBlockBuilder( |
11fdf7f2 | 90 | mopt.prefix_extractor.get(), table_opt.whole_key_filtering, |
7c673cae | 91 | filter_bits_builder, table_opt.index_block_restart_interval, |
11fdf7f2 | 92 | use_delta_encoding_for_index_values, p_index_builder, partition_size); |
7c673cae | 93 | } else { |
11fdf7f2 | 94 | return new FullFilterBlockBuilder(mopt.prefix_extractor.get(), |
7c673cae FG |
95 | table_opt.whole_key_filtering, |
96 | filter_bits_builder); | |
97 | } | |
98 | } | |
99 | } | |
100 | ||
// Returns true when compression shrank the block by at least one eighth
// (12.5%) of its raw size, i.e. compressed_size < raw_size - raw_size / 8.
// CompressBlock() stores the block uncompressed when this returns false.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  // raw_size / 8 <= raw_size, so this subtraction can never wrap around.
  const size_t threshold = raw_size - raw_size / 8;
  return compressed_size < threshold;
}
105 | ||
494da23a TL |
106 | bool CompressBlockInternal(const Slice& raw, |
107 | const CompressionInfo& compression_info, | |
108 | uint32_t format_version, | |
109 | std::string* compressed_output) { | |
7c673cae FG |
110 | // Will return compressed block contents if (1) the compression method is |
111 | // supported in this platform and (2) the compression rate is "good enough". | |
494da23a | 112 | switch (compression_info.type()) { |
7c673cae | 113 | case kSnappyCompression: |
494da23a TL |
114 | return Snappy_Compress(compression_info, raw.data(), raw.size(), |
115 | compressed_output); | |
7c673cae | 116 | case kZlibCompression: |
494da23a TL |
117 | return Zlib_Compress( |
118 | compression_info, | |
119 | GetCompressFormatForVersion(kZlibCompression, format_version), | |
120 | raw.data(), raw.size(), compressed_output); | |
7c673cae | 121 | case kBZip2Compression: |
494da23a TL |
122 | return BZip2_Compress( |
123 | compression_info, | |
124 | GetCompressFormatForVersion(kBZip2Compression, format_version), | |
125 | raw.data(), raw.size(), compressed_output); | |
7c673cae | 126 | case kLZ4Compression: |
494da23a TL |
127 | return LZ4_Compress( |
128 | compression_info, | |
129 | GetCompressFormatForVersion(kLZ4Compression, format_version), | |
130 | raw.data(), raw.size(), compressed_output); | |
7c673cae | 131 | case kLZ4HCCompression: |
494da23a TL |
132 | return LZ4HC_Compress( |
133 | compression_info, | |
134 | GetCompressFormatForVersion(kLZ4HCCompression, format_version), | |
135 | raw.data(), raw.size(), compressed_output); | |
7c673cae | 136 | case kXpressCompression: |
494da23a | 137 | return XPRESS_Compress(raw.data(), raw.size(), compressed_output); |
7c673cae FG |
138 | case kZSTD: |
139 | case kZSTDNotFinalCompression: | |
494da23a TL |
140 | return ZSTD_Compress(compression_info, raw.data(), raw.size(), |
141 | compressed_output); | |
142 | default: | |
143 | // Do not recognize this compression type | |
144 | return false; | |
145 | } | |
146 | } | |
147 | ||
148 | } // namespace | |
149 | ||
150 | // format_version is the block format as defined in include/rocksdb/table.h | |
151 | Slice CompressBlock(const Slice& raw, const CompressionInfo& info, | |
152 | CompressionType* type, uint32_t format_version, | |
153 | bool do_sample, std::string* compressed_output, | |
154 | std::string* sampled_output_fast, | |
155 | std::string* sampled_output_slow) { | |
156 | *type = info.type(); | |
157 | ||
158 | if (info.type() == kNoCompression && !info.SampleForCompression()) { | |
159 | return raw; | |
7c673cae FG |
160 | } |
161 | ||
494da23a TL |
162 | // If requested, we sample one in every N block with a |
163 | // fast and slow compression algorithm and report the stats. | |
164 | // The users can use these stats to decide if it is worthwhile | |
165 | // enabling compression and they also get a hint about which | |
166 | // compression algorithm wil be beneficial. | |
167 | if (do_sample && info.SampleForCompression() && | |
168 | Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) && | |
169 | sampled_output_fast && sampled_output_slow) { | |
170 | // Sampling with a fast compression algorithm | |
171 | if (LZ4_Supported() || Snappy_Supported()) { | |
172 | CompressionType c = | |
173 | LZ4_Supported() ? kLZ4Compression : kSnappyCompression; | |
174 | CompressionContext context(c); | |
175 | CompressionOptions options; | |
176 | CompressionInfo info_tmp(options, context, | |
177 | CompressionDict::GetEmptyDict(), c, | |
178 | info.SampleForCompression()); | |
179 | ||
180 | CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast); | |
181 | } | |
182 | ||
183 | // Sampling with a slow but high-compression algorithm | |
184 | if (ZSTD_Supported() || Zlib_Supported()) { | |
185 | CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression; | |
186 | CompressionContext context(c); | |
187 | CompressionOptions options; | |
188 | CompressionInfo info_tmp(options, context, | |
189 | CompressionDict::GetEmptyDict(), c, | |
190 | info.SampleForCompression()); | |
191 | CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow); | |
192 | } | |
193 | } | |
194 | ||
195 | // Actually compress the data | |
196 | if (*type != kNoCompression) { | |
197 | if (CompressBlockInternal(raw, info, format_version, compressed_output) && | |
198 | GoodCompressionRatio(compressed_output->size(), raw.size())) { | |
199 | return *compressed_output; | |
200 | } | |
201 | } | |
202 | ||
203 | // Compression method is not supported, or not good | |
204 | // compression ratio, so just fall back to uncompressed form. | |
7c673cae FG |
205 | *type = kNoCompression; |
206 | return raw; | |
207 | } | |
208 | ||
// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// kBlockBasedTableMagicNumber may also be accessed by other .cc files. For
// that reason it is declared extern in the header, but the storage must be
// allocated somewhere, so it is defined (without extern) here, in exactly
// one place.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing the legacy block based table format
// (for backwards compatibility).
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
221 | ||
222 | // A collector that collects properties of interest to block-based table. | |
223 | // For now this class looks heavy-weight since we only write one additional | |
224 | // property. | |
225 | // But in the foreseeable future, we will add more and more properties that are | |
226 | // specific to block-based table. | |
227 | class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector | |
228 | : public IntTblPropCollector { | |
229 | public: | |
230 | explicit BlockBasedTablePropertiesCollector( | |
231 | BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering, | |
232 | bool prefix_filtering) | |
233 | : index_type_(index_type), | |
234 | whole_key_filtering_(whole_key_filtering), | |
235 | prefix_filtering_(prefix_filtering) {} | |
236 | ||
494da23a TL |
237 | Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/, |
238 | uint64_t /*file_size*/) override { | |
7c673cae FG |
239 | // Intentionally left blank. Have no interest in collecting stats for |
240 | // individual key/value pairs. | |
241 | return Status::OK(); | |
242 | } | |
243 | ||
494da23a TL |
244 | virtual void BlockAdd(uint64_t /* blockRawBytes */, |
245 | uint64_t /* blockCompressedBytesFast */, | |
246 | uint64_t /* blockCompressedBytesSlow */) override { | |
247 | // Intentionally left blank. No interest in collecting stats for | |
248 | // blocks. | |
249 | return; | |
250 | } | |
251 | ||
252 | Status Finish(UserCollectedProperties* properties) override { | |
7c673cae FG |
253 | std::string val; |
254 | PutFixed32(&val, static_cast<uint32_t>(index_type_)); | |
255 | properties->insert({BlockBasedTablePropertyNames::kIndexType, val}); | |
256 | properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering, | |
257 | whole_key_filtering_ ? kPropTrue : kPropFalse}); | |
258 | properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering, | |
259 | prefix_filtering_ ? kPropTrue : kPropFalse}); | |
260 | return Status::OK(); | |
261 | } | |
262 | ||
263 | // The name of the properties collector can be used for debugging purpose. | |
494da23a | 264 | const char* Name() const override { |
7c673cae FG |
265 | return "BlockBasedTablePropertiesCollector"; |
266 | } | |
267 | ||
494da23a | 268 | UserCollectedProperties GetReadableProperties() const override { |
7c673cae FG |
269 | // Intentionally left blank. |
270 | return UserCollectedProperties(); | |
271 | } | |
272 | ||
273 | private: | |
274 | BlockBasedTableOptions::IndexType index_type_; | |
275 | bool whole_key_filtering_; | |
276 | bool prefix_filtering_; | |
277 | }; | |
278 | ||
// All mutable state of a BlockBasedTableBuilder. The builder allocates one Rep
// in its constructor, stores it in `rep_`, and deletes it in its destructor.
// NOTE: members are initialized in declaration order, and several initializers
// depend on earlier members (e.g. `alignment` and `data_block` read
// `table_options`; `flush_block_policy` reads `data_block`). Do not reorder
// the member declarations.
struct BlockBasedTableBuilder::Rep {
  const ImmutableCFOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  // File position recorded in the handle of the next block written
  // (WriteRawBlock sets handle offsets from it).
  uint64_t offset = 0;
  // First error encountered; once not OK, Add()/Flush() become no-ops.
  Status status;
  // min(block_size, kDefaultPageSize) when block_align is set, else 0.
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks and keys to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary before
  // compressing any data blocks.
  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
  // blocks as it's redundant, but it's easier to implement for now.
  std::vector<std::pair<std::string, std::vector<std::string>>>
      data_block_and_keys_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  // Non-owning alias of `index_builder` when the two-level (partitioned)
  // index is used; nullptr otherwise. Ownership stays with `index_builder`.
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  // Last key passed to Add() for a value-type entry.
  std::string last_key;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  CompressionOptions compression_opts;
  // Dictionary used for data blocks only; null until one is set.
  std::unique_ptr<CompressionDict> compression_dict;
  CompressionContext compression_ctx;
  // Only created when table_options.verify_compression is set.
  std::unique_ptr<UncompressionContext> verify_ctx;
  std::unique_ptr<UncompressionDict> verify_dict;

  // In kBuffered state: total raw size of the buffered data blocks, compared
  // against target_file_size to decide when to leave the buffered state.
  size_t data_begin_offset = 0;

  TableProperties props;

  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks are
  //   accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  //   state.
  //
  // - `kUnbuffered`: This is the state when compression dictionary is finalized
  //   either because it wasn't enabled in the first place or it's been created
  //   from sampling previously buffered data. In this state, blocks are simply
  //   compressed/written out as they fill up. From this state, call `Finish()`
  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  //   the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;

  const bool use_delta_encoding_for_index_values;
  // Null when filters are skipped or no filter policy is configured.
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  // Filled by the builder's constructor only when block_cache_compressed is
  // configured.
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  size_t compressed_cache_key_prefix_size;

  BlockHandle pending_handle;  // Handle to add to index block

  // Scratch buffer for compressed block contents; cleared after each write.
  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  uint32_t column_family_id;
  // Reference to a caller-owned string: the caller must keep it alive for the
  // lifetime of the builder.
  const std::string& column_family_name;
  uint64_t creation_time = 0;
  uint64_t oldest_key_time = 0;
  const uint64_t target_file_size;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
      const BlockBasedTableOptions& table_opt,
      const InternalKeyComparator& icomparator,
      const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
          int_tbl_prop_collector_factories,
      uint32_t _column_family_id, WritableFileWriter* f,
      const CompressionType _compression_type,
      const uint64_t _sample_for_compression,
      const CompressionOptions& _compression_opts, const bool skip_filters,
      const std::string& _column_family_name, const uint64_t _creation_time,
      const uint64_t _oldest_key_time, const uint64_t _target_file_size)
      : ioptions(_ioptions),
        moptions(_moptions),
        table_options(table_opt),
        internal_comparator(icomparator),
        file(f),
        alignment(table_options.block_align
                      ? std::min(table_options.block_size, kDefaultPageSize)
                      : 0),
        // Forces binary search within data blocks when the user comparator
        // can treat byte-wise different keys as equal (presumably because a
        // hash index over key bytes could not find such keys).
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   icomparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(_moptions.prefix_extractor.get()),
        compression_type(_compression_type),
        sample_for_compression(_sample_for_compression),
        compression_opts(_compression_opts),
        compression_dict(),
        compression_ctx(_compression_type),
        verify_dict(),
        // Data blocks are buffered only when dictionary compression is
        // requested, so the dictionary can be built before any compression.
        state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                     : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        compressed_cache_key_prefix_size(0),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        column_family_id(_column_family_id),
        column_family_name(_column_family_name),
        creation_time(_creation_time),
        oldest_key_time(_oldest_key_time),
        target_file_size(_target_file_size) {
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (skip_filters) {
      filter_builder = nullptr;
    } else {
      // p_index_builder_ is only non-null for the two-level index; partitioned
      // filters require it.
      filter_builder.reset(CreateFilterBlockBuilder(
          _ioptions, _moptions, table_options,
          use_delta_encoding_for_index_values, p_index_builder_));
    }

    // NOTE(review): dereferenced without a null check, so callers must pass a
    // non-null factory list (possibly empty).
    for (auto& collector_factories : *int_tbl_prop_collector_factories) {
      table_properties_collectors.emplace_back(
          collector_factories->CreateIntTblPropCollector(column_family_id));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            _moptions.prefix_extractor != nullptr));
    if (table_options.verify_compression) {
      verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
                                                compression_type));
    }
  }

  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  ~Rep() {}
};
444 | ||
445 | BlockBasedTableBuilder::BlockBasedTableBuilder( | |
11fdf7f2 | 446 | const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions, |
7c673cae FG |
447 | const BlockBasedTableOptions& table_options, |
448 | const InternalKeyComparator& internal_comparator, | |
449 | const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* | |
450 | int_tbl_prop_collector_factories, | |
451 | uint32_t column_family_id, WritableFileWriter* file, | |
452 | const CompressionType compression_type, | |
494da23a TL |
453 | const uint64_t sample_for_compression, |
454 | const CompressionOptions& compression_opts, const bool skip_filters, | |
11fdf7f2 | 455 | const std::string& column_family_name, const uint64_t creation_time, |
494da23a | 456 | const uint64_t oldest_key_time, const uint64_t target_file_size) { |
7c673cae FG |
457 | BlockBasedTableOptions sanitized_table_options(table_options); |
458 | if (sanitized_table_options.format_version == 0 && | |
459 | sanitized_table_options.checksum != kCRC32c) { | |
460 | ROCKS_LOG_WARN( | |
461 | ioptions.info_log, | |
462 | "Silently converting format_version to 1 because checksum is " | |
463 | "non-default"); | |
464 | // silently convert format_version to 1 to keep consistent with current | |
465 | // behavior | |
466 | sanitized_table_options.format_version = 1; | |
467 | } | |
468 | ||
494da23a TL |
469 | rep_ = new Rep( |
470 | ioptions, moptions, sanitized_table_options, internal_comparator, | |
471 | int_tbl_prop_collector_factories, column_family_id, file, | |
472 | compression_type, sample_for_compression, compression_opts, skip_filters, | |
473 | column_family_name, creation_time, oldest_key_time, target_file_size); | |
7c673cae FG |
474 | |
475 | if (rep_->filter_builder != nullptr) { | |
476 | rep_->filter_builder->StartBlock(0); | |
477 | } | |
478 | if (table_options.block_cache_compressed.get() != nullptr) { | |
479 | BlockBasedTable::GenerateCachePrefix( | |
480 | table_options.block_cache_compressed.get(), file->writable_file(), | |
481 | &rep_->compressed_cache_key_prefix[0], | |
482 | &rep_->compressed_cache_key_prefix_size); | |
483 | } | |
484 | } | |
485 | ||
486 | BlockBasedTableBuilder::~BlockBasedTableBuilder() { | |
494da23a TL |
487 | // Catch errors where caller forgot to call Finish() |
488 | assert(rep_->state == Rep::State::kClosed); | |
7c673cae FG |
489 | delete rep_; |
490 | } | |
491 | ||
// Appends one internal key/value pair to the table under construction.
//
// - Value-type entries must arrive in strictly increasing internal-key order
//   (asserted in debug builds). They go to the current data block, which is
//   flushed first when the flush-block policy says so.
// - kTypeRangeDeletion entries go to the separate range-deletion block.
// - Any other value type is a programming error (assert(false)).
//
// In the kBuffered state, the index and filter builders are NOT updated here.
// Instead the keys are recorded in data_block_and_keys_buffers so they can be
// replayed once the compression dictionary is final (the state comment on Rep
// says EnterUnbuffered() writes out the buffered blocks).
//
// Table property counters (entries, raw sizes, deletions, merge operands) are
// updated for every entry. Does nothing if an error was already recorded.
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    // Only check ordering once a previous value-type key exists; range
    // deletions do not update last_key.
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      Flush();

      // Stop buffering once the buffered raw data exceeds the target file
      // size.
      if (r->state == Rep::State::kBuffered &&
          r->data_begin_offset > r->target_file_size) {
        EnterUnbuffered();
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block.  This allows us to use shorter
      // keys in the index block.  For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who".  We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
      r->filter_builder->Add(ExtractUserKey(key));
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffer keys to be replayed once the compression dictionary has been
      // finalized. A new buffer entry starts with every new data block (i.e.
      // right after a flush, or for the very first key).
      if (r->data_block_and_keys_buffers.empty() || should_flush) {
        r->data_block_and_keys_buffers.emplace_back();
      }
      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
    } else {
      r->index_builder->OnKeyAdded(key);
    }
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);
  } else {
    assert(false);
  }

  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    // Range deletions count both as deletions and as range deletions.
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}
570 | ||
571 | void BlockBasedTableBuilder::Flush() { | |
572 | Rep* r = rep_; | |
494da23a | 573 | assert(rep_->state != Rep::State::kClosed); |
7c673cae FG |
574 | if (!ok()) return; |
575 | if (r->data_block.empty()) return; | |
576 | WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); | |
7c673cae FG |
577 | } |
578 | ||
579 | void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, | |
580 | BlockHandle* handle, | |
581 | bool is_data_block) { | |
582 | WriteBlock(block->Finish(), handle, is_data_block); | |
583 | block->Reset(); | |
584 | } | |
585 | ||
586 | void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, | |
587 | BlockHandle* handle, | |
588 | bool is_data_block) { | |
589 | // File format contains a sequence of blocks where each block has: | |
590 | // block_data: uint8[n] | |
591 | // type: uint8 | |
592 | // crc: uint32 | |
593 | assert(ok()); | |
594 | Rep* r = rep_; | |
595 | ||
494da23a TL |
596 | auto type = r->compression_type; |
597 | uint64_t sample_for_compression = r->sample_for_compression; | |
7c673cae FG |
598 | Slice block_contents; |
599 | bool abort_compression = false; | |
600 | ||
494da23a TL |
601 | StopWatchNano timer( |
602 | r->ioptions.env, | |
603 | ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); | |
604 | ||
605 | if (r->state == Rep::State::kBuffered) { | |
606 | assert(is_data_block); | |
607 | assert(!r->data_block_and_keys_buffers.empty()); | |
608 | r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString(); | |
609 | r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size(); | |
610 | return; | |
611 | } | |
7c673cae FG |
612 | |
613 | if (raw_block_contents.size() < kCompressionSizeLimit) { | |
494da23a TL |
614 | const CompressionDict* compression_dict; |
615 | if (!is_data_block || r->compression_dict == nullptr) { | |
616 | compression_dict = &CompressionDict::GetEmptyDict(); | |
11fdf7f2 | 617 | } else { |
494da23a | 618 | compression_dict = r->compression_dict.get(); |
7c673cae | 619 | } |
494da23a TL |
620 | assert(compression_dict != nullptr); |
621 | CompressionInfo compression_info(r->compression_opts, r->compression_ctx, | |
622 | *compression_dict, type, | |
623 | sample_for_compression); | |
624 | ||
625 | std::string sampled_output_fast; | |
626 | std::string sampled_output_slow; | |
627 | block_contents = CompressBlock( | |
628 | raw_block_contents, compression_info, &type, | |
629 | r->table_options.format_version, is_data_block /* do_sample */, | |
630 | &r->compressed_output, &sampled_output_fast, &sampled_output_slow); | |
631 | ||
632 | // notify collectors on block add | |
633 | NotifyCollectTableCollectorsOnBlockAdd( | |
634 | r->table_properties_collectors, raw_block_contents.size(), | |
635 | sampled_output_fast.size(), sampled_output_slow.size()); | |
7c673cae FG |
636 | |
637 | // Some of the compression algorithms are known to be unreliable. If | |
638 | // the verify_compression flag is set then try to de-compress the | |
639 | // compressed data and compare to the input. | |
640 | if (type != kNoCompression && r->table_options.verify_compression) { | |
641 | // Retrieve the uncompressed contents into a new buffer | |
494da23a TL |
642 | const UncompressionDict* verify_dict; |
643 | if (!is_data_block || r->verify_dict == nullptr) { | |
644 | verify_dict = &UncompressionDict::GetEmptyDict(); | |
645 | } else { | |
646 | verify_dict = r->verify_dict.get(); | |
647 | } | |
648 | assert(verify_dict != nullptr); | |
7c673cae | 649 | BlockContents contents; |
494da23a TL |
650 | UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict, |
651 | r->compression_type); | |
7c673cae | 652 | Status stat = UncompressBlockContentsForCompressionType( |
494da23a | 653 | uncompression_info, block_contents.data(), block_contents.size(), |
11fdf7f2 | 654 | &contents, r->table_options.format_version, r->ioptions); |
7c673cae FG |
655 | |
656 | if (stat.ok()) { | |
657 | bool compressed_ok = contents.data.compare(raw_block_contents) == 0; | |
658 | if (!compressed_ok) { | |
659 | // The result of the compression was invalid. abort. | |
660 | abort_compression = true; | |
661 | ROCKS_LOG_ERROR(r->ioptions.info_log, | |
662 | "Decompressed block did not match raw block"); | |
663 | r->status = | |
664 | Status::Corruption("Decompressed block did not match raw block"); | |
665 | } | |
666 | } else { | |
667 | // Decompression reported an error. abort. | |
668 | r->status = Status::Corruption("Could not decompress"); | |
669 | abort_compression = true; | |
670 | } | |
671 | } | |
672 | } else { | |
673 | // Block is too big to be compressed. | |
674 | abort_compression = true; | |
675 | } | |
676 | ||
677 | // Abort compression if the block is too big, or did not pass | |
678 | // verification. | |
679 | if (abort_compression) { | |
680 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); | |
681 | type = kNoCompression; | |
682 | block_contents = raw_block_contents; | |
11fdf7f2 TL |
683 | } else if (type != kNoCompression) { |
684 | if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) { | |
494da23a TL |
685 | RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS, |
686 | timer.ElapsedNanos()); | |
11fdf7f2 | 687 | } |
494da23a TL |
688 | RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED, |
689 | raw_block_contents.size()); | |
7c673cae | 690 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED); |
494da23a TL |
691 | } else if (type != r->compression_type) { |
692 | RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); | |
7c673cae FG |
693 | } |
694 | ||
11fdf7f2 | 695 | WriteRawBlock(block_contents, type, handle, is_data_block); |
7c673cae | 696 | r->compressed_output.clear(); |
494da23a TL |
697 | if (is_data_block) { |
698 | if (r->filter_builder != nullptr) { | |
699 | r->filter_builder->StartBlock(r->offset); | |
700 | } | |
701 | r->props.data_size = r->offset; | |
702 | ++r->props.num_data_blocks; | |
703 | } | |
7c673cae FG |
704 | } |
705 | ||
706 | void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, | |
707 | CompressionType type, | |
11fdf7f2 TL |
708 | BlockHandle* handle, |
709 | bool is_data_block) { | |
7c673cae FG |
710 | Rep* r = rep_; |
711 | StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS); | |
712 | handle->set_offset(r->offset); | |
713 | handle->set_size(block_contents.size()); | |
11fdf7f2 | 714 | assert(r->status.ok()); |
7c673cae FG |
715 | r->status = r->file->Append(block_contents); |
716 | if (r->status.ok()) { | |
717 | char trailer[kBlockTrailerSize]; | |
718 | trailer[0] = type; | |
719 | char* trailer_without_type = trailer + 1; | |
720 | switch (r->table_options.checksum) { | |
721 | case kNoChecksum: | |
11fdf7f2 TL |
722 | EncodeFixed32(trailer_without_type, 0); |
723 | break; | |
7c673cae FG |
724 | case kCRC32c: { |
725 | auto crc = crc32c::Value(block_contents.data(), block_contents.size()); | |
726 | crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type | |
727 | EncodeFixed32(trailer_without_type, crc32c::Mask(crc)); | |
728 | break; | |
729 | } | |
730 | case kxxHash: { | |
731 | void* xxh = XXH32_init(0); | |
732 | XXH32_update(xxh, block_contents.data(), | |
733 | static_cast<uint32_t>(block_contents.size())); | |
734 | XXH32_update(xxh, trailer, 1); // Extend to cover block type | |
735 | EncodeFixed32(trailer_without_type, XXH32_digest(xxh)); | |
736 | break; | |
737 | } | |
494da23a TL |
738 | case kxxHash64: { |
739 | XXH64_state_t* const state = XXH64_createState(); | |
740 | XXH64_reset(state, 0); | |
741 | XXH64_update(state, block_contents.data(), | |
742 | static_cast<uint32_t>(block_contents.size())); | |
743 | XXH64_update(state, trailer, 1); // Extend to cover block type | |
744 | EncodeFixed32( | |
745 | trailer_without_type, | |
746 | static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits | |
747 | uint64_t{0xffffffff})); | |
748 | XXH64_freeState(state); | |
749 | break; | |
750 | } | |
7c673cae FG |
751 | } |
752 | ||
11fdf7f2 | 753 | assert(r->status.ok()); |
494da23a TL |
754 | TEST_SYNC_POINT_CALLBACK( |
755 | "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", | |
756 | static_cast<char*>(trailer)); | |
7c673cae FG |
757 | r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); |
758 | if (r->status.ok()) { | |
759 | r->status = InsertBlockInCache(block_contents, type, handle); | |
760 | } | |
761 | if (r->status.ok()) { | |
762 | r->offset += block_contents.size() + kBlockTrailerSize; | |
11fdf7f2 TL |
763 | if (r->table_options.block_align && is_data_block) { |
764 | size_t pad_bytes = | |
765 | (r->alignment - ((block_contents.size() + kBlockTrailerSize) & | |
766 | (r->alignment - 1))) & | |
767 | (r->alignment - 1); | |
768 | r->status = r->file->Pad(pad_bytes); | |
769 | if (r->status.ok()) { | |
770 | r->offset += pad_bytes; | |
771 | } | |
772 | } | |
7c673cae FG |
773 | } |
774 | } | |
775 | } | |
776 | ||
494da23a | 777 | Status BlockBasedTableBuilder::status() const { return rep_->status; } |
7c673cae | 778 | |
494da23a TL |
779 | static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { |
780 | BlockContents* bc = reinterpret_cast<BlockContents*>(value); | |
781 | delete bc; | |
7c673cae FG |
782 | } |
783 | ||
784 | // | |
785 | // Make a copy of the block contents and insert into compressed block cache | |
786 | // | |
787 | Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, | |
788 | const CompressionType type, | |
789 | const BlockHandle* handle) { | |
790 | Rep* r = rep_; | |
791 | Cache* block_cache_compressed = r->table_options.block_cache_compressed.get(); | |
792 | ||
793 | if (type != kNoCompression && block_cache_compressed != nullptr) { | |
7c673cae FG |
794 | size_t size = block_contents.size(); |
795 | ||
494da23a TL |
796 | auto ubuf = |
797 | AllocateBlock(size + 1, block_cache_compressed->memory_allocator()); | |
7c673cae FG |
798 | memcpy(ubuf.get(), block_contents.data(), size); |
799 | ubuf[size] = type; | |
800 | ||
494da23a TL |
801 | BlockContents* block_contents_to_cache = |
802 | new BlockContents(std::move(ubuf), size); | |
803 | #ifndef NDEBUG | |
804 | block_contents_to_cache->is_raw_block = true; | |
805 | #endif // NDEBUG | |
7c673cae FG |
806 | |
807 | // make cache key by appending the file offset to the cache prefix id | |
808 | char* end = EncodeVarint64( | |
494da23a TL |
809 | r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size, |
810 | handle->offset()); | |
811 | Slice key(r->compressed_cache_key_prefix, | |
812 | static_cast<size_t>(end - r->compressed_cache_key_prefix)); | |
7c673cae FG |
813 | |
814 | // Insert into compressed block cache. | |
494da23a TL |
815 | block_cache_compressed->Insert( |
816 | key, block_contents_to_cache, | |
817 | block_contents_to_cache->ApproximateMemoryUsage(), | |
818 | &DeleteCachedBlockContents); | |
7c673cae FG |
819 | |
820 | // Invalidate OS cache. | |
821 | r->file->InvalidateCache(static_cast<size_t>(r->offset), size); | |
822 | } | |
823 | return Status::OK(); | |
824 | } | |
825 | ||
11fdf7f2 TL |
826 | void BlockBasedTableBuilder::WriteFilterBlock( |
827 | MetaIndexBuilder* meta_index_builder) { | |
828 | BlockHandle filter_block_handle; | |
829 | bool empty_filter_block = (rep_->filter_builder == nullptr || | |
830 | rep_->filter_builder->NumAdded() == 0); | |
831 | if (ok() && !empty_filter_block) { | |
7c673cae | 832 | Status s = Status::Incomplete(); |
11fdf7f2 TL |
833 | while (ok() && s.IsIncomplete()) { |
834 | Slice filter_content = | |
835 | rep_->filter_builder->Finish(filter_block_handle, &s); | |
7c673cae | 836 | assert(s.ok() || s.IsIncomplete()); |
11fdf7f2 | 837 | rep_->props.filter_size += filter_content.size(); |
7c673cae FG |
838 | WriteRawBlock(filter_content, kNoCompression, &filter_block_handle); |
839 | } | |
840 | } | |
11fdf7f2 TL |
841 | if (ok() && !empty_filter_block) { |
842 | // Add mapping from "<filter_block_prefix>.Name" to location | |
843 | // of filter data. | |
844 | std::string key; | |
845 | if (rep_->filter_builder->IsBlockBased()) { | |
846 | key = BlockBasedTable::kFilterBlockPrefix; | |
847 | } else { | |
848 | key = rep_->table_options.partition_filters | |
849 | ? BlockBasedTable::kPartitionedFilterBlockPrefix | |
850 | : BlockBasedTable::kFullFilterBlockPrefix; | |
851 | } | |
852 | key.append(rep_->table_options.filter_policy->Name()); | |
853 | meta_index_builder->Add(key, filter_block_handle); | |
854 | } | |
855 | } | |
7c673cae | 856 | |
11fdf7f2 TL |
857 | void BlockBasedTableBuilder::WriteIndexBlock( |
858 | MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) { | |
7c673cae | 859 | IndexBuilder::IndexBlocks index_blocks; |
11fdf7f2 | 860 | auto index_builder_status = rep_->index_builder->Finish(&index_blocks); |
7c673cae FG |
861 | if (index_builder_status.IsIncomplete()) { |
862 | // We we have more than one index partition then meta_blocks are not | |
863 | // supported for the index. Currently meta_blocks are used only by | |
864 | // HashIndexBuilder which is not multi-partition. | |
865 | assert(index_blocks.meta_blocks.empty()); | |
11fdf7f2 TL |
866 | } else if (ok() && !index_builder_status.ok()) { |
867 | rep_->status = index_builder_status; | |
7c673cae | 868 | } |
11fdf7f2 TL |
869 | if (ok()) { |
870 | for (const auto& item : index_blocks.meta_blocks) { | |
871 | BlockHandle block_handle; | |
872 | WriteBlock(item.second, &block_handle, false /* is_data_block */); | |
873 | if (!ok()) { | |
874 | break; | |
875 | } | |
876 | meta_index_builder->Add(item.first, block_handle); | |
877 | } | |
7c673cae | 878 | } |
11fdf7f2 TL |
879 | if (ok()) { |
880 | if (rep_->table_options.enable_index_compression) { | |
881 | WriteBlock(index_blocks.index_block_contents, index_block_handle, false); | |
882 | } else { | |
883 | WriteRawBlock(index_blocks.index_block_contents, kNoCompression, | |
884 | index_block_handle); | |
885 | } | |
886 | } | |
887 | // If there are more index partitions, finish them and write them out | |
888 | Status s = index_builder_status; | |
889 | while (ok() && s.IsIncomplete()) { | |
890 | s = rep_->index_builder->Finish(&index_blocks, *index_block_handle); | |
891 | if (!s.ok() && !s.IsIncomplete()) { | |
892 | rep_->status = s; | |
893 | return; | |
894 | } | |
895 | if (rep_->table_options.enable_index_compression) { | |
896 | WriteBlock(index_blocks.index_block_contents, index_block_handle, false); | |
897 | } else { | |
898 | WriteRawBlock(index_blocks.index_block_contents, kNoCompression, | |
899 | index_block_handle); | |
900 | } | |
901 | // The last index_block_handle will be for the partition index block | |
902 | } | |
903 | } | |
7c673cae | 904 | |
11fdf7f2 TL |
905 | void BlockBasedTableBuilder::WritePropertiesBlock( |
906 | MetaIndexBuilder* meta_index_builder) { | |
907 | BlockHandle properties_block_handle; | |
7c673cae | 908 | if (ok()) { |
11fdf7f2 TL |
909 | PropertyBlockBuilder property_block_builder; |
910 | rep_->props.column_family_id = rep_->column_family_id; | |
911 | rep_->props.column_family_name = rep_->column_family_name; | |
912 | rep_->props.filter_policy_name = | |
913 | rep_->table_options.filter_policy != nullptr | |
914 | ? rep_->table_options.filter_policy->Name() | |
915 | : ""; | |
916 | rep_->props.index_size = | |
917 | rep_->index_builder->IndexSize() + kBlockTrailerSize; | |
918 | rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr | |
919 | ? rep_->ioptions.user_comparator->Name() | |
920 | : "nullptr"; | |
921 | rep_->props.merge_operator_name = | |
922 | rep_->ioptions.merge_operator != nullptr | |
923 | ? rep_->ioptions.merge_operator->Name() | |
924 | : "nullptr"; | |
925 | rep_->props.compression_name = | |
494da23a TL |
926 | CompressionTypeToString(rep_->compression_type); |
927 | rep_->props.compression_options = | |
928 | CompressionOptionsToString(rep_->compression_opts); | |
11fdf7f2 TL |
929 | rep_->props.prefix_extractor_name = |
930 | rep_->moptions.prefix_extractor != nullptr | |
931 | ? rep_->moptions.prefix_extractor->Name() | |
932 | : "nullptr"; | |
933 | ||
934 | std::string property_collectors_names = "["; | |
935 | for (size_t i = 0; | |
936 | i < rep_->ioptions.table_properties_collector_factories.size(); ++i) { | |
937 | if (i != 0) { | |
938 | property_collectors_names += ","; | |
7c673cae | 939 | } |
11fdf7f2 TL |
940 | property_collectors_names += |
941 | rep_->ioptions.table_properties_collector_factories[i]->Name(); | |
942 | } | |
943 | property_collectors_names += "]"; | |
944 | rep_->props.property_collectors_names = property_collectors_names; | |
945 | if (rep_->table_options.index_type == | |
946 | BlockBasedTableOptions::kTwoLevelIndexSearch) { | |
947 | assert(rep_->p_index_builder_ != nullptr); | |
948 | rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions(); | |
949 | rep_->props.top_level_index_size = | |
950 | rep_->p_index_builder_->TopLevelIndexSize(rep_->offset); | |
951 | } | |
952 | rep_->props.index_key_is_user_key = | |
953 | !rep_->index_builder->seperator_is_key_plus_seq(); | |
954 | rep_->props.index_value_is_delta_encoded = | |
955 | rep_->use_delta_encoding_for_index_values; | |
956 | rep_->props.creation_time = rep_->creation_time; | |
957 | rep_->props.oldest_key_time = rep_->oldest_key_time; | |
958 | ||
959 | // Add basic properties | |
960 | property_block_builder.AddTableProperty(rep_->props); | |
961 | ||
962 | // Add use collected properties | |
963 | NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors, | |
964 | rep_->ioptions.info_log, | |
965 | &property_block_builder); | |
966 | ||
967 | WriteRawBlock(property_block_builder.Finish(), kNoCompression, | |
968 | &properties_block_handle); | |
969 | } | |
970 | if (ok()) { | |
494da23a TL |
971 | #ifndef NDEBUG |
972 | { | |
973 | uint64_t props_block_offset = properties_block_handle.offset(); | |
974 | uint64_t props_block_size = properties_block_handle.size(); | |
975 | TEST_SYNC_POINT_CALLBACK( | |
976 | "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset", | |
977 | &props_block_offset); | |
978 | TEST_SYNC_POINT_CALLBACK( | |
979 | "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize", | |
980 | &props_block_size); | |
981 | } | |
982 | #endif // !NDEBUG | |
11fdf7f2 TL |
983 | meta_index_builder->Add(kPropertiesBlock, properties_block_handle); |
984 | } | |
985 | } | |
986 | ||
987 | void BlockBasedTableBuilder::WriteCompressionDictBlock( | |
988 | MetaIndexBuilder* meta_index_builder) { | |
494da23a TL |
989 | if (rep_->compression_dict != nullptr && |
990 | rep_->compression_dict->GetRawDict().size()) { | |
11fdf7f2 TL |
991 | BlockHandle compression_dict_block_handle; |
992 | if (ok()) { | |
494da23a | 993 | WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression, |
11fdf7f2 | 994 | &compression_dict_block_handle); |
494da23a TL |
995 | #ifndef NDEBUG |
996 | Slice compression_dict = rep_->compression_dict->GetRawDict(); | |
997 | TEST_SYNC_POINT_CALLBACK( | |
998 | "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict", | |
999 | &compression_dict); | |
1000 | #endif // NDEBUG | |
11fdf7f2 TL |
1001 | } |
1002 | if (ok()) { | |
1003 | meta_index_builder->Add(kCompressionDictBlock, | |
1004 | compression_dict_block_handle); | |
7c673cae | 1005 | } |
11fdf7f2 TL |
1006 | } |
1007 | } | |
7c673cae | 1008 | |
11fdf7f2 TL |
1009 | void BlockBasedTableBuilder::WriteRangeDelBlock( |
1010 | MetaIndexBuilder* meta_index_builder) { | |
1011 | if (ok() && !rep_->range_del_block.empty()) { | |
1012 | BlockHandle range_del_block_handle; | |
1013 | WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression, | |
1014 | &range_del_block_handle); | |
1015 | meta_index_builder->Add(kRangeDelBlock, range_del_block_handle); | |
1016 | } | |
1017 | } | |
7c673cae | 1018 | |
494da23a TL |
1019 | void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle, |
1020 | BlockHandle& index_block_handle) { | |
1021 | Rep* r = rep_; | |
1022 | // No need to write out new footer if we're using default checksum. | |
1023 | // We're writing legacy magic number because we want old versions of RocksDB | |
1024 | // be able to read files generated with new release (just in case if | |
1025 | // somebody wants to roll back after an upgrade) | |
1026 | // TODO(icanadi) at some point in the future, when we're absolutely sure | |
1027 | // nobody will roll back to RocksDB 2.x versions, retire the legacy magic | |
1028 | // number and always write new table files with new magic number | |
1029 | bool legacy = (r->table_options.format_version == 0); | |
1030 | // this is guaranteed by BlockBasedTableBuilder's constructor | |
1031 | assert(r->table_options.checksum == kCRC32c || | |
1032 | r->table_options.format_version != 0); | |
1033 | Footer footer( | |
1034 | legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber, | |
1035 | r->table_options.format_version); | |
1036 | footer.set_metaindex_handle(metaindex_block_handle); | |
1037 | footer.set_index_handle(index_block_handle); | |
1038 | footer.set_checksum(r->table_options.checksum); | |
1039 | std::string footer_encoding; | |
1040 | footer.EncodeTo(&footer_encoding); | |
1041 | assert(r->status.ok()); | |
1042 | r->status = r->file->Append(footer_encoding); | |
1043 | if (r->status.ok()) { | |
1044 | r->offset += footer_encoding.size(); | |
1045 | } | |
1046 | } | |
1047 | ||
1048 | void BlockBasedTableBuilder::EnterUnbuffered() { | |
1049 | Rep* r = rep_; | |
1050 | assert(r->state == Rep::State::kBuffered); | |
1051 | r->state = Rep::State::kUnbuffered; | |
1052 | const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0 | |
1053 | ? r->compression_opts.zstd_max_train_bytes | |
1054 | : r->compression_opts.max_dict_bytes; | |
1055 | Random64 generator{r->creation_time}; | |
1056 | std::string compression_dict_samples; | |
1057 | std::vector<size_t> compression_dict_sample_lens; | |
1058 | if (!r->data_block_and_keys_buffers.empty()) { | |
1059 | while (compression_dict_samples.size() < kSampleBytes) { | |
1060 | size_t rand_idx = | |
1061 | generator.Uniform(r->data_block_and_keys_buffers.size()); | |
1062 | size_t copy_len = | |
1063 | std::min(kSampleBytes - compression_dict_samples.size(), | |
1064 | r->data_block_and_keys_buffers[rand_idx].first.size()); | |
1065 | compression_dict_samples.append( | |
1066 | r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len); | |
1067 | compression_dict_sample_lens.emplace_back(copy_len); | |
1068 | } | |
1069 | } | |
1070 | ||
1071 | // final data block flushed, now we can generate dictionary from the samples. | |
1072 | // OK if compression_dict_samples is empty, we'll just get empty dictionary. | |
1073 | std::string dict; | |
1074 | if (r->compression_opts.zstd_max_train_bytes > 0) { | |
1075 | dict = ZSTD_TrainDictionary(compression_dict_samples, | |
1076 | compression_dict_sample_lens, | |
1077 | r->compression_opts.max_dict_bytes); | |
1078 | } else { | |
1079 | dict = std::move(compression_dict_samples); | |
1080 | } | |
1081 | r->compression_dict.reset(new CompressionDict(dict, r->compression_type, | |
1082 | r->compression_opts.level)); | |
1083 | r->verify_dict.reset(new UncompressionDict( | |
1084 | dict, r->compression_type == kZSTD || | |
1085 | r->compression_type == kZSTDNotFinalCompression)); | |
1086 | ||
1087 | for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) { | |
1088 | const auto& data_block = r->data_block_and_keys_buffers[i].first; | |
1089 | auto& keys = r->data_block_and_keys_buffers[i].second; | |
1090 | assert(!data_block.empty()); | |
1091 | assert(!keys.empty()); | |
1092 | ||
1093 | for (const auto& key : keys) { | |
1094 | if (r->filter_builder != nullptr) { | |
1095 | r->filter_builder->Add(ExtractUserKey(key)); | |
1096 | } | |
1097 | r->index_builder->OnKeyAdded(key); | |
1098 | } | |
1099 | WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */); | |
1100 | if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) { | |
1101 | Slice first_key_in_next_block = | |
1102 | r->data_block_and_keys_buffers[i + 1].second.front(); | |
1103 | Slice* first_key_in_next_block_ptr = &first_key_in_next_block; | |
1104 | r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr, | |
1105 | r->pending_handle); | |
1106 | } | |
1107 | } | |
1108 | r->data_block_and_keys_buffers.clear(); | |
1109 | } | |
1110 | ||
11fdf7f2 TL |
1111 | Status BlockBasedTableBuilder::Finish() { |
1112 | Rep* r = rep_; | |
494da23a | 1113 | assert(r->state != Rep::State::kClosed); |
11fdf7f2 TL |
1114 | bool empty_data_block = r->data_block.empty(); |
1115 | Flush(); | |
494da23a TL |
1116 | if (r->state == Rep::State::kBuffered) { |
1117 | EnterUnbuffered(); | |
1118 | } | |
11fdf7f2 TL |
1119 | // To make sure properties block is able to keep the accurate size of index |
1120 | // block, we will finish writing all index entries first. | |
1121 | if (ok() && !empty_data_block) { | |
1122 | r->index_builder->AddIndexEntry( | |
1123 | &r->last_key, nullptr /* no next data block */, r->pending_handle); | |
1124 | } | |
7c673cae | 1125 | |
494da23a | 1126 | // Write meta blocks, metaindex block and footer in the following order. |
11fdf7f2 TL |
1127 | // 1. [meta block: filter] |
1128 | // 2. [meta block: index] | |
1129 | // 3. [meta block: compression dictionary] | |
1130 | // 4. [meta block: range deletion tombstone] | |
1131 | // 5. [meta block: properties] | |
1132 | // 6. [metaindex block] | |
494da23a | 1133 | // 7. Footer |
11fdf7f2 TL |
1134 | BlockHandle metaindex_block_handle, index_block_handle; |
1135 | MetaIndexBuilder meta_index_builder; | |
1136 | WriteFilterBlock(&meta_index_builder); | |
1137 | WriteIndexBlock(&meta_index_builder, &index_block_handle); | |
1138 | WriteCompressionDictBlock(&meta_index_builder); | |
1139 | WriteRangeDelBlock(&meta_index_builder); | |
1140 | WritePropertiesBlock(&meta_index_builder); | |
7c673cae FG |
1141 | if (ok()) { |
1142 | // flush the meta index block | |
1143 | WriteRawBlock(meta_index_builder.Finish(), kNoCompression, | |
1144 | &metaindex_block_handle); | |
7c673cae | 1145 | } |
7c673cae | 1146 | if (ok()) { |
494da23a | 1147 | WriteFooter(metaindex_block_handle, index_block_handle); |
7c673cae | 1148 | } |
494da23a | 1149 | r->state = Rep::State::kClosed; |
7c673cae FG |
1150 | return r->status; |
1151 | } | |
1152 | ||
1153 | void BlockBasedTableBuilder::Abandon() { | |
494da23a TL |
1154 | assert(rep_->state != Rep::State::kClosed); |
1155 | rep_->state = Rep::State::kClosed; | |
7c673cae FG |
1156 | } |
1157 | ||
1158 | uint64_t BlockBasedTableBuilder::NumEntries() const { | |
1159 | return rep_->props.num_entries; | |
1160 | } | |
1161 | ||
494da23a | 1162 | uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } |
7c673cae FG |
1163 | |
1164 | bool BlockBasedTableBuilder::NeedCompact() const { | |
1165 | for (const auto& collector : rep_->table_properties_collectors) { | |
1166 | if (collector->NeedCompact()) { | |
1167 | return true; | |
1168 | } | |
1169 | } | |
1170 | return false; | |
1171 | } | |
1172 | ||
1173 | TableProperties BlockBasedTableBuilder::GetTableProperties() const { | |
1174 | TableProperties ret = rep_->props; | |
1175 | for (const auto& collector : rep_->table_properties_collectors) { | |
1176 | for (const auto& prop : collector->GetReadableProperties()) { | |
1177 | ret.readable_properties.insert(prop); | |
1178 | } | |
1179 | collector->Finish(&ret.user_collected_properties); | |
1180 | } | |
1181 | return ret; | |
1182 | } | |
1183 | ||
1184 | const std::string BlockBasedTable::kFilterBlockPrefix = "filter."; | |
1185 | const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter."; | |
1186 | const std::string BlockBasedTable::kPartitionedFilterBlockPrefix = | |
1187 | "partitionedfilter."; | |
1188 | } // namespace rocksdb |