// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_based/block_based_table_builder.h"

#include <unordered_map>

#include "cache/cache_entry_roles.h"
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "logging/logging.h"
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/types.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/block_like_traits.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/work_queue.h"

namespace ROCKSDB_NAMESPACE {

extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {

constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;

// Create a filter block builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(
    const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
    const FilterBuildingContext& context,
    const bool use_delta_encoding_for_index_values,
    PartitionedIndexBuilder* const p_index_builder) {
  const BlockBasedTableOptions& table_opt = context.table_options;
  assert(table_opt.filter_policy);  // precondition

  FilterBitsBuilder* filter_bits_builder =
      BloomFilterPolicy::GetBuilderFromContext(context);
  if (filter_bits_builder == nullptr) {
    return nullptr;
  } else {
    if (table_opt.partition_filters) {
      assert(p_index_builder != nullptr);
      // Since after partition cut request from filter builder it takes time
      // until index builder actually cuts the partition, until the end of a
      // data block potentially with many keys, we take the lower bound as
      // partition size.
      assert(table_opt.block_size_deviation <= 100);
      auto partition_size =
          static_cast<uint32_t>(((table_opt.metadata_block_size *
                                  (100 - table_opt.block_size_deviation)) +
                                 99) /
                                100);
      partition_size = std::max(partition_size, static_cast<uint32_t>(1));
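      // Illustrative example (added for clarity, not in the original source):
      // with metadata_block_size = 4096 and block_size_deviation = 10, the
      // lower bound is ceil(4096 * 90 / 100) = 3687 bytes per filter
      // partition, and it is never allowed to drop below 1.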
      return new PartitionedFilterBlockBuilder(
          mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
          filter_bits_builder, table_opt.index_block_restart_interval,
          use_delta_encoding_for_index_values, p_index_builder, partition_size);
    } else {
      return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
                                        table_opt.whole_key_filtering,
                                        filter_bits_builder);
    }
  }
}

bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
  // Check whether the compressed block is at least 12.5% smaller than the
  // uncompressed one.
  return compressed_size < uncomp_size - (uncomp_size / 8u);
}
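// Worked example (added for clarity): a 4096-byte block qualifies only if its
// compressed size is below 4096 - 4096 / 8 = 3584 bytes, i.e. compression must
// save at least 12.5%.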
}  // namespace

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
                    std::string* sampled_output_slow) {
  assert(compressed_output);
  assert(compressed_output->empty());

  // If requested, we sample one in every N block with a
  // fast and slow compression algorithm and report the stats.
  // The users can use these stats to decide if it is worthwhile
  // enabling compression and they also get a hint about which
  // compression algorithm will be beneficial.
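  // For example (illustrative): with sample_for_compression = 100, roughly one
  // in every 100 data blocks is additionally compressed with a fast and a slow
  // algorithm solely to collect these statistics.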
  if (do_sample && info.SampleForCompression() &&
      Random::GetTLSInstance()->OneIn(
          static_cast<int>(info.SampleForCompression()))) {
    // Sampling with a fast compression algorithm
    if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
      CompressionType c =
          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());

      CompressData(uncompressed_data, info_tmp,
                   GetCompressFormatForVersion(format_version),
                   sampled_output_fast);
    }

    // Sampling with a slow but high-compression algorithm
    if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());
      CompressData(uncompressed_data, info_tmp,
                   GetCompressFormatForVersion(format_version),
                   sampled_output_slow);
    }
  }

  if (info.type() == kNoCompression) {
    *type = kNoCompression;
    return uncompressed_data;
  }

  // Actually compress the data; if the compression method is not supported,
  // or the compression fails etc., just fall back to uncompressed
  if (!CompressData(uncompressed_data, info,
                    GetCompressFormatForVersion(format_version),
                    compressed_output)) {
    *type = kNoCompression;
    return uncompressed_data;
  }

  // Check the compression ratio; if it's not good enough, just fall back to
  // uncompressed
  if (!GoodCompressionRatio(compressed_output->size(),
                            uncompressed_data.size())) {
    *type = kNoCompression;
    return uncompressed_data;
  }

  *type = info.type();
  return *compressed_output;
}

// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files, so for that reason we declare it extern in the header, but to get
// the space allocated it must be not extern in one place.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing legacy block based table format (for
// backwards compatibility)
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;

// A collector that collects properties of interest to block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the foreseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    : public IntTblPropCollector {
 public:
  explicit BlockBasedTablePropertiesCollector(
      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
      bool prefix_filtering)
      : index_type_(index_type),
        whole_key_filtering_(whole_key_filtering),
        prefix_filtering_(prefix_filtering) {}

  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    // Intentionally left blank. Have no interest in collecting stats for
    // individual key/value pairs.
    return Status::OK();
  }

  virtual void BlockAdd(uint64_t /* block_uncomp_bytes */,
                        uint64_t /* block_compressed_bytes_fast */,
                        uint64_t /* block_compressed_bytes_slow */) override {
    // Intentionally left blank. No interest in collecting stats for
    // blocks.
    return;
  }

  Status Finish(UserCollectedProperties* properties) override {
    std::string val;
    PutFixed32(&val, static_cast<uint32_t>(index_type_));
    properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
    properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
                        whole_key_filtering_ ? kPropTrue : kPropFalse});
    properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
                        prefix_filtering_ ? kPropTrue : kPropFalse});
    return Status::OK();
  }

  // The name of the properties collector can be used for debugging purposes.
  const char* Name() const override {
    return "BlockBasedTablePropertiesCollector";
  }

  UserCollectedProperties GetReadableProperties() const override {
    // Intentionally left blank.
    return UserCollectedProperties();
  }

 private:
  BlockBasedTableOptions::IndexType index_type_;
  bool whole_key_filtering_;
  bool prefix_filtering_;
};

struct BlockBasedTableBuilder::Rep {
  const ImmutableOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  std::atomic<uint64_t> offset;
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary before
  // compressing any data blocks.
  std::vector<std::string> data_block_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  std::string last_key;
  const Slice* first_key_in_next_block = nullptr;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  std::atomic<uint64_t> compressible_input_data_bytes;
  std::atomic<uint64_t> uncompressible_input_data_bytes;
  std::atomic<uint64_t> sampled_input_data_bytes;
  std::atomic<uint64_t> sampled_output_slow_data_bytes;
  std::atomic<uint64_t> sampled_output_fast_data_bytes;
  CompressionOptions compression_opts;
  std::unique_ptr<CompressionDict> compression_dict;
  std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
  std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
  std::unique_ptr<UncompressionDict> verify_dict;

  size_t data_begin_offset = 0;

  TableProperties props;
  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks are
  //   accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  //   state.
  //
  // - `kUnbuffered`: This is the state when compression dictionary is finalized
  //   either because it wasn't enabled in the first place or it's been created
  //   from sampling previously buffered data. In this state, blocks are simply
  //   compressed/written out as they fill up. From this state, call `Finish()`
  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  //   the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;
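  // Transition sketch (summarizing the states above):
  //   kBuffered --EnterUnbuffered()--> kUnbuffered --Finish()/Abandon()--> kClosed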
  // `kBuffered` state is allowed only as long as the buffering of uncompressed
  // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
  uint64_t buffer_limit;
  std::shared_ptr<CacheReservationManager>
      compression_dict_buffer_cache_res_mgr;
  const bool use_delta_encoding_for_index_values;
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  OffsetableCacheKey base_cache_key;
  const TableFileCreationReason reason;

  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  std::unique_ptr<ParallelCompressionRep> pc_rep;

  uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
  void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }

  bool IsParallelCompressionEnabled() const {
    return compression_opts.parallel_threads > 1;
  }

  Status GetStatus() {
    // We need to make modifications of status visible when status_ok is set
    // to false, and this is ensured by status_mutex, so no special memory
    // order for status_ok is required.
    if (status_ok.load(std::memory_order_relaxed)) {
      return Status::OK();
    } else {
      return CopyStatus();
    }
  }

  Status CopyStatus() {
    std::lock_guard<std::mutex> lock(status_mutex);
    return status;
  }

  IOStatus GetIOStatus() {
    // We need to make modifications of io_status visible when status_ok is set
    // to false, and this is ensured by io_status_mutex, so no special memory
    // order for io_status_ok is required.
    if (io_status_ok.load(std::memory_order_relaxed)) {
      return IOStatus::OK();
    } else {
      return CopyIOStatus();
    }
  }

  IOStatus CopyIOStatus() {
    std::lock_guard<std::mutex> lock(io_status_mutex);
    return io_status;
  }

  // Never erase an existing status that is not OK.
  void SetStatus(Status s) {
    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
      // Locking is an overkill for non compression_opts.parallel_threads
      // case but since it's unlikely that s is not OK, we take this cost
      // to be simplicity.
      std::lock_guard<std::mutex> lock(status_mutex);
      status = s;
      status_ok.store(false, std::memory_order_relaxed);
    }
  }
  // Never erase an existing I/O status that is not OK.
  // Calling this will also SetStatus(ios)
  void SetIOStatus(IOStatus ios) {
    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
      // Locking is an overkill for non compression_opts.parallel_threads
      // case but since it's unlikely that s is not OK, we take this cost
      // to be simplicity.
      std::lock_guard<std::mutex> lock(io_status_mutex);
      io_status = ios;
      io_status_ok.store(false, std::memory_order_relaxed);
    }
  }

  Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
      WritableFileWriter* f)
      : ioptions(tbo.ioptions),
        moptions(tbo.moptions),
        table_options(table_opt),
        internal_comparator(tbo.internal_comparator),
        file(f),
        offset(0),
        alignment(table_options.block_align
                      ? std::min(static_cast<size_t>(table_options.block_size),
                                 kDefaultPageSize)
                      : 0),
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   tbo.internal_comparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
        compression_type(tbo.compression_type),
        sample_for_compression(tbo.moptions.sample_for_compression),
        compressible_input_data_bytes(0),
        uncompressible_input_data_bytes(0),
        sampled_input_data_bytes(0),
        sampled_output_slow_data_bytes(0),
        sampled_output_fast_data_bytes(0),
        compression_opts(tbo.compression_opts),
        compression_ctxs(tbo.compression_opts.parallel_threads),
        verify_ctxs(tbo.compression_opts.parallel_threads),
        state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                        : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        reason(tbo.reason),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        status_ok(true),
        io_status_ok(true) {
    if (tbo.target_file_size == 0) {
      buffer_limit = compression_opts.max_dict_buffer_bytes;
    } else if (compression_opts.max_dict_buffer_bytes == 0) {
      buffer_limit = tbo.target_file_size;
    } else {
      buffer_limit = std::min(tbo.target_file_size,
                              compression_opts.max_dict_buffer_bytes);
    }

    const auto compress_dict_build_buffer_charged =
        table_options.cache_usage_options.options_overrides
            .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
            .charged;
    if (table_options.block_cache &&
        (compress_dict_build_buffer_charged ==
             CacheEntryRoleOptions::Decision::kEnabled ||
         compress_dict_build_buffer_charged ==
             CacheEntryRoleOptions::Decision::kFallback)) {
      compression_dict_buffer_cache_res_mgr =
          std::make_shared<CacheReservationManagerImpl<
              CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
              table_options.block_cache);
    } else {
      compression_dict_buffer_cache_res_mgr = nullptr;
    }

    for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
      compression_ctxs[i].reset(new CompressionContext(compression_type));
    }
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
      // Apply optimize_filters_for_hits setting here when applicable by
      // skipping filter generation
      filter_builder.reset();
    } else if (tbo.skip_filters) {
      // For SstFileWriter skip_filters
      filter_builder.reset();
    } else if (!table_options.filter_policy) {
      // Null filter_policy -> no filter
      filter_builder.reset();
    } else {
      FilterBuildingContext filter_context(table_options);

      filter_context.info_log = ioptions.logger;
      filter_context.column_family_name = tbo.column_family_name;
      filter_context.reason = reason;

      // Only populate other fields if known to be in LSM rather than
      // generating external SST file
      if (reason != TableFileCreationReason::kMisc) {
        filter_context.compaction_style = ioptions.compaction_style;
        filter_context.num_levels = ioptions.num_levels;
        filter_context.level_at_creation = tbo.level_at_creation;
        filter_context.is_bottommost = tbo.is_bottommost;
        assert(filter_context.level_at_creation < filter_context.num_levels);
      }

      filter_builder.reset(CreateFilterBlockBuilder(
          ioptions, moptions, filter_context,
          use_delta_encoding_for_index_values, p_index_builder_));
    }

    assert(tbo.int_tbl_prop_collector_factories);
    for (auto& factory : *tbo.int_tbl_prop_collector_factories) {
      table_properties_collectors.emplace_back(
          factory->CreateIntTblPropCollector(tbo.column_family_id,
                                             tbo.level_at_creation));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            moptions.prefix_extractor != nullptr));
    const Comparator* ucmp = tbo.internal_comparator.user_comparator();
    if (ucmp->timestamp_size() > 0) {
      table_properties_collectors.emplace_back(
          new TimestampTablePropertiesCollector(ucmp));
    }
    if (table_options.verify_compression) {
      for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
        verify_ctxs[i].reset(new UncompressionContext(compression_type));
      }
    }

    // These are only needed for populating table properties
    props.column_family_id = tbo.column_family_id;
    props.column_family_name = tbo.column_family_name;
    props.oldest_key_time = tbo.oldest_key_time;
    props.file_creation_time = tbo.file_creation_time;
    props.orig_file_number = tbo.cur_file_num;
    props.db_id = tbo.db_id;
    props.db_session_id = tbo.db_session_id;
    props.db_host_id = ioptions.db_host_id;
    if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
      ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
    }
  }
  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

 private:
  // Synchronize status & io_status accesses across threads from main thread,
  // compression thread and write thread in parallel compression.
  std::mutex status_mutex;
  std::atomic<bool> status_ok;
  Status status;
  std::mutex io_status_mutex;
  std::atomic<bool> io_status_ok;
  IOStatus io_status;
};

struct BlockBasedTableBuilder::ParallelCompressionRep {
  // Keys is a wrapper of vector of strings avoiding
  // releasing string memories during vector clear()
  // in order to save memory allocation overhead
  class Keys {
   public:
    Keys() : keys_(kKeysInitSize), size_(0) {}
    void PushBack(const Slice& key) {
      if (size_ == keys_.size()) {
        keys_.emplace_back(key.data(), key.size());
      } else {
        keys_[size_].assign(key.data(), key.size());
      }
      size_++;
    }
    void SwapAssign(std::vector<std::string>& keys) {
      size_ = keys.size();
      std::swap(keys_, keys);
    }
    void Clear() { size_ = 0; }
    size_t Size() { return size_; }
    std::string& Back() { return keys_[size_ - 1]; }
    std::string& operator[](size_t idx) {
      assert(idx < size_);
      return keys_[idx];
    }

   private:
    const size_t kKeysInitSize = 32;
    std::vector<std::string> keys_;
    size_t size_;
  };
  std::unique_ptr<Keys> curr_block_keys;

  class BlockRepSlot;

  // BlockRep instances are fetched from and recycled to
  // block_rep_pool during parallel compression.
  struct BlockRep {
    Slice contents;
    Slice compressed_contents;
    std::unique_ptr<std::string> data;
    std::unique_ptr<std::string> compressed_data;
    CompressionType compression_type;
    std::unique_ptr<std::string> first_key_in_next_block;
    std::unique_ptr<Keys> keys;
    std::unique_ptr<BlockRepSlot> slot;
    Status status;
  };
  // Use a vector of BlockRep as a buffer for a determined number
  // of BlockRep structures. All data referenced by pointers in
  // BlockRep will be freed when this vector is destructed.
  using BlockRepBuffer = std::vector<BlockRep>;
  BlockRepBuffer block_rep_buf;
  // Use a thread-safe queue for concurrent access from block
  // building thread and writer thread.
  using BlockRepPool = WorkQueue<BlockRep*>;
  BlockRepPool block_rep_pool;

  // Use BlockRepSlot to keep block order in write thread.
  // slot_ will pass references to BlockRep
  class BlockRepSlot {
   public:
    BlockRepSlot() : slot_(1) {}
    template <typename T>
    void Fill(T&& rep) {
      slot_.push(std::forward<T>(rep));
    }
    void Take(BlockRep*& rep) { slot_.pop(rep); }

   private:
    // slot_ will pass references to BlockRep in block_rep_buf,
    // and those references are always valid before the destruction of
    // block_rep_buf.
    WorkQueue<BlockRep*> slot_;
  };

  // Compression queue will pass references to BlockRep in block_rep_buf,
  // and those references are always valid before the destruction of
  // block_rep_buf.
  using CompressQueue = WorkQueue<BlockRep*>;
  CompressQueue compress_queue;
  std::vector<port::Thread> compress_thread_pool;

  // Write queue will pass references to BlockRep::slot in block_rep_buf,
  // and those references are always valid before the corresponding
  // BlockRep::slot is destructed, which is before the destruction of
  // block_rep_buf.
  using WriteQueue = WorkQueue<BlockRepSlot*>;
  WriteQueue write_queue;
  std::unique_ptr<port::Thread> write_thread;

  // Estimate output file size when parallel compression is enabled. This is
  // necessary because compression & flush are no longer synchronized,
  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
  // memory_order_relaxed suffices because accurate statistics is not required.
  class FileSizeEstimator {
   public:
    explicit FileSizeEstimator()
        : uncomp_bytes_compressed(0),
          uncomp_bytes_curr_block(0),
          uncomp_bytes_curr_block_set(false),
          uncomp_bytes_inflight(0),
          blocks_inflight(0),
          curr_compression_ratio(0),
          estimated_file_size(0) {}

    // Estimate file size when a block is about to be emitted to
    // compression thread
    void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
      uint64_t new_uncomp_bytes_inflight =
          uncomp_bytes_inflight.fetch_add(uncomp_block_size,
                                          std::memory_order_relaxed) +
          uncomp_block_size;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;

      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
                  static_cast<double>(new_uncomp_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);
    }

    // Estimate file size when a block is already reaped from
    // compression thread
    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
      assert(uncomp_bytes_curr_block_set);

      uint64_t new_uncomp_bytes_compressed =
          uncomp_bytes_compressed + uncomp_bytes_curr_block;
      assert(new_uncomp_bytes_compressed > 0);

      curr_compression_ratio.store(
          (curr_compression_ratio.load(std::memory_order_relaxed) *
               uncomp_bytes_compressed +
           compressed_block_size) /
              static_cast<double>(new_uncomp_bytes_compressed),
          std::memory_order_relaxed);
      uncomp_bytes_compressed = new_uncomp_bytes_compressed;

      uint64_t new_uncomp_bytes_inflight =
          uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
                                          std::memory_order_relaxed) -
          uncomp_bytes_curr_block;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;

      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
                  static_cast<double>(new_uncomp_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);

      uncomp_bytes_curr_block_set = false;
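      // Running-ratio update (added note): the new ratio is
      // (old_ratio * bytes_previously_compressed + compressed_block_size) /
      // (bytes_previously_compressed + uncomp_bytes_curr_block), i.e. the
      // cumulative compressed/uncompressed ratio over all blocks reaped so far.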
    }

    void SetEstimatedFileSize(uint64_t size) {
      estimated_file_size.store(size, std::memory_order_relaxed);
    }

    uint64_t GetEstimatedFileSize() {
      return estimated_file_size.load(std::memory_order_relaxed);
    }

    void SetCurrBlockUncompSize(uint64_t size) {
      uncomp_bytes_curr_block = size;
      uncomp_bytes_curr_block_set = true;
    }

   private:
    // Input bytes compressed so far.
    uint64_t uncomp_bytes_compressed;
    // Size of current block being appended.
    uint64_t uncomp_bytes_curr_block;
    // Whether uncomp_bytes_curr_block has been set for the next
    // ReapBlock call.
    bool uncomp_bytes_curr_block_set;
    // Input bytes under compression and not appended yet.
    std::atomic<uint64_t> uncomp_bytes_inflight;
    // Number of blocks under compression and not appended yet.
    std::atomic<uint64_t> blocks_inflight;
    // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
    std::atomic<double> curr_compression_ratio;
    // Estimated SST file size.
    std::atomic<uint64_t> estimated_file_size;
  };
  FileSizeEstimator file_size_estimator;

  // Facilities used for waiting first block completion. Need to wait for
  // the completion of first block compression and flush to get a non-zero
  // compression ratio.
  std::atomic<bool> first_block_processed;
  std::condition_variable first_block_cond;
  std::mutex first_block_mutex;

  explicit ParallelCompressionRep(uint32_t parallel_threads)
      : curr_block_keys(new Keys()),
        block_rep_buf(parallel_threads),
        block_rep_pool(parallel_threads),
        compress_queue(parallel_threads),
        write_queue(parallel_threads),
        first_block_processed(false) {
    for (uint32_t i = 0; i < parallel_threads; i++) {
      block_rep_buf[i].contents = Slice();
      block_rep_buf[i].compressed_contents = Slice();
      block_rep_buf[i].data.reset(new std::string());
      block_rep_buf[i].compressed_data.reset(new std::string());
      block_rep_buf[i].compression_type = CompressionType();
      block_rep_buf[i].first_key_in_next_block.reset(new std::string());
      block_rep_buf[i].keys.reset(new Keys());
      block_rep_buf[i].slot.reset(new BlockRepSlot());
      block_rep_buf[i].status = Status::OK();
      block_rep_pool.push(&block_rep_buf[i]);
    }
  }

  ~ParallelCompressionRep() { block_rep_pool.finish(); }
  // Make a block prepared to be emitted to compression thread
  // Used in non-buffered mode
  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         BlockBuilder* data_block) {
    BlockRep* block_rep =
        PrepareBlockInternal(compression_type, first_key_in_next_block);
    assert(block_rep != nullptr);
    data_block->SwapAndReset(*(block_rep->data));
    block_rep->contents = *(block_rep->data);
    std::swap(block_rep->keys, curr_block_keys);
    curr_block_keys->Clear();
    return block_rep;
  }

  // Used in EnterUnbuffered
  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         std::string* data_block,
                         std::vector<std::string>* keys) {
    BlockRep* block_rep =
        PrepareBlockInternal(compression_type, first_key_in_next_block);
    assert(block_rep != nullptr);
    std::swap(*(block_rep->data), *data_block);
    block_rep->contents = *(block_rep->data);
    block_rep->keys->SwapAssign(*keys);
    return block_rep;
  }

  // Emit a block to compression thread
  void EmitBlock(BlockRep* block_rep) {
    assert(block_rep != nullptr);
    assert(block_rep->status.ok());
    if (!write_queue.push(block_rep->slot.get())) {
      return;
    }
    if (!compress_queue.push(block_rep)) {
      return;
    }

    if (!first_block_processed.load(std::memory_order_relaxed)) {
      std::unique_lock<std::mutex> lock(first_block_mutex);
      first_block_cond.wait(lock, [this] {
        return first_block_processed.load(std::memory_order_relaxed);
      });
    }
  }

  // Reap a block from compression thread
  void ReapBlock(BlockRep* block_rep) {
    assert(block_rep != nullptr);
    block_rep->compressed_data->clear();
    block_rep_pool.push(block_rep);

    if (!first_block_processed.load(std::memory_order_relaxed)) {
      std::lock_guard<std::mutex> lock(first_block_mutex);
      first_block_processed.store(true, std::memory_order_relaxed);
      first_block_cond.notify_one();
    }
  }

 private:
  BlockRep* PrepareBlockInternal(CompressionType compression_type,
                                 const Slice* first_key_in_next_block) {
    BlockRep* block_rep = nullptr;
    block_rep_pool.pop(block_rep);
    assert(block_rep != nullptr);

    assert(block_rep->data);

    block_rep->compression_type = compression_type;

    if (first_key_in_next_block == nullptr) {
      block_rep->first_key_in_next_block.reset(nullptr);
    } else {
      block_rep->first_key_in_next_block->assign(
          first_key_in_next_block->data(), first_key_in_next_block->size());
    }

    return block_rep;
  }
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
    const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
    WritableFileWriter* file) {
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    ROCKS_LOG_WARN(tbo.ioptions.logger,
                   "Silently converting format_version to 1 because checksum is "
                   "non-default");
    // silently convert format_version to 1 to keep consistent with current
    // behavior
    sanitized_table_options.format_version = 1;
  }

  rep_ = new Rep(sanitized_table_options, tbo, file);

  TEST_SYNC_POINT_CALLBACK(
      "BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
      const_cast<TableProperties*>(&rep_->props));

  BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
                                     tbo.cur_file_num, &rep_->base_cache_key);

  if (rep_->IsParallelCompressionEnabled()) {
    StartParallelCompression();
  }
}

BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  // Catch errors where caller forgot to call Finish()
  assert(rep_->state == Rep::State::kClosed);
  delete rep_;
}
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      r->first_key_in_next_block = &key;
      Flush();
      if (r->state == Rep::State::kBuffered) {
        bool exceeds_buffer_limit =
            (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
        bool exceeds_global_block_cache_limit = false;

        // Increase cache charging for the last buffered data block
        // only if the block is not going to be unbuffered immediately
        // and there exists a cache reservation manager
        if (!exceeds_buffer_limit &&
            r->compression_dict_buffer_cache_res_mgr != nullptr) {
          Status s =
              r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
                  r->data_begin_offset);
          exceeds_global_block_cache_limit = s.IsMemoryLimit();
        }

        if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
          EnterUnbuffered();
        }
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block. This allows us to use shorter
      // keys in the index block. For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who". We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        if (r->IsParallelCompressionEnabled()) {
          r->pc_rep->curr_block_keys->Clear();
        } else {
          r->index_builder->AddIndexEntry(&r->last_key, &key,
                                          r->pending_handle);
        }
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered) {
      if (r->IsParallelCompressionEnabled()) {
        r->pc_rep->curr_block_keys->PushBack(key);
      } else {
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
      }
    }

    r->data_block.AddWithLastKey(key, value, r->last_key);
    r->last_key.assign(key.data(), key.size());
    if (r->state == Rep::State::kBuffered) {
      // Buffered keys will be replayed from data_block_buffers during
      // `Finish()` once compression dictionary has been finalized.
    } else {
      if (!r->IsParallelCompressionEnabled()) {
        r->index_builder->OnKeyAdded(key);
      }
    }
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    // TODO offset passed in is not accurate for parallel compression case
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else {
    assert(false);
  }

  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
      value_type == kTypeDeletionWithTimestamp) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}
void BlockBasedTableBuilder::Flush() {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (r->data_block.empty()) return;
  if (r->IsParallelCompressionEnabled() &&
      r->state == Rep::State::kUnbuffered) {
    r->data_block.Finish();
    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
        r->compression_type, r->first_key_in_next_block, &(r->data_block));
    assert(block_rep != nullptr);
    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                             r->get_offset());
    r->pc_rep->EmitBlock(block_rep);
  } else {
    WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
  }
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle,
                                        BlockType block_type) {
  block->Finish();
  std::string uncompressed_block_data;
  uncompressed_block_data.reserve(rep_->table_options.block_size);
  block->SwapAndReset(uncompressed_block_data);
  if (rep_->state == Rep::State::kBuffered) {
    assert(block_type == BlockType::kData);
    rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_data));
    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
    return;
  }
  WriteBlock(uncompressed_block_data, handle, block_type);
}

void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
                                        BlockHandle* handle,
                                        BlockType block_type) {
  Rep* r = rep_;
  assert(r->state == Rep::State::kUnbuffered);
  Slice block_contents;
  CompressionType type;
  Status compress_status;
  bool is_data_block = block_type == BlockType::kData;
  CompressAndVerifyBlock(uncompressed_block_data, is_data_block,
                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                         &(r->compressed_output), &(block_contents), &type,
                         &compress_status);
  r->SetStatus(compress_status);

  WriteMaybeCompressedBlock(block_contents, type, handle, block_type,
                            &uncompressed_block_data);
  r->compressed_output.clear();
  if (is_data_block) {
    r->props.data_size = r->get_offset();
    ++r->props.num_data_blocks;
  }
}
void BlockBasedTableBuilder::BGWorkCompression(
    const CompressionContext& compression_ctx,
    UncompressionContext* verify_ctx) {
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
  while (rep_->pc_rep->compress_queue.pop(block_rep)) {
    assert(block_rep != nullptr);
    CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
                           compression_ctx, verify_ctx,
                           block_rep->compressed_data.get(),
                           &block_rep->compressed_contents,
                           &(block_rep->compression_type), &block_rep->status);
    block_rep->slot->Fill(block_rep);
  }
}
void BlockBasedTableBuilder::CompressAndVerifyBlock(
    const Slice& uncompressed_block_data, bool is_data_block,
    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
    std::string* compressed_output, Slice* block_contents,
    CompressionType* type, Status* out_status) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  Rep* r = rep_;
  bool is_status_ok = ok();
  if (!r->IsParallelCompressionEnabled()) {
    assert(is_status_ok);
  }

  *type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  bool abort_compression = false;

  StopWatchNano timer(
      r->ioptions.clock,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

  if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) {
    if (is_data_block) {
      r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
                                                 std::memory_order_relaxed);
    }
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, compression_ctx,
                                     *compression_dict, *type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    *block_contents = CompressBlock(
        uncompressed_block_data, compression_info, type,
        r->table_options.format_version, is_data_block /* do_sample */,
        compressed_output, &sampled_output_fast, &sampled_output_slow);

    if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
      // Currently compression sampling is only enabled for data block.
      assert(is_data_block);
      r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
                                            std::memory_order_relaxed);
      r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
                                                  std::memory_order_relaxed);
      r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
                                                  std::memory_order_relaxed);
    }
    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
        r->table_properties_collectors, uncompressed_block_data.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
    // the verify_compression flag is set then try to de-compress the
    // compressed data and compare to the input.
    if (*type != kNoCompression && r->table_options.verify_compression) {
      // Retrieve the uncompressed contents into a new buffer
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockData(
          uncompression_info, block_contents->data(), block_contents->size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok =
            contents.data.compare(uncompressed_block_data) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
          const char* const msg =
              "Decompressed block did not match pre-compression block";
          ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
          *out_status = Status::Corruption(msg);
        }
      } else {
        // Decompression reported an error. abort.
        *out_status = Status::Corruption(std::string("Could not decompress: ") +
                                         stat.getState());
        abort_compression = true;
      }
    }
  } else {
    // Block is too big to be compressed.
    if (is_data_block) {
      r->uncompressible_input_data_bytes.fetch_add(
          uncompressed_block_data.size(), std::memory_order_relaxed);
    }
    abort_compression = true;
  }
  if (is_data_block) {
    r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
                                                 std::memory_order_relaxed);
  }

  // Abort compression if the block is too big, or did not pass
  // verification if requested.
  if (abort_compression) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
    *type = kNoCompression;
    *block_contents = uncompressed_block_data;
  } else if (*type != kNoCompression) {
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
      RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
                      uncompressed_block_data.size());
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
  } else if (*type != r->compression_type) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
  }
}
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
    const Slice& block_contents, CompressionType type, BlockHandle* handle,
    BlockType block_type, const Slice* uncompressed_block_data) {
  Rep* r = rep_;
  bool is_data_block = block_type == BlockType::kData;
  // Old, misleading name of this function: WriteRawBlock
  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->get_offset());
  handle->set_size(block_contents.size());
  assert(status().ok());
  assert(io_status().ok());

  {
    IOStatus io_s = r->file->Append(block_contents);
    if (!io_s.ok()) {
      r->SetIOStatus(io_s);
      return;
    }
  }

  std::array<char, kBlockTrailerSize> trailer;
  trailer[0] = type;
  uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
      r->table_options.checksum, block_contents.data(), block_contents.size(),
      /*last_byte*/ type);

  if (block_type == BlockType::kFilter) {
    Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
    if (!s.ok()) {
      r->SetStatus(s);
      return;
    }
  }

  EncodeFixed32(trailer.data() + 1, checksum);
  TEST_SYNC_POINT_CALLBACK(
      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
      trailer.data());
  {
    IOStatus io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
    if (!io_s.ok()) {
      r->SetIOStatus(io_s);
      return;
    }
  }

  {
    Status s = Status::OK();
    bool warm_cache = false;
    switch (r->table_options.prepopulate_block_cache) {
      case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
        warm_cache = (r->reason == TableFileCreationReason::kFlush);
        break;
      case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
        warm_cache = false;
        break;
      default:
        assert(false);
    }
    if (warm_cache) {
      if (type == kNoCompression) {
        s = InsertBlockInCacheHelper(block_contents, handle, block_type);
      } else if (uncompressed_block_data != nullptr) {
        s = InsertBlockInCacheHelper(*uncompressed_block_data, handle,
                                     block_type);
      }
      if (!s.ok()) {
        r->SetStatus(s);
      }
    }
    s = InsertBlockInCompressedCache(block_contents, type, handle);
    if (!s.ok()) {
      r->SetStatus(s);
    }
  }

  r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
  if (r->table_options.block_align && is_data_block) {
    size_t pad_bytes =
        (r->alignment -
         ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
        (r->alignment - 1);
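    // Example (added for clarity): with alignment = 4096 and a block whose
    // contents plus trailer occupy 3000 bytes, pad_bytes = (4096 - 3000) &
    // 4095 = 1096, so the next block starts on a 4096-byte boundary.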
    IOStatus io_s = r->file->Pad(pad_bytes);
    if (io_s.ok()) {
      r->set_offset(r->get_offset() + pad_bytes);
    } else {
      r->SetIOStatus(io_s);
    }
  }

  if (r->IsParallelCompressionEnabled()) {
    if (is_data_block) {
      r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
                                               r->get_offset());
    } else {
      r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
    }
  }
}
void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
  Rep* r = rep_;
  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
  while (r->pc_rep->write_queue.pop(slot)) {
    assert(slot != nullptr);
    slot->Take(block_rep);
    assert(block_rep != nullptr);
    if (!block_rep->status.ok()) {
      r->SetStatus(block_rep->status);
      // Reap block so that blocked Flush() can finish
      // if there is one, and Flush() will notice !ok() next time.
      block_rep->status = Status::OK();
      r->pc_rep->ReapBlock(block_rep);
      continue;
    }

    for (size_t i = 0; i < block_rep->keys->Size(); i++) {
      auto& key = (*block_rep->keys)[i];
      if (r->filter_builder != nullptr) {
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }

    r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
        block_rep->data->size());
    WriteMaybeCompressedBlock(block_rep->compressed_contents,
                              block_rep->compression_type, &r->pending_handle,
                              BlockType::kData, &block_rep->contents);

    r->props.data_size = r->get_offset();
    ++r->props.num_data_blocks;

    if (block_rep->first_key_in_next_block == nullptr) {
      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
                                      r->pending_handle);
    } else {
      Slice first_key_in_next_block =
          Slice(*block_rep->first_key_in_next_block);
      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
                                      &first_key_in_next_block,
                                      r->pending_handle);
    }

    r->pc_rep->ReapBlock(block_rep);
  }
}
void BlockBasedTableBuilder::StartParallelCompression() {
  rep_->pc_rep.reset(
      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
  rep_->pc_rep->compress_thread_pool.reserve(
      rep_->compression_opts.parallel_threads);
  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
      BGWorkCompression(*(rep_->compression_ctxs[i]),
                        rep_->verify_ctxs[i].get());
    });
  }
  rep_->pc_rep->write_thread.reset(
      new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
}

void BlockBasedTableBuilder::StopParallelCompression() {
  rep_->pc_rep->compress_queue.finish();
  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
    thread.join();
  }
  rep_->pc_rep->write_queue.finish();
  rep_->pc_rep->write_thread->join();
}

Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }

IOStatus BlockBasedTableBuilder::io_status() const {
  return rep_->GetIOStatus();
}
// Make a copy of the block contents and insert into compressed block cache
Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
    const Slice& block_contents, const CompressionType type,
    const BlockHandle* handle) {
  Rep* r = rep_;
  Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
  Status s;
  if (type != kNoCompression && block_cache_compressed != nullptr) {
    size_t size = block_contents.size();

    auto ubuf =
        AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
    memcpy(ubuf.get(), block_contents.data(), size);
    ubuf[size] = type;

    BlockContents* block_contents_to_cache =
        new BlockContents(std::move(ubuf), size);
    block_contents_to_cache->has_trailer = true;

    CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);

    s = block_cache_compressed->Insert(
        key.AsSlice(), block_contents_to_cache,
        block_contents_to_cache->ApproximateMemoryUsage(),
        &DeleteCacheEntry<BlockContents>);
    if (s.ok()) {
      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD);
    } else {
      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
    }
    // Invalidate OS cache.
    r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
        .PermitUncheckedError();
  }
  return s;
}

Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
    const Slice& block_contents, const BlockHandle* handle,
    BlockType block_type) {
  Status s;
  switch (block_type) {
    case BlockType::kData:
    case BlockType::kIndex:
    case BlockType::kFilterPartitionIndex:
      s = InsertBlockInCache<Block>(block_contents, handle, block_type);
      break;
    case BlockType::kFilter:
      s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
                                                    block_type);
      break;
    case BlockType::kCompressionDictionary:
      s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
                                                block_type);
      break;
    default:
      // no-op / not cached
      break;
  }
  return s;
}
template <typename TBlocklike>
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
                                                  const BlockHandle* handle,
                                                  BlockType block_type) {
  // Uncompressed regular block cache
  Cache* block_cache = rep_->table_options.block_cache.get();
  Status s;
  if (block_cache != nullptr) {
    size_t size = block_contents.size();
    auto buf = AllocateBlock(size, block_cache->memory_allocator());
    memcpy(buf.get(), block_contents.data(), size);
    BlockContents results(std::move(buf), size);

    CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);

    const size_t read_amp_bytes_per_bit =
        rep_->table_options.read_amp_bytes_per_bit;

    // TODO akanksha:: Dedup below code by calling
    // BlockBasedTable::PutDataBlockToCache.
    std::unique_ptr<TBlocklike> block_holder(
        BlocklikeTraits<TBlocklike>::Create(
            std::move(results), read_amp_bytes_per_bit,
            rep_->ioptions.statistics.get(),
            false /*rep_->blocks_definitely_zstd_compressed*/,
            rep_->table_options.filter_policy.get()));

    assert(block_holder->own_bytes());
    size_t charge = block_holder->ApproximateMemoryUsage();
    s = block_cache->Insert(
        key.AsSlice(), block_holder.get(),
        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
        nullptr, Cache::Priority::LOW);

    if (s.ok()) {
      // Release ownership of block_holder.
      block_holder.release();
      BlockBasedTable::UpdateCacheInsertionMetrics(
          block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
          rep_->ioptions.stats);
    } else {
      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
    }
  }
  return s;
}
void BlockBasedTableBuilder::WriteFilterBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
    // No filter block needed
    return;
  }
  BlockHandle filter_block_handle;
  bool is_partitioned_filter = rep_->table_options.partition_filters;
  if (ok()) {
    rep_->props.num_filter_entries +=
        rep_->filter_builder->EstimateEntriesAdded();
    Status s = Status::Incomplete();
    while (ok() && s.IsIncomplete()) {
      // filter_data is used to store the transferred filter data payload from
      // FilterBlockBuilder and deallocate the payload by going out of scope.
      // Otherwise, the payload will unnecessarily remain until
      // BlockBasedTableBuilder is deallocated.
      //
      // See FilterBlockBuilder::Finish() for more on the difference in
      // transferred filter data payload among different FilterBlockBuilder
      // subtypes.
      std::unique_ptr<const char[]> filter_data;
      Slice filter_content =
          rep_->filter_builder->Finish(filter_block_handle, &s, &filter_data);

      assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
      if (s.IsCorruption()) {
        rep_->SetStatus(s);
        break;
      }

      rep_->props.filter_size += filter_content.size();

      BlockType btype = is_partitioned_filter && /* last */ s.ok()
                            ? BlockType::kFilterPartitionIndex
                            : BlockType::kFilter;
      WriteMaybeCompressedBlock(filter_content, kNoCompression,
                                &filter_block_handle, btype);
    }
    rep_->filter_builder->ResetFilterBitsBuilder();
  }
  if (ok()) {
    // Add mapping from "<filter_block_prefix>.Name" to location
    // of filter data.
    std::string key;
    key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
                                : BlockBasedTable::kFullFilterBlockPrefix;
    key.append(rep_->table_options.filter_policy->CompatibilityName());
    meta_index_builder->Add(key, filter_block_handle);
  }
}
void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (index_builder_status.IsIncomplete()) {
    // When we have more than one index partition, meta_blocks are not
    // supported for the index. Currently meta_blocks are used only by
    // HashIndexBuilder which is not multi-partition.
    assert(index_blocks.meta_blocks.empty());
  } else if (ok() && !index_builder_status.ok()) {
    rep_->SetStatus(index_builder_status);
  }

  for (const auto& item : index_blocks.meta_blocks) {
    BlockHandle block_handle;
    WriteBlock(item.second, &block_handle, BlockType::kIndex);
    meta_index_builder->Add(item.first, block_handle);
  }

  if (rep_->table_options.enable_index_compression) {
    WriteBlock(index_blocks.index_block_contents, index_block_handle,
               BlockType::kIndex);
  } else {
    WriteMaybeCompressedBlock(index_blocks.index_block_contents,
                              kNoCompression, index_block_handle,
                              BlockType::kIndex);
  }
  // If there are more index partitions, finish them and write them out
  if (index_builder_status.IsIncomplete()) {
    bool index_building_finished = false;
    while (ok() && !index_building_finished) {
      Status s =
          rep_->index_builder->Finish(&index_blocks, *index_block_handle);
      if (s.ok()) {
        index_building_finished = true;
      } else if (s.IsIncomplete()) {
        // More partitioned index after this one
        assert(!index_building_finished);
      } else {
        rep_->SetStatus(s);
        return;
      }

      if (rep_->table_options.enable_index_compression) {
        WriteBlock(index_blocks.index_block_contents, index_block_handle,
                   BlockType::kIndex);
      } else {
        WriteMaybeCompressedBlock(index_blocks.index_block_contents,
                                  kNoCompression, index_block_handle,
                                  BlockType::kIndex);
      }
      // The last index_block_handle will be for the partition index block
    }
  }
}
void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (ok()) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.compression_name =
        CompressionTypeToString(rep_->compression_type);
    rep_->props.compression_options =
        CompressionOptionsToString(rep_->compression_opts);
    rep_->props.prefix_extractor_name =
        rep_->moptions.prefix_extractor != nullptr
            ? rep_->moptions.prefix_extractor->AsString()
            : "nullptr";
    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;
    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->seperator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
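    // Fill in the compression-size estimates gathered while sampling. The
    // estimate assumes the sampled compression ratio applies to all
    // compressible input:
    //   estimate = compressible_bytes * (sampled_output / sampled_input)
    //              + uncompressible_bytes, rounded to the nearest byte.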
    if (rep_->sampled_input_data_bytes > 0) {
      rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_slow_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
      rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
          static_cast<double>(rep_->sampled_output_fast_data_bytes) /
              rep_->sampled_input_data_bytes *
              rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes + 0.5);
    } else if (rep_->sample_for_compression > 0) {
      // We tried to sample but none were found. Assume worst-case (compression
      // ratio 1.0) so data is complete and aggregatable.
      rep_->props.slow_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
      rep_->props.fast_compression_estimated_data_size =
          rep_->compressible_input_data_bytes +
          rep_->uncompressible_input_data_bytes;
    }
    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user collected properties
    NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
                                         rep_->ioptions.logger,
                                         &property_block_builder);

    Slice block_data = property_block_builder.Finish();
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
    WriteMaybeCompressedBlock(block_data, kNoCompression,
                              &properties_block_handle, BlockType::kProperties);
  }
  if (ok()) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG

    const std::string* properties_block_meta = &kPropertiesBlockName;
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:Meta",
        &properties_block_meta);
    meta_index_builder->Add(*properties_block_meta, properties_block_handle);
  }
}
void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (rep_->compression_dict != nullptr &&
      rep_->compression_dict->GetRawDict().size()) {
    BlockHandle compression_dict_block_handle;
    if (ok()) {
      WriteMaybeCompressedBlock(rep_->compression_dict->GetRawDict(),
                                kNoCompression, &compression_dict_block_handle,
                                BlockType::kCompressionDictionary);
#ifndef NDEBUG
      Slice compression_dict = rep_->compression_dict->GetRawDict();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
          &compression_dict);
#endif  // NDEBUG
    }
    if (ok()) {
      meta_index_builder->Add(kCompressionDictBlockName,
                              compression_dict_block_handle);
    }
  }
}
void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (ok() && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
    WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
                              &range_del_block_handle,
                              BlockType::kRangeDeletion);
    meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
  }
}
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  Rep* r = rep_;
  // this is guaranteed by BlockBasedTableBuilder's constructor
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  assert(ok());

  FooterBuilder footer;
  footer.Build(kBlockBasedTableMagicNumber, r->table_options.format_version,
               r->get_offset(), r->table_options.checksum,
               metaindex_block_handle, index_block_handle);
  IOStatus ios = r->file->Append(footer.GetSlice());
  if (ios.ok()) {
    r->set_offset(r->get_offset() + footer.GetSlice().size());
  } else {
    r->SetIOStatus(ios);
  }
}
void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                  ? r->compression_opts.zstd_max_train_bytes
                                  : r->compression_opts.max_dict_bytes;
  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
  if (kNumBlocksBuffered == 0) {
    // The below code is neither safe nor necessary for handling zero data
    // blocks.
    return;
  }

  // Abstract algebra teaches us that a finite cyclic group (such as the
  // additive group of integers modulo N) can be generated by a number that is
  // coprime with N. Since N is variable (number of buffered data blocks), we
  // must then pick a prime number in order to guarantee coprimeness with any
  // N.
  //
  // One downside of this approach is the spread will be poor when
  // `kPrimeGeneratorRemainder` is close to zero or close to
  // `kNumBlocksBuffered`.
  //
  // Picked a random number between one and one trillion and then chose the
  // next prime number greater than or equal to it.
  const uint64_t kPrimeGenerator = 545055921143ull;
  // Can avoid repeated division by just adding the remainder repeatedly.
  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
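  // For example, with kNumBlocksBuffered == 8 the remainder is
  // 545055921143 % 8 == 7, so starting from kInitSampleIdx == 4 the loop
  // below visits buffers 4, 3, 2, 1, 0, 7, 6, 5: each block exactly once,
  // in a spread-out order (stopping early once kSampleBytes is collected).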
  std::string compression_dict_samples;
  std::vector<size_t> compression_dict_sample_lens;
  size_t buffer_idx = kInitSampleIdx;
  for (size_t i = 0;
       i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
       ++i) {
    size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
                               r->data_block_buffers[buffer_idx].size());
    compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
                                    copy_len);
    compression_dict_sample_lens.emplace_back(copy_len);

    buffer_idx += kPrimeGeneratorRemainder;
    if (buffer_idx >= kNumBlocksBuffered) {
      buffer_idx -= kNumBlocksBuffered;
    }
  }
  // final data block flushed, now we can generate dictionary from the samples.
  // OK if compression_dict_samples is empty, we'll just get empty dictionary.
  std::string dict;
  if (r->compression_opts.zstd_max_train_bytes > 0) {
    if (r->compression_opts.use_zstd_dict_trainer) {
      dict = ZSTD_TrainDictionary(compression_dict_samples,
                                  compression_dict_sample_lens,
                                  r->compression_opts.max_dict_bytes);
    } else {
      dict = ZSTD_FinalizeDictionary(
          compression_dict_samples, compression_dict_sample_lens,
          r->compression_opts.max_dict_bytes, r->compression_opts.level);
    }
  } else {
    dict = std::move(compression_dict_samples);
  }
  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
                                                r->compression_opts.level));
  r->verify_dict.reset(new UncompressionDict(
      dict, r->compression_type == kZSTD ||
                r->compression_type == kZSTDNotFinalCompression));
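
  // Replay the buffered data blocks: each block is re-opened with a
  // DataBlockIter so its keys can be re-added to the filter and index
  // builders before the block is rewritten, now that a compression
  // dictionary is available.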
  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());

    Block reader{BlockContents{data_block}};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);

    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };

  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    }

    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
    }

    auto& data_block = r->data_block_buffers[i];
    if (r->IsParallelCompressionEnabled()) {
      Slice first_key_in_next_block;
      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      if (i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        first_key_in_next_block = next_block_iter->key();
      } else {
        first_key_in_next_block_ptr = r->first_key_in_next_block;
      }

      std::vector<std::string> keys;
      for (; iter->Valid(); iter->Next()) {
        keys.emplace_back(iter->key().ToString());
      }

      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);

      assert(block_rep != nullptr);
      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                               r->get_offset());
      r->pc_rep->EmitBlock(block_rep);
    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
        r->index_builder->OnKeyAdded(key);
      }
      WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
      if (ok() && i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        Slice first_key_in_next_block = next_block_iter->key();

        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;

        iter->SeekToLast();
        std::string last_key = iter->key().ToString();
        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
                                        r->pending_handle);
      }
    }
    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
  r->data_begin_offset = 0;
  // Release all reserved cache for data block buffers
  if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
    Status s =
        r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
            r->data_begin_offset);
    s.PermitUncheckedError();
  }
}
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  r->first_key_in_next_block = nullptr;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  if (r->IsParallelCompressionEnabled()) {
    StopParallelCompression();
#ifndef NDEBUG
    for (const auto& br : r->pc_rep->block_rep_buf) {
      assert(br.status.ok());
    }
#endif  // !NDEBUG
  } else {
    // To make sure properties block is able to keep the accurate size of index
    // block, we will finish writing all index entries first.
    if (ok() && !empty_data_block) {
      r->index_builder->AddIndexEntry(
          &r->last_key, nullptr /* no next data block */, r->pending_handle);
    }
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
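  //
  // Together with the data blocks flushed earlier, this produces the usual
  // on-disk layout:
  //   [data blocks ...][meta blocks ...][metaindex block][Footer]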
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
                              &metaindex_block_handle, BlockType::kMetaIndex);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->SetStatus(r->CopyIOStatus());
  Status ret_status = r->CopyStatus();
  assert(!ret_status.ok() || io_status().ok());
  return ret_status;
}
void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  if (rep_->IsParallelCompressionEnabled()) {
    StopParallelCompression();
  }
  rep_->state = Rep::State::kClosed;
  rep_->CopyStatus().PermitUncheckedError();
  rep_->CopyIOStatus().PermitUncheckedError();
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}

bool BlockBasedTableBuilder::IsEmpty() const {
  return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
}

uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  if (rep_->IsParallelCompressionEnabled()) {
    // Use compression ratio so far and inflight uncompressed bytes to estimate
    // final SST size.
    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
  } else {
    return FileSize();
  }
}
bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  TableProperties ret = rep_->props;
  for (const auto& collector : rep_->table_properties_collectors) {
    for (const auto& prop : collector->GetReadableProperties()) {
      ret.readable_properties.insert(prop);
    }
    collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
  }
  return ret;
}
std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksumFuncName();
  } else {
    return kUnknownFileChecksumFuncName;
  }
}
void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
    const std::string& encoded_seqno_to_time_mapping,
    uint64_t oldest_ancestor_time) {
  rep_->props.seqno_to_time_mapping = encoded_seqno_to_time_mapping;
  rep_->props.creation_time = oldest_ancestor_time;
}
const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";
}  // namespace ROCKSDB_NAMESPACE