1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/block_based/block_based_table_builder.h"
11
12 #include <assert.h>
13 #include <stdio.h>
14
15 #include <atomic>
16 #include <list>
17 #include <map>
18 #include <memory>
19 #include <numeric>
20 #include <string>
21 #include <unordered_map>
22 #include <utility>
23
24 #include "cache/cache_entry_roles.h"
25 #include "cache/cache_helpers.h"
26 #include "cache/cache_key.h"
27 #include "cache/cache_reservation_manager.h"
28 #include "db/dbformat.h"
29 #include "index_builder.h"
30 #include "logging/logging.h"
31 #include "memory/memory_allocator.h"
32 #include "rocksdb/cache.h"
33 #include "rocksdb/comparator.h"
34 #include "rocksdb/env.h"
35 #include "rocksdb/filter_policy.h"
36 #include "rocksdb/flush_block_policy.h"
37 #include "rocksdb/merge_operator.h"
38 #include "rocksdb/table.h"
39 #include "rocksdb/types.h"
40 #include "table/block_based/block.h"
41 #include "table/block_based/block_based_table_factory.h"
42 #include "table/block_based/block_based_table_reader.h"
43 #include "table/block_based/block_builder.h"
44 #include "table/block_based/block_like_traits.h"
45 #include "table/block_based/filter_block.h"
46 #include "table/block_based/filter_policy_internal.h"
47 #include "table/block_based/full_filter_block.h"
48 #include "table/block_based/partitioned_filter_block.h"
49 #include "table/format.h"
50 #include "table/meta_blocks.h"
51 #include "table/table_builder.h"
52 #include "util/coding.h"
53 #include "util/compression.h"
54 #include "util/stop_watch.h"
55 #include "util/string_util.h"
56 #include "util/work_queue.h"
57
58 namespace ROCKSDB_NAMESPACE {
59
60 extern const std::string kHashIndexPrefixesBlock;
61 extern const std::string kHashIndexPrefixesMetadataBlock;
62
63 // Without an anonymous namespace here, we would trigger the warning -Wmissing-prototypes
64 namespace {
65
66 constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
67
68 // Create a filter block builder based on its type.
69 FilterBlockBuilder* CreateFilterBlockBuilder(
70 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
71 const FilterBuildingContext& context,
72 const bool use_delta_encoding_for_index_values,
73 PartitionedIndexBuilder* const p_index_builder) {
74 const BlockBasedTableOptions& table_opt = context.table_options;
75 assert(table_opt.filter_policy); // precondition
76
77 FilterBitsBuilder* filter_bits_builder =
78 BloomFilterPolicy::GetBuilderFromContext(context);
79 if (filter_bits_builder == nullptr) {
80 return nullptr;
81 } else {
82 if (table_opt.partition_filters) {
83 assert(p_index_builder != nullptr);
84 // Since it takes time after the filter builder requests a partition cut
85 // until the index builder actually cuts the partition (at the end of a
86 // data block, potentially with many keys), we take the lower bound as
87 // the partition size.
88 assert(table_opt.block_size_deviation <= 100);
89 auto partition_size =
90 static_cast<uint32_t>(((table_opt.metadata_block_size *
91 (100 - table_opt.block_size_deviation)) +
92 99) /
93 100);
94 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
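// Illustrative arithmetic (hypothetical option values, not defaults): with
// metadata_block_size = 4096 and block_size_deviation = 10, the ceiling
// division above yields (4096 * 90 + 99) / 100 = 3687, so filter partitions
// are cut at roughly 90% of the metadata block size, clamped to at least 1.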
95 return new PartitionedFilterBlockBuilder(
96 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
97 filter_bits_builder, table_opt.index_block_restart_interval,
98 use_delta_encoding_for_index_values, p_index_builder, partition_size);
99 } else {
100 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
101 table_opt.whole_key_filtering,
102 filter_bits_builder);
103 }
104 }
105 }
106
107 bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
108 // Check whether compression saved more than ~12.5% of the uncompressed size
109 return compressed_size < uncomp_size - (uncomp_size / 8u);
110 }
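// A small worked example (hypothetical sizes): for a 4096-byte block,
// GoodCompressionRatio() is true only when the compressed output is smaller
// than 4096 - 4096 / 8 = 3584 bytes, i.e. compression must save more than
// ~12.5% for the compressed form to be kept.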
111
112 } // namespace
113
114 // format_version is the block format as defined in include/rocksdb/table.h
115 Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
116 CompressionType* type, uint32_t format_version,
117 bool do_sample, std::string* compressed_output,
118 std::string* sampled_output_fast,
119 std::string* sampled_output_slow) {
120 assert(type);
121 assert(compressed_output);
122 assert(compressed_output->empty());
123
124 // If requested, we sample one in every N blocks with both a
125 // fast and a slow compression algorithm and report the stats.
126 // Users can use these stats to decide whether it is worthwhile
127 // enabling compression, and they also get a hint about which
128 // compression algorithm will be beneficial.
129 if (do_sample && info.SampleForCompression() &&
130 Random::GetTLSInstance()->OneIn(
131 static_cast<int>(info.SampleForCompression()))) {
132 // Sampling with a fast compression algorithm
133 if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
134 CompressionType c =
135 LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
136 CompressionContext context(c);
137 CompressionOptions options;
138 CompressionInfo info_tmp(options, context,
139 CompressionDict::GetEmptyDict(), c,
140 info.SampleForCompression());
141
142 CompressData(uncompressed_data, info_tmp,
143 GetCompressFormatForVersion(format_version),
144 sampled_output_fast);
145 }
146
147 // Sampling with a slow but high-compression algorithm
148 if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
149 CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
150 CompressionContext context(c);
151 CompressionOptions options;
152 CompressionInfo info_tmp(options, context,
153 CompressionDict::GetEmptyDict(), c,
154 info.SampleForCompression());
155
156 CompressData(uncompressed_data, info_tmp,
157 GetCompressFormatForVersion(format_version),
158 sampled_output_slow);
159 }
160 }
161
162 if (info.type() == kNoCompression) {
163 *type = kNoCompression;
164 return uncompressed_data;
165 }
166
167 // Actually compress the data; if the compression method is not supported,
168 // or the compression fails etc., just fall back to uncompressed
169 if (!CompressData(uncompressed_data, info,
170 GetCompressFormatForVersion(format_version),
171 compressed_output)) {
172 *type = kNoCompression;
173 return uncompressed_data;
174 }
175
176 // Check the compression ratio; if it's not good enough, just fall back to
177 // uncompressed
178 if (!GoodCompressionRatio(compressed_output->size(),
179 uncompressed_data.size())) {
180 *type = kNoCompression;
181 return uncompressed_data;
182 }
183
184 *type = info.type();
185 return *compressed_output;
186 }
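// A hedged usage sketch (illustrative only; the variable names below are not
// from this file):
//
//   std::string compressed;
//   CompressionType out_type;
//   Slice stored = CompressBlock(raw_block, compression_info, &out_type,
//                                /*format_version=*/5, /*do_sample=*/false,
//                                &compressed, nullptr, nullptr);
//   // `stored` aliases either `raw_block` (when out_type == kNoCompression)
//   // or `compressed`, so both must outlive any use of `stored`.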
187
188 // kBlockBasedTableMagicNumber was picked by running
189 // echo rocksdb.table.block_based | sha1sum
190 // and taking the leading 64 bits.
191 // Please note that kBlockBasedTableMagicNumber may also be accessed by
192 // other .cc files; for that reason we declare it extern in the header.
193 // However, to get the space allocated, it must be defined without the
194 // extern qualifier in exactly one place, and that place is this
195 // translation unit.
196 const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
197 // We also support reading and writing legacy block based table format (for
198 // backwards compatibility)
199 const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
200
201 // A collector that collects properties of interest to the block-based table.
202 // For now this class looks heavyweight since we only write one additional
203 // property.
204 // But in the foreseeable future, we will add more and more properties that
205 // are specific to the block-based table format.
206 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
207 : public IntTblPropCollector {
208 public:
209 explicit BlockBasedTablePropertiesCollector(
210 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
211 bool prefix_filtering)
212 : index_type_(index_type),
213 whole_key_filtering_(whole_key_filtering),
214 prefix_filtering_(prefix_filtering) {}
215
216 Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
217 uint64_t /*file_size*/) override {
218 // Intentionally left blank. Have no interest in collecting stats for
219 // individual key/value pairs.
220 return Status::OK();
221 }
222
223 virtual void BlockAdd(uint64_t /* block_uncomp_bytes */,
224 uint64_t /* block_compressed_bytes_fast */,
225 uint64_t /* block_compressed_bytes_slow */) override {
226 // Intentionally left blank. No interest in collecting stats for
227 // blocks.
228 return;
229 }
230
231 Status Finish(UserCollectedProperties* properties) override {
232 std::string val;
233 PutFixed32(&val, static_cast<uint32_t>(index_type_));
234 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
235 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
236 whole_key_filtering_ ? kPropTrue : kPropFalse});
237 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
238 prefix_filtering_ ? kPropTrue : kPropFalse});
239 return Status::OK();
240 }
241
242 // The name of the properties collector can be used for debugging purposes.
243 const char* Name() const override {
244 return "BlockBasedTablePropertiesCollector";
245 }
246
247 UserCollectedProperties GetReadableProperties() const override {
248 // Intentionally left blank.
249 return UserCollectedProperties();
250 }
251
252 private:
253 BlockBasedTableOptions::IndexType index_type_;
254 bool whole_key_filtering_;
255 bool prefix_filtering_;
256 };
257
258 struct BlockBasedTableBuilder::Rep {
259 const ImmutableOptions ioptions;
260 const MutableCFOptions moptions;
261 const BlockBasedTableOptions table_options;
262 const InternalKeyComparator& internal_comparator;
263 WritableFileWriter* file;
264 std::atomic<uint64_t> offset;
265 size_t alignment;
266 BlockBuilder data_block;
267 // Buffers uncompressed data blocks to replay later. Needed when
268 // compression dictionary is enabled so we can finalize the dictionary before
269 // compressing any data blocks.
270 std::vector<std::string> data_block_buffers;
271 BlockBuilder range_del_block;
272
273 InternalKeySliceTransform internal_prefix_transform;
274 std::unique_ptr<IndexBuilder> index_builder;
275 PartitionedIndexBuilder* p_index_builder_ = nullptr;
276
277 std::string last_key;
278 const Slice* first_key_in_next_block = nullptr;
279 CompressionType compression_type;
280 uint64_t sample_for_compression;
281 std::atomic<uint64_t> compressible_input_data_bytes;
282 std::atomic<uint64_t> uncompressible_input_data_bytes;
283 std::atomic<uint64_t> sampled_input_data_bytes;
284 std::atomic<uint64_t> sampled_output_slow_data_bytes;
285 std::atomic<uint64_t> sampled_output_fast_data_bytes;
286 CompressionOptions compression_opts;
287 std::unique_ptr<CompressionDict> compression_dict;
288 std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
289 std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
290 std::unique_ptr<UncompressionDict> verify_dict;
291
292 size_t data_begin_offset = 0;
293
294 TableProperties props;
295
296 // States of the builder.
297 //
298 // - `kBuffered`: This is the initial state where zero or more data blocks are
299 // accumulated uncompressed in-memory. From this state, call
300 // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
301 // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
302 // state.
303 //
304 // - `kUnbuffered`: This is the state when compression dictionary is finalized
305 // either because it wasn't enabled in the first place or it's been created
306 // from sampling previously buffered data. In this state, blocks are simply
307 // compressed/written out as they fill up. From this state, call `Finish()`
308 // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
309 // the partially created file.
310 //
311 // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
312 // called, so the table builder is no longer usable. We must be in this
313 // state by the time the destructor runs.
314 enum class State {
315 kBuffered,
316 kUnbuffered,
317 kClosed,
318 };
319 State state;
320 // `kBuffered` state is allowed only as long as the buffering of uncompressed
321 // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
322 uint64_t buffer_limit;
323 std::shared_ptr<CacheReservationManager>
324 compression_dict_buffer_cache_res_mgr;
325 const bool use_delta_encoding_for_index_values;
326 std::unique_ptr<FilterBlockBuilder> filter_builder;
327 OffsetableCacheKey base_cache_key;
328 const TableFileCreationReason reason;
329
330 BlockHandle pending_handle; // Handle to add to index block
331
332 std::string compressed_output;
333 std::unique_ptr<FlushBlockPolicy> flush_block_policy;
334
335 std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
336
337 std::unique_ptr<ParallelCompressionRep> pc_rep;
338
339 uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
340 void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
341
342 bool IsParallelCompressionEnabled() const {
343 return compression_opts.parallel_threads > 1;
344 }
345
346 Status GetStatus() {
347 // Modifications to status must be visible when status_ok is set to
348 // false; status_mutex ensures that, so no special memory order is
349 // required for status_ok.
350 if (status_ok.load(std::memory_order_relaxed)) {
351 return Status::OK();
352 } else {
353 return CopyStatus();
354 }
355 }
356
357 Status CopyStatus() {
358 std::lock_guard<std::mutex> lock(status_mutex);
359 return status;
360 }
361
362 IOStatus GetIOStatus() {
363 // Modifications to io_status must be visible when io_status_ok is set to
364 // false; io_status_mutex ensures that, so no special memory order is
365 // required for io_status_ok.
366 if (io_status_ok.load(std::memory_order_relaxed)) {
367 return IOStatus::OK();
368 } else {
369 return CopyIOStatus();
370 }
371 }
372
373 IOStatus CopyIOStatus() {
374 std::lock_guard<std::mutex> lock(io_status_mutex);
375 return io_status;
376 }
377
378 // Never erase an existing status that is not OK.
379 void SetStatus(Status s) {
380 if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
381 // Locking is overkill for the case without parallel compression
382 // threads, but since it's unlikely that s is not OK, we accept this
383 // cost for the sake of simplicity.
384 std::lock_guard<std::mutex> lock(status_mutex);
385 status = s;
386 status_ok.store(false, std::memory_order_relaxed);
387 }
388 }
389
390 // Never erase an existing I/O status that is not OK.
391 // Calling this will also SetStatus(ios)
392 void SetIOStatus(IOStatus ios) {
393 if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
394 // Locking is overkill for the case without parallel compression
395 // threads, but since it's unlikely that ios is not OK, we accept this
396 // cost for the sake of simplicity.
397 std::lock_guard<std::mutex> lock(io_status_mutex);
398 io_status = ios;
399 io_status_ok.store(false, std::memory_order_relaxed);
400 }
401 SetStatus(ios);
402 }
403
404 Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
405 WritableFileWriter* f)
406 : ioptions(tbo.ioptions),
407 moptions(tbo.moptions),
408 table_options(table_opt),
409 internal_comparator(tbo.internal_comparator),
410 file(f),
411 offset(0),
412 alignment(table_options.block_align
413 ? std::min(static_cast<size_t>(table_options.block_size),
414 kDefaultPageSize)
415 : 0),
416 data_block(table_options.block_restart_interval,
417 table_options.use_delta_encoding,
418 false /* use_value_delta_encoding */,
419 tbo.internal_comparator.user_comparator()
420 ->CanKeysWithDifferentByteContentsBeEqual()
421 ? BlockBasedTableOptions::kDataBlockBinarySearch
422 : table_options.data_block_index_type,
423 table_options.data_block_hash_table_util_ratio),
424 range_del_block(1 /* block_restart_interval */),
425 internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
426 compression_type(tbo.compression_type),
427 sample_for_compression(tbo.moptions.sample_for_compression),
428 compressible_input_data_bytes(0),
429 uncompressible_input_data_bytes(0),
430 sampled_input_data_bytes(0),
431 sampled_output_slow_data_bytes(0),
432 sampled_output_fast_data_bytes(0),
433 compression_opts(tbo.compression_opts),
434 compression_dict(),
435 compression_ctxs(tbo.compression_opts.parallel_threads),
436 verify_ctxs(tbo.compression_opts.parallel_threads),
437 verify_dict(),
438 state((tbo.compression_opts.max_dict_bytes > 0) ? State::kBuffered
439 : State::kUnbuffered),
440 use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
441 !table_opt.block_align),
442 reason(tbo.reason),
443 flush_block_policy(
444 table_options.flush_block_policy_factory->NewFlushBlockPolicy(
445 table_options, data_block)),
446 status_ok(true),
447 io_status_ok(true) {
448 if (tbo.target_file_size == 0) {
449 buffer_limit = compression_opts.max_dict_buffer_bytes;
450 } else if (compression_opts.max_dict_buffer_bytes == 0) {
451 buffer_limit = tbo.target_file_size;
452 } else {
453 buffer_limit = std::min(tbo.target_file_size,
454 compression_opts.max_dict_buffer_bytes);
455 }
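// Illustrative outcomes of the buffer_limit selection above (made-up
// numbers): target_file_size = 64MB with max_dict_buffer_bytes = 0 gives
// buffer_limit = 64MB; target_file_size = 64MB with
// max_dict_buffer_bytes = 16MB gives buffer_limit = 16MB; and
// target_file_size = 0 gives buffer_limit = max_dict_buffer_bytes.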
456
457 const auto compress_dict_build_buffer_charged =
458 table_options.cache_usage_options.options_overrides
459 .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
460 .charged;
461 if (table_options.block_cache &&
462 (compress_dict_build_buffer_charged ==
463 CacheEntryRoleOptions::Decision::kEnabled ||
464 compress_dict_build_buffer_charged ==
465 CacheEntryRoleOptions::Decision::kFallback)) {
466 compression_dict_buffer_cache_res_mgr =
467 std::make_shared<CacheReservationManagerImpl<
468 CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
469 table_options.block_cache);
470 } else {
471 compression_dict_buffer_cache_res_mgr = nullptr;
472 }
473
474 for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
475 compression_ctxs[i].reset(new CompressionContext(compression_type));
476 }
477 if (table_options.index_type ==
478 BlockBasedTableOptions::kTwoLevelIndexSearch) {
479 p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
480 &internal_comparator, use_delta_encoding_for_index_values,
481 table_options);
482 index_builder.reset(p_index_builder_);
483 } else {
484 index_builder.reset(IndexBuilder::CreateIndexBuilder(
485 table_options.index_type, &internal_comparator,
486 &this->internal_prefix_transform, use_delta_encoding_for_index_values,
487 table_options));
488 }
489 if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
490 // Apply optimize_filters_for_hits setting here when applicable by
491 // skipping filter generation
492 filter_builder.reset();
493 } else if (tbo.skip_filters) {
494 // For SstFileWriter skip_filters
495 filter_builder.reset();
496 } else if (!table_options.filter_policy) {
497 // Null filter_policy -> no filter
498 filter_builder.reset();
499 } else {
500 FilterBuildingContext filter_context(table_options);
501
502 filter_context.info_log = ioptions.logger;
503 filter_context.column_family_name = tbo.column_family_name;
504 filter_context.reason = reason;
505
506 // Only populate other fields if known to be in LSM rather than
507 // generating external SST file
508 if (reason != TableFileCreationReason::kMisc) {
509 filter_context.compaction_style = ioptions.compaction_style;
510 filter_context.num_levels = ioptions.num_levels;
511 filter_context.level_at_creation = tbo.level_at_creation;
512 filter_context.is_bottommost = tbo.is_bottommost;
513 assert(filter_context.level_at_creation < filter_context.num_levels);
514 }
515
516 filter_builder.reset(CreateFilterBlockBuilder(
517 ioptions, moptions, filter_context,
518 use_delta_encoding_for_index_values, p_index_builder_));
519 }
520
521 assert(tbo.int_tbl_prop_collector_factories);
522 for (auto& factory : *tbo.int_tbl_prop_collector_factories) {
523 assert(factory);
524
525 table_properties_collectors.emplace_back(
526 factory->CreateIntTblPropCollector(tbo.column_family_id,
527 tbo.level_at_creation));
528 }
529 table_properties_collectors.emplace_back(
530 new BlockBasedTablePropertiesCollector(
531 table_options.index_type, table_options.whole_key_filtering,
532 moptions.prefix_extractor != nullptr));
533 const Comparator* ucmp = tbo.internal_comparator.user_comparator();
534 assert(ucmp);
535 if (ucmp->timestamp_size() > 0) {
536 table_properties_collectors.emplace_back(
537 new TimestampTablePropertiesCollector(ucmp));
538 }
539 if (table_options.verify_compression) {
540 for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
541 verify_ctxs[i].reset(new UncompressionContext(compression_type));
542 }
543 }
544
545 // These are only needed for populating table properties
546 props.column_family_id = tbo.column_family_id;
547 props.column_family_name = tbo.column_family_name;
548 props.oldest_key_time = tbo.oldest_key_time;
549 props.file_creation_time = tbo.file_creation_time;
550 props.orig_file_number = tbo.cur_file_num;
551 props.db_id = tbo.db_id;
552 props.db_session_id = tbo.db_session_id;
553 props.db_host_id = ioptions.db_host_id;
554 if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
555 ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
556 }
557 }
558
559 Rep(const Rep&) = delete;
560 Rep& operator=(const Rep&) = delete;
561
562 private:
563 // Synchronize status & io_status accesses across threads from main thread,
564 // compression thread and write thread in parallel compression.
565 std::mutex status_mutex;
566 std::atomic<bool> status_ok;
567 Status status;
568 std::mutex io_status_mutex;
569 std::atomic<bool> io_status_ok;
570 IOStatus io_status;
571 };
572
573 struct BlockBasedTableBuilder::ParallelCompressionRep {
574 // Keys is a wrapper around a vector of strings that avoids
575 // releasing the string memory during vector clear(),
576 // in order to save memory allocation overhead.
577 class Keys {
578 public:
579 Keys() : keys_(kKeysInitSize), size_(0) {}
580 void PushBack(const Slice& key) {
581 if (size_ == keys_.size()) {
582 keys_.emplace_back(key.data(), key.size());
583 } else {
584 keys_[size_].assign(key.data(), key.size());
585 }
586 size_++;
587 }
588 void SwapAssign(std::vector<std::string>& keys) {
589 size_ = keys.size();
590 std::swap(keys_, keys);
591 }
592 void Clear() { size_ = 0; }
593 size_t Size() { return size_; }
594 std::string& Back() { return keys_[size_ - 1]; }
595 std::string& operator[](size_t idx) {
596 assert(idx < size_);
597 return keys_[idx];
598 }
599
600 private:
601 const size_t kKeysInitSize = 32;
602 std::vector<std::string> keys_;
603 size_t size_;
604 };
605 std::unique_ptr<Keys> curr_block_keys;
606
607 class BlockRepSlot;
608
609 // BlockRep instances are fetched from and recycled to
610 // block_rep_pool during parallel compression.
611 struct BlockRep {
612 Slice contents;
613 Slice compressed_contents;
614 std::unique_ptr<std::string> data;
615 std::unique_ptr<std::string> compressed_data;
616 CompressionType compression_type;
617 std::unique_ptr<std::string> first_key_in_next_block;
618 std::unique_ptr<Keys> keys;
619 std::unique_ptr<BlockRepSlot> slot;
620 Status status;
621 };
622 // Use a vector of BlockRep as a buffer for a fixed number
623 // of BlockRep structures. All data referenced by pointers in
624 // BlockRep will be freed when this vector is destroyed.
625 using BlockRepBuffer = std::vector<BlockRep>;
626 BlockRepBuffer block_rep_buf;
627 // Use a thread-safe queue for concurrent access from block
628 // building thread and writer thread.
629 using BlockRepPool = WorkQueue<BlockRep*>;
630 BlockRepPool block_rep_pool;
631
632 // Use BlockRepSlot to preserve block ordering in the write thread.
633 // slot_ will pass references to BlockRep.
634 class BlockRepSlot {
635 public:
636 BlockRepSlot() : slot_(1) {}
637 template <typename T>
638 void Fill(T&& rep) {
639 slot_.push(std::forward<T>(rep));
640 };
641 void Take(BlockRep*& rep) { slot_.pop(rep); }
642
643 private:
644 // slot_ will pass references to BlockRep in block_rep_buf,
645 // and those references are always valid before the destruction of
646 // block_rep_buf.
647 WorkQueue<BlockRep*> slot_;
648 };
649
650 // Compression queue will pass references to BlockRep in block_rep_buf,
651 // and those references are always valid before the destruction of
652 // block_rep_buf.
653 using CompressQueue = WorkQueue<BlockRep*>;
654 CompressQueue compress_queue;
655 std::vector<port::Thread> compress_thread_pool;
656
657 // Write queue will pass references to BlockRep::slot in block_rep_buf,
658 // and those references are always valid before the corresponding
659 // BlockRep::slot is destructed, which is before the destruction of
660 // block_rep_buf.
661 using WriteQueue = WorkQueue<BlockRepSlot*>;
662 WriteQueue write_queue;
663 std::unique_ptr<port::Thread> write_thread;
664
665 // Estimate output file size when parallel compression is enabled. This is
666 // necessary because compression & flush are no longer synchronized,
667 // and BlockBasedTableBuilder::FileSize() is no longer accurate.
668 // memory_order_relaxed suffices because accurate statistics are not required.
669 class FileSizeEstimator {
670 public:
671 explicit FileSizeEstimator()
672 : uncomp_bytes_compressed(0),
673 uncomp_bytes_curr_block(0),
674 uncomp_bytes_curr_block_set(false),
675 uncomp_bytes_inflight(0),
676 blocks_inflight(0),
677 curr_compression_ratio(0),
678 estimated_file_size(0) {}
679
680 // Estimate file size when a block is about to be emitted to
681 // compression thread
682 void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
683 uint64_t new_uncomp_bytes_inflight =
684 uncomp_bytes_inflight.fetch_add(uncomp_block_size,
685 std::memory_order_relaxed) +
686 uncomp_block_size;
687
688 uint64_t new_blocks_inflight =
689 blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
690
691 estimated_file_size.store(
692 curr_file_size +
693 static_cast<uint64_t>(
694 static_cast<double>(new_uncomp_bytes_inflight) *
695 curr_compression_ratio.load(std::memory_order_relaxed)) +
696 new_blocks_inflight * kBlockTrailerSize,
697 std::memory_order_relaxed);
698 }
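// A hedged worked example (made-up numbers): if curr_file_size is 10 MB,
// the block just emitted brings the in-flight uncompressed bytes to 2 MB
// across 4 in-flight blocks, and the running compression ratio is 0.5,
// the estimate above becomes 10 MB + 2 MB * 0.5 + 4 * kBlockTrailerSize.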
699
700 // Estimate file size when a block is already reaped from
701 // compression thread
702 void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
703 assert(uncomp_bytes_curr_block_set);
704
705 uint64_t new_uncomp_bytes_compressed =
706 uncomp_bytes_compressed + uncomp_bytes_curr_block;
707 assert(new_uncomp_bytes_compressed > 0);
708
709 curr_compression_ratio.store(
710 (curr_compression_ratio.load(std::memory_order_relaxed) *
711 uncomp_bytes_compressed +
712 compressed_block_size) /
713 static_cast<double>(new_uncomp_bytes_compressed),
714 std::memory_order_relaxed);
715 uncomp_bytes_compressed = new_uncomp_bytes_compressed;
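// The store above maintains a cumulative weighted average: the previous
// ratio is weighted by the uncompressed bytes already accounted for, the
// newly compressed block's output size is added, and the sum is divided by
// the new uncompressed-byte total.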
716
717 uint64_t new_uncomp_bytes_inflight =
718 uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
719 std::memory_order_relaxed) -
720 uncomp_bytes_curr_block;
721
722 uint64_t new_blocks_inflight =
723 blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
724
725 estimated_file_size.store(
726 curr_file_size +
727 static_cast<uint64_t>(
728 static_cast<double>(new_uncomp_bytes_inflight) *
729 curr_compression_ratio.load(std::memory_order_relaxed)) +
730 new_blocks_inflight * kBlockTrailerSize,
731 std::memory_order_relaxed);
732
733 uncomp_bytes_curr_block_set = false;
734 }
735
736 void SetEstimatedFileSize(uint64_t size) {
737 estimated_file_size.store(size, std::memory_order_relaxed);
738 }
739
740 uint64_t GetEstimatedFileSize() {
741 return estimated_file_size.load(std::memory_order_relaxed);
742 }
743
744 void SetCurrBlockUncompSize(uint64_t size) {
745 uncomp_bytes_curr_block = size;
746 uncomp_bytes_curr_block_set = true;
747 }
748
749 private:
750 // Input bytes compressed so far.
751 uint64_t uncomp_bytes_compressed;
752 // Size of current block being appended.
753 uint64_t uncomp_bytes_curr_block;
754 // Whether uncomp_bytes_curr_block has been set for next
755 // ReapBlock call.
756 bool uncomp_bytes_curr_block_set;
757 // Input bytes under compression and not appended yet.
758 std::atomic<uint64_t> uncomp_bytes_inflight;
759 // Number of blocks under compression and not appended yet.
760 std::atomic<uint64_t> blocks_inflight;
761 // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
762 std::atomic<double> curr_compression_ratio;
763 // Estimated SST file size.
764 std::atomic<uint64_t> estimated_file_size;
765 };
766 FileSizeEstimator file_size_estimator;
767
768 // Facilities used for waiting for completion of the first block. We need
769 // to wait for the first block to be compressed and flushed before we have
770 // a non-zero compression ratio to estimate with.
771 std::atomic<bool> first_block_processed;
772 std::condition_variable first_block_cond;
773 std::mutex first_block_mutex;
774
775 explicit ParallelCompressionRep(uint32_t parallel_threads)
776 : curr_block_keys(new Keys()),
777 block_rep_buf(parallel_threads),
778 block_rep_pool(parallel_threads),
779 compress_queue(parallel_threads),
780 write_queue(parallel_threads),
781 first_block_processed(false) {
782 for (uint32_t i = 0; i < parallel_threads; i++) {
783 block_rep_buf[i].contents = Slice();
784 block_rep_buf[i].compressed_contents = Slice();
785 block_rep_buf[i].data.reset(new std::string());
786 block_rep_buf[i].compressed_data.reset(new std::string());
787 block_rep_buf[i].compression_type = CompressionType();
788 block_rep_buf[i].first_key_in_next_block.reset(new std::string());
789 block_rep_buf[i].keys.reset(new Keys());
790 block_rep_buf[i].slot.reset(new BlockRepSlot());
791 block_rep_buf[i].status = Status::OK();
792 block_rep_pool.push(&block_rep_buf[i]);
793 }
794 }
795
796 ~ParallelCompressionRep() { block_rep_pool.finish(); }
797
798 // Prepare a block to be emitted to the compression thread.
799 // Used in non-buffered mode.
800 BlockRep* PrepareBlock(CompressionType compression_type,
801 const Slice* first_key_in_next_block,
802 BlockBuilder* data_block) {
803 BlockRep* block_rep =
804 PrepareBlockInternal(compression_type, first_key_in_next_block);
805 assert(block_rep != nullptr);
806 data_block->SwapAndReset(*(block_rep->data));
807 block_rep->contents = *(block_rep->data);
808 std::swap(block_rep->keys, curr_block_keys);
809 curr_block_keys->Clear();
810 return block_rep;
811 }
812
813 // Used in EnterUnbuffered
814 BlockRep* PrepareBlock(CompressionType compression_type,
815 const Slice* first_key_in_next_block,
816 std::string* data_block,
817 std::vector<std::string>* keys) {
818 BlockRep* block_rep =
819 PrepareBlockInternal(compression_type, first_key_in_next_block);
820 assert(block_rep != nullptr);
821 std::swap(*(block_rep->data), *data_block);
822 block_rep->contents = *(block_rep->data);
823 block_rep->keys->SwapAssign(*keys);
824 return block_rep;
825 }
826
827 // Emit a block to compression thread
828 void EmitBlock(BlockRep* block_rep) {
829 assert(block_rep != nullptr);
830 assert(block_rep->status.ok());
831 if (!write_queue.push(block_rep->slot.get())) {
832 return;
833 }
834 if (!compress_queue.push(block_rep)) {
835 return;
836 }
837
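// Block here until the very first block has been compressed and written, so
// that file_size_estimator has a real compression ratio to work with before
// later blocks are emitted.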
838 if (!first_block_processed.load(std::memory_order_relaxed)) {
839 std::unique_lock<std::mutex> lock(first_block_mutex);
840 first_block_cond.wait(lock, [this] {
841 return first_block_processed.load(std::memory_order_relaxed);
842 });
843 }
844 }
845
846 // Reap a block from compression thread
847 void ReapBlock(BlockRep* block_rep) {
848 assert(block_rep != nullptr);
849 block_rep->compressed_data->clear();
850 block_rep_pool.push(block_rep);
851
852 if (!first_block_processed.load(std::memory_order_relaxed)) {
853 std::lock_guard<std::mutex> lock(first_block_mutex);
854 first_block_processed.store(true, std::memory_order_relaxed);
855 first_block_cond.notify_one();
856 }
857 }
858
859 private:
860 BlockRep* PrepareBlockInternal(CompressionType compression_type,
861 const Slice* first_key_in_next_block) {
862 BlockRep* block_rep = nullptr;
863 block_rep_pool.pop(block_rep);
864 assert(block_rep != nullptr);
865
866 assert(block_rep->data);
867
868 block_rep->compression_type = compression_type;
869
870 if (first_key_in_next_block == nullptr) {
871 block_rep->first_key_in_next_block.reset(nullptr);
872 } else {
873 block_rep->first_key_in_next_block->assign(
874 first_key_in_next_block->data(), first_key_in_next_block->size());
875 }
876
877 return block_rep;
878 }
879 };
880
881 BlockBasedTableBuilder::BlockBasedTableBuilder(
882 const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
883 WritableFileWriter* file) {
884 BlockBasedTableOptions sanitized_table_options(table_options);
885 if (sanitized_table_options.format_version == 0 &&
886 sanitized_table_options.checksum != kCRC32c) {
887 ROCKS_LOG_WARN(
888 tbo.ioptions.logger,
889 "Silently converting format_version to 1 because checksum is "
890 "non-default");
891 // silently convert format_version to 1 to keep consistent with current
892 // behavior
893 sanitized_table_options.format_version = 1;
894 }
895
896 rep_ = new Rep(sanitized_table_options, tbo, file);
897
898 TEST_SYNC_POINT_CALLBACK(
899 "BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
900 const_cast<TableProperties*>(&rep_->props));
901
902 BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
903 tbo.cur_file_num, &rep_->base_cache_key);
904
905 if (rep_->IsParallelCompressionEnabled()) {
906 StartParallelCompression();
907 }
908 }
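// A hedged usage sketch of this builder (illustrative only; real callers sit
// behind TableFactory / SstFileWriter code paths):
//
//   BlockBasedTableBuilder builder(table_options, tbo, file_writer /* ptr */);
//   for (const auto& kv : sorted_internal_kvs) {
//     builder.Add(kv.first, kv.second);  // keys must arrive in sorted order
//   }
//   Status s = builder.Finish();  // or builder.Abandon() to discard the file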
909
910 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
911 // Catch errors where caller forgot to call Finish()
912 assert(rep_->state == Rep::State::kClosed);
913 delete rep_;
914 }
915
916 void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
917 Rep* r = rep_;
918 assert(rep_->state != Rep::State::kClosed);
919 if (!ok()) return;
920 ValueType value_type = ExtractValueType(key);
921 if (IsValueType(value_type)) {
922 #ifndef NDEBUG
923 if (r->props.num_entries > r->props.num_range_deletions) {
924 assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
925 }
926 #endif // !NDEBUG
927
928 auto should_flush = r->flush_block_policy->Update(key, value);
929 if (should_flush) {
930 assert(!r->data_block.empty());
931 r->first_key_in_next_block = &key;
932 Flush();
933 if (r->state == Rep::State::kBuffered) {
934 bool exceeds_buffer_limit =
935 (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
936 bool exceeds_global_block_cache_limit = false;
937
938 // Increase cache charging for the last buffered data block
939 // only if the block is not going to be unbuffered immediately
940 // and there exists a cache reservation manager
941 if (!exceeds_buffer_limit &&
942 r->compression_dict_buffer_cache_res_mgr != nullptr) {
943 Status s =
944 r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
945 r->data_begin_offset);
946 exceeds_global_block_cache_limit = s.IsMemoryLimit();
947 }
948
949 if (exceeds_buffer_limit || exceeds_global_block_cache_limit) {
950 EnterUnbuffered();
951 }
952 }
953
954 // Add item to index block.
955 // We do not emit the index entry for a block until we have seen the
956 // first key for the next data block. This allows us to use shorter
957 // keys in the index block. For example, consider a block boundary
958 // between the keys "the quick brown fox" and "the who". We can use
959 // "the r" as the key for the index block entry since it is >= all
960 // entries in the first block and < all entries in subsequent
961 // blocks.
962 if (ok() && r->state == Rep::State::kUnbuffered) {
963 if (r->IsParallelCompressionEnabled()) {
964 r->pc_rep->curr_block_keys->Clear();
965 } else {
966 r->index_builder->AddIndexEntry(&r->last_key, &key,
967 r->pending_handle);
968 }
969 }
970 }
971
972 // Note: PartitionedFilterBlockBuilder requires the key to be added to the
973 // filter builder after it has been added to the index builder.
974 if (r->state == Rep::State::kUnbuffered) {
975 if (r->IsParallelCompressionEnabled()) {
976 r->pc_rep->curr_block_keys->PushBack(key);
977 } else {
978 if (r->filter_builder != nullptr) {
979 size_t ts_sz =
980 r->internal_comparator.user_comparator()->timestamp_size();
981 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
982 }
983 }
984 }
985
986 r->data_block.AddWithLastKey(key, value, r->last_key);
987 r->last_key.assign(key.data(), key.size());
988 if (r->state == Rep::State::kBuffered) {
989 // Buffered keys will be replayed from data_block_buffers during
990 // `Finish()` once compression dictionary has been finalized.
991 } else {
992 if (!r->IsParallelCompressionEnabled()) {
993 r->index_builder->OnKeyAdded(key);
994 }
995 }
996 // TODO offset passed in is not accurate for parallel compression case
997 NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
998 r->table_properties_collectors,
999 r->ioptions.logger);
1000
1001 } else if (value_type == kTypeRangeDeletion) {
1002 r->range_del_block.Add(key, value);
1003 // TODO offset passed in is not accurate for parallel compression case
1004 NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
1005 r->table_properties_collectors,
1006 r->ioptions.logger);
1007 } else {
1008 assert(false);
1009 }
1010
1011 r->props.num_entries++;
1012 r->props.raw_key_size += key.size();
1013 r->props.raw_value_size += value.size();
1014 if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
1015 value_type == kTypeDeletionWithTimestamp) {
1016 r->props.num_deletions++;
1017 } else if (value_type == kTypeRangeDeletion) {
1018 r->props.num_deletions++;
1019 r->props.num_range_deletions++;
1020 } else if (value_type == kTypeMerge) {
1021 r->props.num_merge_operands++;
1022 }
1023 }
1024
1025 void BlockBasedTableBuilder::Flush() {
1026 Rep* r = rep_;
1027 assert(rep_->state != Rep::State::kClosed);
1028 if (!ok()) return;
1029 if (r->data_block.empty()) return;
1030 if (r->IsParallelCompressionEnabled() &&
1031 r->state == Rep::State::kUnbuffered) {
1032 r->data_block.Finish();
1033 ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
1034 r->compression_type, r->first_key_in_next_block, &(r->data_block));
1035 assert(block_rep != nullptr);
1036 r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
1037 r->get_offset());
1038 r->pc_rep->EmitBlock(block_rep);
1039 } else {
1040 WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
1041 }
1042 }
1043
1044 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
1045 BlockHandle* handle,
1046 BlockType block_type) {
1047 block->Finish();
1048 std::string uncompressed_block_data;
1049 uncompressed_block_data.reserve(rep_->table_options.block_size);
1050 block->SwapAndReset(uncompressed_block_data);
1051 if (rep_->state == Rep::State::kBuffered) {
1052 assert(block_type == BlockType::kData);
1053 rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_data));
1054 rep_->data_begin_offset += rep_->data_block_buffers.back().size();
1055 return;
1056 }
1057 WriteBlock(uncompressed_block_data, handle, block_type);
1058 }
1059
1060 void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
1061 BlockHandle* handle,
1062 BlockType block_type) {
1063 Rep* r = rep_;
1064 assert(r->state == Rep::State::kUnbuffered);
1065 Slice block_contents;
1066 CompressionType type;
1067 Status compress_status;
1068 bool is_data_block = block_type == BlockType::kData;
1069 CompressAndVerifyBlock(uncompressed_block_data, is_data_block,
1070 *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
1071 &(r->compressed_output), &(block_contents), &type,
1072 &compress_status);
1073 r->SetStatus(compress_status);
1074 if (!ok()) {
1075 return;
1076 }
1077
1078 WriteMaybeCompressedBlock(block_contents, type, handle, block_type,
1079 &uncompressed_block_data);
1080 r->compressed_output.clear();
1081 if (is_data_block) {
1082 r->props.data_size = r->get_offset();
1083 ++r->props.num_data_blocks;
1084 }
1085 }
1086
1087 void BlockBasedTableBuilder::BGWorkCompression(
1088 const CompressionContext& compression_ctx,
1089 UncompressionContext* verify_ctx) {
1090 ParallelCompressionRep::BlockRep* block_rep = nullptr;
1091 while (rep_->pc_rep->compress_queue.pop(block_rep)) {
1092 assert(block_rep != nullptr);
1093 CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
1094 compression_ctx, verify_ctx,
1095 block_rep->compressed_data.get(),
1096 &block_rep->compressed_contents,
1097 &(block_rep->compression_type), &block_rep->status);
1098 block_rep->slot->Fill(block_rep);
1099 }
1100 }
1101
1102 void BlockBasedTableBuilder::CompressAndVerifyBlock(
1103 const Slice& uncompressed_block_data, bool is_data_block,
1104 const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
1105 std::string* compressed_output, Slice* block_contents,
1106 CompressionType* type, Status* out_status) {
1107 // File format contains a sequence of blocks where each block has:
1108 // block_data: uint8[n]
1109 // type: uint8
1110 // crc: uint32
1111 Rep* r = rep_;
1112 bool is_status_ok = ok();
1113 if (!r->IsParallelCompressionEnabled()) {
1114 assert(is_status_ok);
1115 }
1116
1117 *type = r->compression_type;
1118 uint64_t sample_for_compression = r->sample_for_compression;
1119 bool abort_compression = false;
1120
1121 StopWatchNano timer(
1122 r->ioptions.clock,
1123 ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
1124
1125 if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) {
1126 if (is_data_block) {
1127 r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
1128 std::memory_order_relaxed);
1129 }
1130 const CompressionDict* compression_dict;
1131 if (!is_data_block || r->compression_dict == nullptr) {
1132 compression_dict = &CompressionDict::GetEmptyDict();
1133 } else {
1134 compression_dict = r->compression_dict.get();
1135 }
1136 assert(compression_dict != nullptr);
1137 CompressionInfo compression_info(r->compression_opts, compression_ctx,
1138 *compression_dict, *type,
1139 sample_for_compression);
1140
1141 std::string sampled_output_fast;
1142 std::string sampled_output_slow;
1143 *block_contents = CompressBlock(
1144 uncompressed_block_data, compression_info, type,
1145 r->table_options.format_version, is_data_block /* do_sample */,
1146 compressed_output, &sampled_output_fast, &sampled_output_slow);
1147
1148 if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
1149 // Currently compression sampling is only enabled for data block.
1150 assert(is_data_block);
1151 r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
1152 std::memory_order_relaxed);
1153 r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
1154 std::memory_order_relaxed);
1155 r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
1156 std::memory_order_relaxed);
1157 }
1158 // notify collectors on block add
1159 NotifyCollectTableCollectorsOnBlockAdd(
1160 r->table_properties_collectors, uncompressed_block_data.size(),
1161 sampled_output_fast.size(), sampled_output_slow.size());
1162
1163 // Some of the compression algorithms are known to be unreliable. If
1164 // the verify_compression flag is set then try to decompress the
1165 // compressed data and compare it to the input.
1166 if (*type != kNoCompression && r->table_options.verify_compression) {
1167 // Retrieve the uncompressed contents into a new buffer
1168 const UncompressionDict* verify_dict;
1169 if (!is_data_block || r->verify_dict == nullptr) {
1170 verify_dict = &UncompressionDict::GetEmptyDict();
1171 } else {
1172 verify_dict = r->verify_dict.get();
1173 }
1174 assert(verify_dict != nullptr);
1175 BlockContents contents;
1176 UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
1177 r->compression_type);
1178 Status stat = UncompressBlockData(
1179 uncompression_info, block_contents->data(), block_contents->size(),
1180 &contents, r->table_options.format_version, r->ioptions);
1181
1182 if (stat.ok()) {
1183 bool compressed_ok =
1184 contents.data.compare(uncompressed_block_data) == 0;
1185 if (!compressed_ok) {
1186 // The result of the compression was invalid. abort.
1187 abort_compression = true;
1188 const char* const msg =
1189 "Decompressed block did not match pre-compression block";
1190 ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
1191 *out_status = Status::Corruption(msg);
1192 }
1193 } else {
1194 // Decompression reported an error. abort.
1195 *out_status = Status::Corruption(std::string("Could not decompress: ") +
1196 stat.getState());
1197 abort_compression = true;
1198 }
1199 }
1200 } else {
1201 // Block is too big to be compressed.
1202 if (is_data_block) {
1203 r->uncompressible_input_data_bytes.fetch_add(
1204 uncompressed_block_data.size(), std::memory_order_relaxed);
1205 }
1206 abort_compression = true;
1207 }
1208 if (is_data_block) {
1209 r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
1210 std::memory_order_relaxed);
1211 }
1212
1213 // Abort compression if the block is too big, or did not pass
1214 // verification.
1215 if (abort_compression) {
1216 RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
1217 *type = kNoCompression;
1218 *block_contents = uncompressed_block_data;
1219 } else if (*type != kNoCompression) {
1220 if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
1221 RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
1222 timer.ElapsedNanos());
1223 }
1224 RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
1225 uncompressed_block_data.size());
1226 RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
1227 } else if (*type != r->compression_type) {
1228 RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
1229 }
1230 }
1231
1232 void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
1233 const Slice& block_contents, CompressionType type, BlockHandle* handle,
1234 BlockType block_type, const Slice* uncompressed_block_data) {
1235 Rep* r = rep_;
1236 bool is_data_block = block_type == BlockType::kData;
1237 // Old, misleading name of this function: WriteRawBlock
1238 StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
1239 handle->set_offset(r->get_offset());
1240 handle->set_size(block_contents.size());
1241 assert(status().ok());
1242 assert(io_status().ok());
1243
1244 {
1245 IOStatus io_s = r->file->Append(block_contents);
1246 if (!io_s.ok()) {
1247 r->SetIOStatus(io_s);
1248 return;
1249 }
1250 }
1251
1252 std::array<char, kBlockTrailerSize> trailer;
1253 trailer[0] = type;
1254 uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
1255 r->table_options.checksum, block_contents.data(), block_contents.size(),
1256 /*last_byte*/ type);
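// The kBlockTrailerSize (5-byte) trailer assembled here is one byte of
// CompressionType followed by a fixed32 checksum; the checksum covers the
// block contents plus the type byte, which is why the type is passed as
// last_byte above.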
1257
1258 if (block_type == BlockType::kFilter) {
1259 Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);
1260 if (!s.ok()) {
1261 r->SetStatus(s);
1262 return;
1263 }
1264 }
1265
1266 EncodeFixed32(trailer.data() + 1, checksum);
1267 TEST_SYNC_POINT_CALLBACK(
1268 "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
1269 trailer.data());
1270 {
1271 IOStatus io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
1272 if (!io_s.ok()) {
1273 r->SetIOStatus(io_s);
1274 return;
1275 }
1276 }
1277
1278 {
1279 Status s = Status::OK();
1280 bool warm_cache;
1281 switch (r->table_options.prepopulate_block_cache) {
1282 case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
1283 warm_cache = (r->reason == TableFileCreationReason::kFlush);
1284 break;
1285 case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
1286 warm_cache = false;
1287 break;
1288 default:
1289 // missing case
1290 assert(false);
1291 warm_cache = false;
1292 }
1293 if (warm_cache) {
1294 if (type == kNoCompression) {
1295 s = InsertBlockInCacheHelper(block_contents, handle, block_type);
1296 } else if (uncompressed_block_data != nullptr) {
1297 s = InsertBlockInCacheHelper(*uncompressed_block_data, handle,
1298 block_type);
1299 }
1300 if (!s.ok()) {
1301 r->SetStatus(s);
1302 return;
1303 }
1304 }
1305 s = InsertBlockInCompressedCache(block_contents, type, handle);
1306 if (!s.ok()) {
1307 r->SetStatus(s);
1308 return;
1309 }
1310 }
1311
1312 r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
1313 if (r->table_options.block_align && is_data_block) {
1314 size_t pad_bytes =
1315 (r->alignment -
1316 ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
1317 (r->alignment - 1);
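// Illustrative arithmetic (hypothetical sizes): with alignment = 4096 and a
// 3000-byte block, block_contents.size() + kBlockTrailerSize = 3005, so
// pad_bytes = (4096 - (3005 & 4095)) & 4095 = 1091, which pads the block out
// to the next 4096-byte boundary. This relies on alignment being a power of
// two.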
1318 IOStatus io_s = r->file->Pad(pad_bytes);
1319 if (io_s.ok()) {
1320 r->set_offset(r->get_offset() + pad_bytes);
1321 } else {
1322 r->SetIOStatus(io_s);
1323 return;
1324 }
1325 }
1326
1327 if (r->IsParallelCompressionEnabled()) {
1328 if (is_data_block) {
1329 r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
1330 r->get_offset());
1331 } else {
1332 r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
1333 }
1334 }
1335 }
1336
1337 void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
1338 Rep* r = rep_;
1339 ParallelCompressionRep::BlockRepSlot* slot = nullptr;
1340 ParallelCompressionRep::BlockRep* block_rep = nullptr;
1341 while (r->pc_rep->write_queue.pop(slot)) {
1342 assert(slot != nullptr);
1343 slot->Take(block_rep);
1344 assert(block_rep != nullptr);
1345 if (!block_rep->status.ok()) {
1346 r->SetStatus(block_rep->status);
1347 // Reap block so that blocked Flush() can finish
1348 // if there is one, and Flush() will notice !ok() next time.
1349 block_rep->status = Status::OK();
1350 r->pc_rep->ReapBlock(block_rep);
1351 continue;
1352 }
1353
1354 for (size_t i = 0; i < block_rep->keys->Size(); i++) {
1355 auto& key = (*block_rep->keys)[i];
1356 if (r->filter_builder != nullptr) {
1357 size_t ts_sz =
1358 r->internal_comparator.user_comparator()->timestamp_size();
1359 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1360 }
1361 r->index_builder->OnKeyAdded(key);
1362 }
1363
1364 r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
1365 block_rep->data->size());
1366 WriteMaybeCompressedBlock(block_rep->compressed_contents,
1367 block_rep->compression_type, &r->pending_handle,
1368 BlockType::kData, &block_rep->contents);
1369 if (!ok()) {
1370 break;
1371 }
1372
1373 r->props.data_size = r->get_offset();
1374 ++r->props.num_data_blocks;
1375
1376 if (block_rep->first_key_in_next_block == nullptr) {
1377 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
1378 r->pending_handle);
1379 } else {
1380 Slice first_key_in_next_block =
1381 Slice(*block_rep->first_key_in_next_block);
1382 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
1383 &first_key_in_next_block,
1384 r->pending_handle);
1385 }
1386
1387 r->pc_rep->ReapBlock(block_rep);
1388 }
1389 }
1390
1391 void BlockBasedTableBuilder::StartParallelCompression() {
1392 rep_->pc_rep.reset(
1393 new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
1394 rep_->pc_rep->compress_thread_pool.reserve(
1395 rep_->compression_opts.parallel_threads);
1396 for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
1397 rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
1398 BGWorkCompression(*(rep_->compression_ctxs[i]),
1399 rep_->verify_ctxs[i].get());
1400 });
1401 }
1402 rep_->pc_rep->write_thread.reset(
1403 new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
1404 }
1405
1406 void BlockBasedTableBuilder::StopParallelCompression() {
1407 rep_->pc_rep->compress_queue.finish();
1408 for (auto& thread : rep_->pc_rep->compress_thread_pool) {
1409 thread.join();
1410 }
1411 rep_->pc_rep->write_queue.finish();
1412 rep_->pc_rep->write_thread->join();
1413 }
1414
1415 Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
1416
1417 IOStatus BlockBasedTableBuilder::io_status() const {
1418 return rep_->GetIOStatus();
1419 }
1420
1421 //
1422 // Make a copy of the block contents and insert into compressed block cache
1423 //
1424 Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
1425 const Slice& block_contents, const CompressionType type,
1426 const BlockHandle* handle) {
1427 Rep* r = rep_;
1428 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
1429 Status s;
1430 if (type != kNoCompression && block_cache_compressed != nullptr) {
1431 size_t size = block_contents.size();
1432
1433 auto ubuf =
1434 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
1435 memcpy(ubuf.get(), block_contents.data(), size);
1436 ubuf[size] = type;
1437
1438 BlockContents* block_contents_to_cache =
1439 new BlockContents(std::move(ubuf), size);
1440 #ifndef NDEBUG
1441 block_contents_to_cache->has_trailer = true;
1442 #endif // NDEBUG
1443
1444 CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
1445
1446 s = block_cache_compressed->Insert(
1447 key.AsSlice(), block_contents_to_cache,
1448 block_contents_to_cache->ApproximateMemoryUsage(),
1449 &DeleteCacheEntry<BlockContents>);
1450 if (s.ok()) {
1451 RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD);
1452 } else {
1453 RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
1454 }
1455 // Invalidate OS cache.
1456 r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
1457 .PermitUncheckedError();
1458 }
1459 return s;
1460 }
1461
1462 Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
1463 const Slice& block_contents, const BlockHandle* handle,
1464 BlockType block_type) {
1465 Status s;
1466 switch (block_type) {
1467 case BlockType::kData:
1468 case BlockType::kIndex:
1469 case BlockType::kFilterPartitionIndex:
1470 s = InsertBlockInCache<Block>(block_contents, handle, block_type);
1471 break;
1472 case BlockType::kFilter:
1473 s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
1474 block_type);
1475 break;
1476 case BlockType::kCompressionDictionary:
1477 s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
1478 block_type);
1479 break;
1480 default:
1481 // no-op / not cached
1482 break;
1483 }
1484 return s;
1485 }
1486
1487 template <typename TBlocklike>
1488 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
1489 const BlockHandle* handle,
1490 BlockType block_type) {
1491 // Uncompressed regular block cache
1492 Cache* block_cache = rep_->table_options.block_cache.get();
1493 Status s;
1494 if (block_cache != nullptr) {
1495 size_t size = block_contents.size();
1496 auto buf = AllocateBlock(size, block_cache->memory_allocator());
1497 memcpy(buf.get(), block_contents.data(), size);
1498 BlockContents results(std::move(buf), size);
1499
1500 CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
1501
1502 const size_t read_amp_bytes_per_bit =
1503 rep_->table_options.read_amp_bytes_per_bit;
1504
1505 // TODO akanksha:: Dedup below code by calling
1506 // BlockBasedTable::PutDataBlockToCache.
1507 std::unique_ptr<TBlocklike> block_holder(
1508 BlocklikeTraits<TBlocklike>::Create(
1509 std::move(results), read_amp_bytes_per_bit,
1510 rep_->ioptions.statistics.get(),
1511 false /*rep_->blocks_definitely_zstd_compressed*/,
1512 rep_->table_options.filter_policy.get()));
1513
1514 assert(block_holder->own_bytes());
1515 size_t charge = block_holder->ApproximateMemoryUsage();
1516 s = block_cache->Insert(
1517 key.AsSlice(), block_holder.get(),
1518 BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
1519 nullptr, Cache::Priority::LOW);
1520
1521 if (s.ok()) {
1522 // Release ownership of block_holder.
1523 block_holder.release();
1524 BlockBasedTable::UpdateCacheInsertionMetrics(
1525 block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
1526 rep_->ioptions.stats);
1527 } else {
1528 RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
1529 }
1530 }
1531 return s;
1532 }
1533
1534 void BlockBasedTableBuilder::WriteFilterBlock(
1535 MetaIndexBuilder* meta_index_builder) {
1536 if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
1537 // No filter block needed
1538 return;
1539 }
1540 BlockHandle filter_block_handle;
1541 bool is_partitioned_filter = rep_->table_options.partition_filters;
1542 if (ok()) {
1543 rep_->props.num_filter_entries +=
1544 rep_->filter_builder->EstimateEntriesAdded();
1545 Status s = Status::Incomplete();
1546 while (ok() && s.IsIncomplete()) {
1547 // filter_data holds the filter payload transferred from FilterBlockBuilder
1548 // and deallocates it when it goes out of scope. Otherwise, the payload
1549 // would unnecessarily remain alive until BlockBasedTableBuilder is
1550 // deallocated.
1551 //
1552 // See FilterBlockBuilder::Finish() for more on the difference in
1553 // transferred filter data payload among different FilterBlockBuilder
1554 // subtypes.
1555 std::unique_ptr<const char[]> filter_data;
1556 Slice filter_content =
1557 rep_->filter_builder->Finish(filter_block_handle, &s, &filter_data);
1558
1559 assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
1560 if (s.IsCorruption()) {
1561 rep_->SetStatus(s);
1562 break;
1563 }
1564
1565 rep_->props.filter_size += filter_content.size();
1566
1567 BlockType btype = is_partitioned_filter && /* last */ s.ok()
1568 ? BlockType::kFilterPartitionIndex
1569 : BlockType::kFilter;
1570 WriteMaybeCompressedBlock(filter_content, kNoCompression,
1571 &filter_block_handle, btype);
1572 }
1573 rep_->filter_builder->ResetFilterBitsBuilder();
1574 }
1575 if (ok()) {
1576 // Add mapping from "<filter_block_prefix>.Name" to location
1577 // of filter data.
1578 std::string key;
1579 key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
1580 : BlockBasedTable::kFullFilterBlockPrefix;
1581 key.append(rep_->table_options.filter_policy->CompatibilityName());
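// For example (illustrative, assuming the built-in bloom policy whose
// CompatibilityName() is "rocksdb.BuiltinBloomFilter"), the resulting
// metaindex key is "fullfilter.rocksdb.BuiltinBloomFilter", or
// "partitionedfilter.rocksdb.BuiltinBloomFilter" when partition_filters
// is enabled.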
1582 meta_index_builder->Add(key, filter_block_handle);
1583 }
1584 }
1585
1586 void BlockBasedTableBuilder::WriteIndexBlock(
1587 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
1588 if (!ok()) {
1589 return;
1590 }
1591 IndexBuilder::IndexBlocks index_blocks;
1592 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
1593 if (index_builder_status.IsIncomplete()) {
1594 // If we have more than one index partition then meta_blocks are not
1595 // supported for the index. Currently meta_blocks are used only by
1596 // HashIndexBuilder, which is not multi-partition.
1597 assert(index_blocks.meta_blocks.empty());
1598 } else if (ok() && !index_builder_status.ok()) {
1599 rep_->SetStatus(index_builder_status);
1600 }
1601 if (ok()) {
1602 for (const auto& item : index_blocks.meta_blocks) {
1603 BlockHandle block_handle;
1604 WriteBlock(item.second, &block_handle, BlockType::kIndex);
1605 if (!ok()) {
1606 break;
1607 }
1608 meta_index_builder->Add(item.first, block_handle);
1609 }
1610 }
1611 if (ok()) {
1612 if (rep_->table_options.enable_index_compression) {
1613 WriteBlock(index_blocks.index_block_contents, index_block_handle,
1614 BlockType::kIndex);
1615 } else {
1616 WriteMaybeCompressedBlock(index_blocks.index_block_contents,
1617 kNoCompression, index_block_handle,
1618 BlockType::kIndex);
1619 }
1620 }
1621 // If there are more index partitions, finish them and write them out
1622 if (index_builder_status.IsIncomplete()) {
1623 bool index_building_finished = false;
1624 while (ok() && !index_building_finished) {
1625 Status s =
1626 rep_->index_builder->Finish(&index_blocks, *index_block_handle);
1627 if (s.ok()) {
1628 index_building_finished = true;
1629 } else if (s.IsIncomplete()) {
1630 // More partitioned index after this one
1631 assert(!index_building_finished);
1632 } else {
1633 // Error
1634 rep_->SetStatus(s);
1635 return;
1636 }
1637
1638 if (rep_->table_options.enable_index_compression) {
1639 WriteBlock(index_blocks.index_block_contents, index_block_handle,
1640 BlockType::kIndex);
1641 } else {
1642 WriteMaybeCompressedBlock(index_blocks.index_block_contents,
1643 kNoCompression, index_block_handle,
1644 BlockType::kIndex);
1645 }
1646 // The last index_block_handle will be for the partition index block
1647 }
1648 }
1649 }
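// Illustrative configuration (a sketch, assuming the public RocksDB options of
// this release): the partitioned-index path above is taken with a two-level
// index, e.g.
//   BlockBasedTableOptions bbto;
//   bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
//   bbto.partition_filters = true;     // optionally partition filters as well
//   bbto.metadata_block_size = 4096;   // target size of each index partition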
1650
1651 void BlockBasedTableBuilder::WritePropertiesBlock(
1652 MetaIndexBuilder* meta_index_builder) {
1653 BlockHandle properties_block_handle;
1654 if (ok()) {
1655 PropertyBlockBuilder property_block_builder;
1656 rep_->props.filter_policy_name =
1657 rep_->table_options.filter_policy != nullptr
1658 ? rep_->table_options.filter_policy->Name()
1659 : "";
1660 rep_->props.index_size =
1661 rep_->index_builder->IndexSize() + kBlockTrailerSize;
1662 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
1663 ? rep_->ioptions.user_comparator->Name()
1664 : "nullptr";
1665 rep_->props.merge_operator_name =
1666 rep_->ioptions.merge_operator != nullptr
1667 ? rep_->ioptions.merge_operator->Name()
1668 : "nullptr";
1669 rep_->props.compression_name =
1670 CompressionTypeToString(rep_->compression_type);
1671 rep_->props.compression_options =
1672 CompressionOptionsToString(rep_->compression_opts);
1673 rep_->props.prefix_extractor_name =
1674 rep_->moptions.prefix_extractor != nullptr
1675 ? rep_->moptions.prefix_extractor->AsString()
1676 : "nullptr";
1677 std::string property_collectors_names = "[";
1678 for (size_t i = 0;
1679 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
1680 if (i != 0) {
1681 property_collectors_names += ",";
1682 }
1683 property_collectors_names +=
1684 rep_->ioptions.table_properties_collector_factories[i]->Name();
1685 }
1686 property_collectors_names += "]";
1687 rep_->props.property_collectors_names = property_collectors_names;
1688 if (rep_->table_options.index_type ==
1689 BlockBasedTableOptions::kTwoLevelIndexSearch) {
1690 assert(rep_->p_index_builder_ != nullptr);
1691 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
1692 rep_->props.top_level_index_size =
1693 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
1694 }
1695 rep_->props.index_key_is_user_key =
1696 !rep_->index_builder->seperator_is_key_plus_seq();
1697 rep_->props.index_value_is_delta_encoded =
1698 rep_->use_delta_encoding_for_index_values;
1699 if (rep_->sampled_input_data_bytes > 0) {
1700 rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
1701 static_cast<double>(rep_->sampled_output_slow_data_bytes) /
1702 rep_->sampled_input_data_bytes *
1703 rep_->compressible_input_data_bytes +
1704 rep_->uncompressible_input_data_bytes + 0.5);
1705 rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
1706 static_cast<double>(rep_->sampled_output_fast_data_bytes) /
1707 rep_->sampled_input_data_bytes *
1708 rep_->compressible_input_data_bytes +
1709 rep_->uncompressible_input_data_bytes + 0.5);
1710 } else if (rep_->sample_for_compression > 0) {
1711 // We tried to sample but ended up with no samples. Assume the worst case
1712 // (compression ratio 1.0) so the data is complete and aggregatable.
1713 rep_->props.slow_compression_estimated_data_size =
1714 rep_->compressible_input_data_bytes +
1715 rep_->uncompressible_input_data_bytes;
1716 rep_->props.fast_compression_estimated_data_size =
1717 rep_->compressible_input_data_bytes +
1718 rep_->uncompressible_input_data_bytes;
1719 }
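// Worked example (illustrative numbers): if 100 bytes of input were sampled
// and the slow compressor produced 40 bytes from them, the ratio is 0.4; with
// 1,000,000 compressible and 200,000 uncompressible input bytes, the estimate
// is 0.4 * 1,000,000 + 200,000 + 0.5, which truncates to 600,000 bytes.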
1720
1721 // Add basic properties
1722 property_block_builder.AddTableProperty(rep_->props);
1723
1724 // Add user collected properties
1725 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
1726 rep_->ioptions.logger,
1727 &property_block_builder);
1728
1729 Slice block_data = property_block_builder.Finish();
1730 TEST_SYNC_POINT_CALLBACK(
1731 "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
1732 WriteMaybeCompressedBlock(block_data, kNoCompression,
1733 &properties_block_handle, BlockType::kProperties);
1734 }
1735 if (ok()) {
1736 #ifndef NDEBUG
1737 {
1738 uint64_t props_block_offset = properties_block_handle.offset();
1739 uint64_t props_block_size = properties_block_handle.size();
1740 TEST_SYNC_POINT_CALLBACK(
1741 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
1742 &props_block_offset);
1743 TEST_SYNC_POINT_CALLBACK(
1744 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
1745 &props_block_size);
1746 }
1747 #endif // !NDEBUG
1748
1749 const std::string* properties_block_meta = &kPropertiesBlockName;
1750 TEST_SYNC_POINT_CALLBACK(
1751 "BlockBasedTableBuilder::WritePropertiesBlock:Meta",
1752 &properties_block_meta);
1753 meta_index_builder->Add(*properties_block_meta, properties_block_handle);
1754 }
1755 }
1756
1757 void BlockBasedTableBuilder::WriteCompressionDictBlock(
1758 MetaIndexBuilder* meta_index_builder) {
1759 if (rep_->compression_dict != nullptr &&
1760 rep_->compression_dict->GetRawDict().size()) {
1761 BlockHandle compression_dict_block_handle;
1762 if (ok()) {
1763 WriteMaybeCompressedBlock(rep_->compression_dict->GetRawDict(),
1764 kNoCompression, &compression_dict_block_handle,
1765 BlockType::kCompressionDictionary);
1766 #ifndef NDEBUG
1767 Slice compression_dict = rep_->compression_dict->GetRawDict();
1768 TEST_SYNC_POINT_CALLBACK(
1769 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1770 &compression_dict);
1771 #endif // NDEBUG
1772 }
1773 if (ok()) {
1774 meta_index_builder->Add(kCompressionDictBlockName,
1775 compression_dict_block_handle);
1776 }
1777 }
1778 }
1779
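// Descriptive note: range_del_block accumulates the range tombstones created
// by DB::DeleteRange(); they are written uncompressed into a single dedicated
// meta block (kRangeDelBlockName) rather than being interleaved with the data
// blocks.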
1780 void BlockBasedTableBuilder::WriteRangeDelBlock(
1781 MetaIndexBuilder* meta_index_builder) {
1782 if (ok() && !rep_->range_del_block.empty()) {
1783 BlockHandle range_del_block_handle;
1784 WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
1785 &range_del_block_handle,
1786 BlockType::kRangeDeletion);
1787 meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
1788 }
1789 }
1790
1791 void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1792 BlockHandle& index_block_handle) {
1793 Rep* r = rep_;
1794 // this is guaranteed by BlockBasedTableBuilder's constructor
1795 assert(r->table_options.checksum == kCRC32c ||
1796 r->table_options.format_version != 0);
1797 assert(ok());
1798
1799 FooterBuilder footer;
1800 footer.Build(kBlockBasedTableMagicNumber, r->table_options.format_version,
1801 r->get_offset(), r->table_options.checksum,
1802 metaindex_block_handle, index_block_handle);
1803 IOStatus ios = r->file->Append(footer.GetSlice());
1804 if (ios.ok()) {
1805 r->set_offset(r->get_offset() + footer.GetSlice().size());
1806 } else {
1807 r->SetIOStatus(ios);
1808 }
1809 }
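// Illustrative knobs (a sketch, assuming the public RocksDB options of this
// release): the footer records the checksum type and format version chosen at
// build time, e.g.
//   BlockBasedTableOptions bbto;
//   bbto.checksum = kXXH3;     // any non-kCRC32c type needs format_version >= 1
//   bbto.format_version = 5;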
1810
1811 void BlockBasedTableBuilder::EnterUnbuffered() {
1812 Rep* r = rep_;
1813 assert(r->state == Rep::State::kBuffered);
1814 r->state = Rep::State::kUnbuffered;
1815 const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
1816 ? r->compression_opts.zstd_max_train_bytes
1817 : r->compression_opts.max_dict_bytes;
1818 const size_t kNumBlocksBuffered = r->data_block_buffers.size();
1819 if (kNumBlocksBuffered == 0) {
1820 // The code below is neither safe nor necessary when there are zero data
1821 // blocks.
1822 return;
1823 }
1824
1825 // Abstract algebra teaches us that a finite cyclic group (such as the
1826 // additive group of integers modulo N) can be generated by any number that
1827 // is coprime with N. Since N varies (number of buffered data blocks), we
1828 // pick a prime larger than any possible N, which guarantees coprimeness.
1829 //
1830 // One downside of this approach is the spread will be poor when
1831 // `kPrimeGeneratorRemainder` is close to zero or close to
1832 // `kNumBlocksBuffered`.
1833 //
1834 // Picked a random number between one and one trillion and then chose the
1835 // next prime number greater than or equal to it.
1836 const uint64_t kPrimeGenerator = 545055921143ull;
1837 // Can avoid repeated division by just adding the remainder repeatedly.
1838 const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
1839 kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
1840 const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
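// Worked example (illustrative): with kNumBlocksBuffered == 10 the remainder
// is 545055921143 % 10 == 3 and kInitSampleIdx == 5, so the loop below visits
// buffers 5, 8, 1, 4, 7, 0, 3, 6, 9, 2, touching every buffer exactly once
// because gcd(3, 10) == 1.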
1841
1842 std::string compression_dict_samples;
1843 std::vector<size_t> compression_dict_sample_lens;
1844 size_t buffer_idx = kInitSampleIdx;
1845 for (size_t i = 0;
1846 i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
1847 ++i) {
1848 size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
1849 r->data_block_buffers[buffer_idx].size());
1850 compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
1851 copy_len);
1852 compression_dict_sample_lens.emplace_back(copy_len);
1853
1854 buffer_idx += kPrimeGeneratorRemainder;
1855 if (buffer_idx >= kNumBlocksBuffered) {
1856 buffer_idx -= kNumBlocksBuffered;
1857 }
1858 }
1859
1860 // The final data block has been flushed, so we can now build the dictionary
1861 // from the samples; an empty compression_dict_samples just yields an empty dictionary.
1862 std::string dict;
1863 if (r->compression_opts.zstd_max_train_bytes > 0) {
1864 if (r->compression_opts.use_zstd_dict_trainer) {
1865 dict = ZSTD_TrainDictionary(compression_dict_samples,
1866 compression_dict_sample_lens,
1867 r->compression_opts.max_dict_bytes);
1868 } else {
1869 dict = ZSTD_FinalizeDictionary(
1870 compression_dict_samples, compression_dict_sample_lens,
1871 r->compression_opts.max_dict_bytes, r->compression_opts.level);
1872 }
1873 } else {
1874 dict = std::move(compression_dict_samples);
1875 }
1876 r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
1877 r->compression_opts.level));
1878 r->verify_dict.reset(new UncompressionDict(
1879 dict, r->compression_type == kZSTD ||
1880 r->compression_type == kZSTDNotFinalCompression));
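// Illustrative configuration (a sketch, assuming the public CompressionOptions
// fields of this release): the dictionary path above is reached when
// dictionary compression is enabled, e.g.
//   CompressionOptions copts;
//   copts.max_dict_bytes = 16 * 1024;              // target dictionary size
//   copts.zstd_max_train_bytes = 100 * 16 * 1024;  // sample budget for ZSTD training
//   copts.use_zstd_dict_trainer = true;            // ZDICT trainer vs. finalize API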
1881
1882 auto get_iterator_for_block = [&r](size_t i) {
1883 auto& data_block = r->data_block_buffers[i];
1884 assert(!data_block.empty());
1885
1886 Block reader{BlockContents{data_block}};
1887 DataBlockIter* iter = reader.NewDataIterator(
1888 r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
1889
1890 iter->SeekToFirst();
1891 assert(iter->Valid());
1892 return std::unique_ptr<DataBlockIter>(iter);
1893 };
1894
1895 std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
1896
1897 for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
1898 if (iter == nullptr) {
1899 iter = get_iterator_for_block(i);
1900 assert(iter != nullptr);
1901 }
1902
1903 if (i + 1 < r->data_block_buffers.size()) {
1904 next_block_iter = get_iterator_for_block(i + 1);
1905 }
1906
1907 auto& data_block = r->data_block_buffers[i];
1908 if (r->IsParallelCompressionEnabled()) {
1909 Slice first_key_in_next_block;
1910 const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1911 if (i + 1 < r->data_block_buffers.size()) {
1912 assert(next_block_iter != nullptr);
1913 first_key_in_next_block = next_block_iter->key();
1914 } else {
1915 first_key_in_next_block_ptr = r->first_key_in_next_block;
1916 }
1917
1918 std::vector<std::string> keys;
1919 for (; iter->Valid(); iter->Next()) {
1920 keys.emplace_back(iter->key().ToString());
1921 }
1922
1923 ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
1924 r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
1925
1926 assert(block_rep != nullptr);
1927 r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
1928 r->get_offset());
1929 r->pc_rep->EmitBlock(block_rep);
1930 } else {
1931 for (; iter->Valid(); iter->Next()) {
1932 Slice key = iter->key();
1933 if (r->filter_builder != nullptr) {
1934 size_t ts_sz =
1935 r->internal_comparator.user_comparator()->timestamp_size();
1936 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1937 }
1938 r->index_builder->OnKeyAdded(key);
1939 }
1940 WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
1941 if (ok() && i + 1 < r->data_block_buffers.size()) {
1942 assert(next_block_iter != nullptr);
1943 Slice first_key_in_next_block = next_block_iter->key();
1944
1945 Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1946
1947 iter->SeekToLast();
1948 std::string last_key = iter->key().ToString();
1949 r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
1950 r->pending_handle);
1951 }
1952 }
1953 std::swap(iter, next_block_iter);
1954 }
1955 r->data_block_buffers.clear();
1956 r->data_begin_offset = 0;
1957 // Release all reserved cache for data block buffers
1958 if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
1959 Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
1960 r->data_begin_offset);
1961 s.PermitUncheckedError();
1962 }
1963 }
1964
1965 Status BlockBasedTableBuilder::Finish() {
1966 Rep* r = rep_;
1967 assert(r->state != Rep::State::kClosed);
1968 bool empty_data_block = r->data_block.empty();
1969 r->first_key_in_next_block = nullptr;
1970 Flush();
1971 if (r->state == Rep::State::kBuffered) {
1972 EnterUnbuffered();
1973 }
1974 if (r->IsParallelCompressionEnabled()) {
1975 StopParallelCompression();
1976 #ifndef NDEBUG
1977 for (const auto& br : r->pc_rep->block_rep_buf) {
1978 assert(br.status.ok());
1979 }
1980 #endif // !NDEBUG
1981 } else {
1982 // To make sure the properties block records an accurate index block size,
1983 // we finish writing all index entries first.
1984 if (ok() && !empty_data_block) {
1985 r->index_builder->AddIndexEntry(
1986 &r->last_key, nullptr /* no next data block */, r->pending_handle);
1987 }
1988 }
1989
1990 // Write meta blocks, metaindex block and footer in the following order.
1991 // 1. [meta block: filter]
1992 // 2. [meta block: index]
1993 // 3. [meta block: compression dictionary]
1994 // 4. [meta block: range deletion tombstone]
1995 // 5. [meta block: properties]
1996 // 6. [metaindex block]
1997 // 7. Footer
1998 BlockHandle metaindex_block_handle, index_block_handle;
1999 MetaIndexBuilder meta_index_builder;
2000 WriteFilterBlock(&meta_index_builder);
2001 WriteIndexBlock(&meta_index_builder, &index_block_handle);
2002 WriteCompressionDictBlock(&meta_index_builder);
2003 WriteRangeDelBlock(&meta_index_builder);
2004 WritePropertiesBlock(&meta_index_builder);
2005 if (ok()) {
2006 // flush the meta index block
2007 WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
2008 &metaindex_block_handle, BlockType::kMetaIndex);
2009 }
2010 if (ok()) {
2011 WriteFooter(metaindex_block_handle, index_block_handle);
2012 }
2013 r->state = Rep::State::kClosed;
2014 r->SetStatus(r->CopyIOStatus());
2015 Status ret_status = r->CopyStatus();
2016 assert(!ret_status.ok() || io_status().ok());
2017 return ret_status;
2018 }
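// Illustrative call sequence (a sketch of the TableBuilder contract, not code
// from this file): a caller Add()s internal keys in sorted order and then
// calls exactly one of Finish() or Abandon() before destroying the builder:
//   // TableBuilder* builder = ...;  (obtained from a TableFactory)
//   // for (const auto& kv : sorted_kvs) builder->Add(kv.first, kv.second);
//   // Status s = builder->Finish();  // or builder->Abandon() to discard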
2019
2020 void BlockBasedTableBuilder::Abandon() {
2021 assert(rep_->state != Rep::State::kClosed);
2022 if (rep_->IsParallelCompressionEnabled()) {
2023 StopParallelCompression();
2024 }
2025 rep_->state = Rep::State::kClosed;
2026 rep_->CopyStatus().PermitUncheckedError();
2027 rep_->CopyIOStatus().PermitUncheckedError();
2028 }
2029
2030 uint64_t BlockBasedTableBuilder::NumEntries() const {
2031 return rep_->props.num_entries;
2032 }
2033
2034 bool BlockBasedTableBuilder::IsEmpty() const {
2035 return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
2036 }
2037
2038 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
2039
2040 uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
2041 if (rep_->IsParallelCompressionEnabled()) {
2042 // Use compression ratio so far and inflight uncompressed bytes to estimate
2043 // final SST size.
2044 return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
2045 } else {
2046 return FileSize();
2047 }
2048 }
2049
2050 bool BlockBasedTableBuilder::NeedCompact() const {
2051 for (const auto& collector : rep_->table_properties_collectors) {
2052 if (collector->NeedCompact()) {
2053 return true;
2054 }
2055 }
2056 return false;
2057 }
2058
2059 TableProperties BlockBasedTableBuilder::GetTableProperties() const {
2060 TableProperties ret = rep_->props;
2061 for (const auto& collector : rep_->table_properties_collectors) {
2062 for (const auto& prop : collector->GetReadableProperties()) {
2063 ret.readable_properties.insert(prop);
2064 }
2065 collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
2066 }
2067 return ret;
2068 }
2069
2070 std::string BlockBasedTableBuilder::GetFileChecksum() const {
2071 if (rep_->file != nullptr) {
2072 return rep_->file->GetFileChecksum();
2073 } else {
2074 return kUnknownFileChecksum;
2075 }
2076 }
2077
2078 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
2079 if (rep_->file != nullptr) {
2080 return rep_->file->GetFileChecksumFuncName();
2081 } else {
2082 return kUnknownFileChecksumFuncName;
2083 }
2084 }
2085 void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
2086 const std::string& encoded_seqno_to_time_mapping,
2087 uint64_t oldest_ancestor_time) {
2088 rep_->props.seqno_to_time_mapping = encoded_seqno_to_time_mapping;
2089 rep_->props.creation_time = oldest_ancestor_time;
2090 }
2091
2092 const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
2093 const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
2094 const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
2095 "partitionedfilter.";
2096 } // namespace ROCKSDB_NAMESPACE