]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/table/block_based_table_builder.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / table / block_based_table_builder.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "table/block_based_table_builder.h"
11
12#include <assert.h>
7c673cae
FG
13#include <stdio.h>
14
15#include <list>
16#include <map>
17#include <memory>
18#include <string>
19#include <unordered_map>
20#include <utility>
21
22#include "db/dbformat.h"
23
24#include "rocksdb/cache.h"
25#include "rocksdb/comparator.h"
26#include "rocksdb/env.h"
27#include "rocksdb/filter_policy.h"
28#include "rocksdb/flush_block_policy.h"
29#include "rocksdb/merge_operator.h"
30#include "rocksdb/table.h"
31
32#include "table/block.h"
33#include "table/block_based_filter_block.h"
34#include "table/block_based_table_factory.h"
35#include "table/block_based_table_reader.h"
36#include "table/block_builder.h"
37#include "table/filter_block.h"
38#include "table/format.h"
39#include "table/full_filter_block.h"
7c673cae
FG
40#include "table/table_builder.h"
41
7c673cae
FG
42#include "util/coding.h"
43#include "util/compression.h"
44#include "util/crc32c.h"
494da23a 45#include "util/memory_allocator.h"
7c673cae 46#include "util/stop_watch.h"
11fdf7f2 47#include "util/string_util.h"
7c673cae
FG
48#include "util/xxhash.h"
49
50#include "table/index_builder.h"
51#include "table/partitioned_filter_block.h"
52
53namespace rocksdb {
54
55extern const std::string kHashIndexPrefixesBlock;
56extern const std::string kHashIndexPrefixesMetadataBlock;
57
58typedef BlockBasedTableOptions::IndexType IndexType;
59
60// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
61namespace {
62
63// Create a filter block builder based on its type.
64FilterBlockBuilder* CreateFilterBlockBuilder(
11fdf7f2
TL
65 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
66 const BlockBasedTableOptions& table_opt,
67 const bool use_delta_encoding_for_index_values,
7c673cae
FG
68 PartitionedIndexBuilder* const p_index_builder) {
69 if (table_opt.filter_policy == nullptr) return nullptr;
70
71 FilterBitsBuilder* filter_bits_builder =
72 table_opt.filter_policy->GetFilterBitsBuilder();
73 if (filter_bits_builder == nullptr) {
11fdf7f2
TL
74 return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
75 table_opt);
7c673cae
FG
76 } else {
77 if (table_opt.partition_filters) {
78 assert(p_index_builder != nullptr);
11fdf7f2
TL
79 // Since after partition cut request from filter builder it takes time
80 // until index builder actully cuts the partition, we take the lower bound
81 // as partition size.
82 assert(table_opt.block_size_deviation <= 100);
494da23a
TL
83 auto partition_size =
84 static_cast<uint32_t>(((table_opt.metadata_block_size *
85 (100 - table_opt.block_size_deviation)) +
86 99) /
87 100);
11fdf7f2 88 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
7c673cae 89 return new PartitionedFilterBlockBuilder(
11fdf7f2 90 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
7c673cae 91 filter_bits_builder, table_opt.index_block_restart_interval,
11fdf7f2 92 use_delta_encoding_for_index_values, p_index_builder, partition_size);
7c673cae 93 } else {
11fdf7f2 94 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
7c673cae
FG
95 table_opt.whole_key_filtering,
96 filter_bits_builder);
97 }
98 }
99}
100
// Returns true when compression saved more than 12.5% of the raw size, i.e.
// `compressed_size` is strictly smaller than `raw_size` minus one eighth of
// `raw_size` (integer division).
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  // Check to see if compressed less than 12.5%
  return compressed_size < raw_size - (raw_size / 8u);
}
105
494da23a
TL
106bool CompressBlockInternal(const Slice& raw,
107 const CompressionInfo& compression_info,
108 uint32_t format_version,
109 std::string* compressed_output) {
7c673cae
FG
110 // Will return compressed block contents if (1) the compression method is
111 // supported in this platform and (2) the compression rate is "good enough".
494da23a 112 switch (compression_info.type()) {
7c673cae 113 case kSnappyCompression:
494da23a
TL
114 return Snappy_Compress(compression_info, raw.data(), raw.size(),
115 compressed_output);
7c673cae 116 case kZlibCompression:
494da23a
TL
117 return Zlib_Compress(
118 compression_info,
119 GetCompressFormatForVersion(kZlibCompression, format_version),
120 raw.data(), raw.size(), compressed_output);
7c673cae 121 case kBZip2Compression:
494da23a
TL
122 return BZip2_Compress(
123 compression_info,
124 GetCompressFormatForVersion(kBZip2Compression, format_version),
125 raw.data(), raw.size(), compressed_output);
7c673cae 126 case kLZ4Compression:
494da23a
TL
127 return LZ4_Compress(
128 compression_info,
129 GetCompressFormatForVersion(kLZ4Compression, format_version),
130 raw.data(), raw.size(), compressed_output);
7c673cae 131 case kLZ4HCCompression:
494da23a
TL
132 return LZ4HC_Compress(
133 compression_info,
134 GetCompressFormatForVersion(kLZ4HCCompression, format_version),
135 raw.data(), raw.size(), compressed_output);
7c673cae 136 case kXpressCompression:
494da23a 137 return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
7c673cae
FG
138 case kZSTD:
139 case kZSTDNotFinalCompression:
494da23a
TL
140 return ZSTD_Compress(compression_info, raw.data(), raw.size(),
141 compressed_output);
142 default:
143 // Do not recognize this compression type
144 return false;
145 }
146}
147
148} // namespace
149
150// format_version is the block format as defined in include/rocksdb/table.h
151Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
152 CompressionType* type, uint32_t format_version,
153 bool do_sample, std::string* compressed_output,
154 std::string* sampled_output_fast,
155 std::string* sampled_output_slow) {
156 *type = info.type();
157
158 if (info.type() == kNoCompression && !info.SampleForCompression()) {
159 return raw;
7c673cae
FG
160 }
161
494da23a
TL
162 // If requested, we sample one in every N block with a
163 // fast and slow compression algorithm and report the stats.
164 // The users can use these stats to decide if it is worthwhile
165 // enabling compression and they also get a hint about which
166 // compression algorithm wil be beneficial.
167 if (do_sample && info.SampleForCompression() &&
168 Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
169 sampled_output_fast && sampled_output_slow) {
170 // Sampling with a fast compression algorithm
171 if (LZ4_Supported() || Snappy_Supported()) {
172 CompressionType c =
173 LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
174 CompressionContext context(c);
175 CompressionOptions options;
176 CompressionInfo info_tmp(options, context,
177 CompressionDict::GetEmptyDict(), c,
178 info.SampleForCompression());
179
180 CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
181 }
182
183 // Sampling with a slow but high-compression algorithm
184 if (ZSTD_Supported() || Zlib_Supported()) {
185 CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
186 CompressionContext context(c);
187 CompressionOptions options;
188 CompressionInfo info_tmp(options, context,
189 CompressionDict::GetEmptyDict(), c,
190 info.SampleForCompression());
191 CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
192 }
193 }
194
195 // Actually compress the data
196 if (*type != kNoCompression) {
197 if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
198 GoodCompressionRatio(compressed_output->size(), raw.size())) {
199 return *compressed_output;
200 }
201 }
202
203 // Compression method is not supported, or not good
204 // compression ratio, so just fall back to uncompressed form.
7c673cae
FG
205 *type = kNoCompression;
206 return raw;
207}
208
// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files; for that reason we declare it extern in the header, but to get
// the space allocated it must be not extern in one place.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing legacy block based table format (for
// backwards compatibility)
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
221
222// A collector that collects properties of interest to block-based table.
223// For now this class looks heavy-weight since we only write one additional
224// property.
225// But in the foreseeable future, we will add more and more properties that are
226// specific to block-based table.
227class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
228 : public IntTblPropCollector {
229 public:
230 explicit BlockBasedTablePropertiesCollector(
231 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
232 bool prefix_filtering)
233 : index_type_(index_type),
234 whole_key_filtering_(whole_key_filtering),
235 prefix_filtering_(prefix_filtering) {}
236
494da23a
TL
237 Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
238 uint64_t /*file_size*/) override {
7c673cae
FG
239 // Intentionally left blank. Have no interest in collecting stats for
240 // individual key/value pairs.
241 return Status::OK();
242 }
243
494da23a
TL
244 virtual void BlockAdd(uint64_t /* blockRawBytes */,
245 uint64_t /* blockCompressedBytesFast */,
246 uint64_t /* blockCompressedBytesSlow */) override {
247 // Intentionally left blank. No interest in collecting stats for
248 // blocks.
249 return;
250 }
251
252 Status Finish(UserCollectedProperties* properties) override {
7c673cae
FG
253 std::string val;
254 PutFixed32(&val, static_cast<uint32_t>(index_type_));
255 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
256 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
257 whole_key_filtering_ ? kPropTrue : kPropFalse});
258 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
259 prefix_filtering_ ? kPropTrue : kPropFalse});
260 return Status::OK();
261 }
262
263 // The name of the properties collector can be used for debugging purpose.
494da23a 264 const char* Name() const override {
7c673cae
FG
265 return "BlockBasedTablePropertiesCollector";
266 }
267
494da23a 268 UserCollectedProperties GetReadableProperties() const override {
7c673cae
FG
269 // Intentionally left blank.
270 return UserCollectedProperties();
271 }
272
273 private:
274 BlockBasedTableOptions::IndexType index_type_;
275 bool whole_key_filtering_;
276 bool prefix_filtering_;
277};
278
279struct BlockBasedTableBuilder::Rep {
280 const ImmutableCFOptions ioptions;
11fdf7f2 281 const MutableCFOptions moptions;
7c673cae
FG
282 const BlockBasedTableOptions table_options;
283 const InternalKeyComparator& internal_comparator;
284 WritableFileWriter* file;
285 uint64_t offset = 0;
286 Status status;
11fdf7f2 287 size_t alignment;
7c673cae 288 BlockBuilder data_block;
494da23a
TL
289 // Buffers uncompressed data blocks and keys to replay later. Needed when
290 // compression dictionary is enabled so we can finalize the dictionary before
291 // compressing any data blocks.
292 // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
293 // blocks as it's redundant, but it's easier to implement for now.
294 std::vector<std::pair<std::string, std::vector<std::string>>>
295 data_block_and_keys_buffers;
7c673cae
FG
296 BlockBuilder range_del_block;
297
298 InternalKeySliceTransform internal_prefix_transform;
299 std::unique_ptr<IndexBuilder> index_builder;
11fdf7f2 300 PartitionedIndexBuilder* p_index_builder_ = nullptr;
7c673cae
FG
301
302 std::string last_key;
494da23a
TL
303 CompressionType compression_type;
304 uint64_t sample_for_compression;
305 CompressionOptions compression_opts;
306 std::unique_ptr<CompressionDict> compression_dict;
11fdf7f2
TL
307 CompressionContext compression_ctx;
308 std::unique_ptr<UncompressionContext> verify_ctx;
494da23a
TL
309 std::unique_ptr<UncompressionDict> verify_dict;
310
311 size_t data_begin_offset = 0;
312
7c673cae
FG
313 TableProperties props;
314
494da23a
TL
315 // States of the builder.
316 //
317 // - `kBuffered`: This is the initial state where zero or more data blocks are
318 // accumulated uncompressed in-memory. From this state, call
319 // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
320 // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
321 // state.
322 //
323 // - `kUnbuffered`: This is the state when compression dictionary is finalized
324 // either because it wasn't enabled in the first place or it's been created
325 // from sampling previously buffered data. In this state, blocks are simply
326 // compressed/written out as they fill up. From this state, call `Finish()`
327 // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
328 // the partially created file.
329 //
330 // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
331 // called, so the table builder is no longer usable. We must be in this
332 // state by the time the destructor runs.
333 enum class State {
334 kBuffered,
335 kUnbuffered,
336 kClosed,
337 };
338 State state;
339
11fdf7f2 340 const bool use_delta_encoding_for_index_values;
7c673cae
FG
341 std::unique_ptr<FilterBlockBuilder> filter_builder;
342 char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
343 size_t compressed_cache_key_prefix_size;
344
345 BlockHandle pending_handle; // Handle to add to index block
346
347 std::string compressed_output;
348 std::unique_ptr<FlushBlockPolicy> flush_block_policy;
349 uint32_t column_family_id;
350 const std::string& column_family_name;
11fdf7f2
TL
351 uint64_t creation_time = 0;
352 uint64_t oldest_key_time = 0;
494da23a 353 const uint64_t target_file_size;
7c673cae
FG
354
355 std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
356
11fdf7f2 357 Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
7c673cae
FG
358 const BlockBasedTableOptions& table_opt,
359 const InternalKeyComparator& icomparator,
360 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
361 int_tbl_prop_collector_factories,
362 uint32_t _column_family_id, WritableFileWriter* f,
363 const CompressionType _compression_type,
494da23a
TL
364 const uint64_t _sample_for_compression,
365 const CompressionOptions& _compression_opts, const bool skip_filters,
11fdf7f2 366 const std::string& _column_family_name, const uint64_t _creation_time,
494da23a 367 const uint64_t _oldest_key_time, const uint64_t _target_file_size)
7c673cae 368 : ioptions(_ioptions),
11fdf7f2 369 moptions(_moptions),
7c673cae
FG
370 table_options(table_opt),
371 internal_comparator(icomparator),
372 file(f),
11fdf7f2
TL
373 alignment(table_options.block_align
374 ? std::min(table_options.block_size, kDefaultPageSize)
375 : 0),
7c673cae 376 data_block(table_options.block_restart_interval,
11fdf7f2
TL
377 table_options.use_delta_encoding,
378 false /* use_value_delta_encoding */,
379 icomparator.user_comparator()
380 ->CanKeysWithDifferentByteContentsBeEqual()
381 ? BlockBasedTableOptions::kDataBlockBinarySearch
382 : table_options.data_block_index_type,
383 table_options.data_block_hash_table_util_ratio),
384 range_del_block(1 /* block_restart_interval */),
385 internal_prefix_transform(_moptions.prefix_extractor.get()),
494da23a
TL
386 compression_type(_compression_type),
387 sample_for_compression(_sample_for_compression),
388 compression_opts(_compression_opts),
389 compression_dict(),
390 compression_ctx(_compression_type),
391 verify_dict(),
392 state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
393 : State::kUnbuffered),
11fdf7f2
TL
394 use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
395 !table_opt.block_align),
396 compressed_cache_key_prefix_size(0),
7c673cae
FG
397 flush_block_policy(
398 table_options.flush_block_policy_factory->NewFlushBlockPolicy(
399 table_options, data_block)),
400 column_family_id(_column_family_id),
11fdf7f2
TL
401 column_family_name(_column_family_name),
402 creation_time(_creation_time),
494da23a
TL
403 oldest_key_time(_oldest_key_time),
404 target_file_size(_target_file_size) {
7c673cae
FG
405 if (table_options.index_type ==
406 BlockBasedTableOptions::kTwoLevelIndexSearch) {
11fdf7f2
TL
407 p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
408 &internal_comparator, use_delta_encoding_for_index_values,
409 table_options);
410 index_builder.reset(p_index_builder_);
7c673cae
FG
411 } else {
412 index_builder.reset(IndexBuilder::CreateIndexBuilder(
413 table_options.index_type, &internal_comparator,
11fdf7f2
TL
414 &this->internal_prefix_transform, use_delta_encoding_for_index_values,
415 table_options));
7c673cae
FG
416 }
417 if (skip_filters) {
418 filter_builder = nullptr;
419 } else {
11fdf7f2
TL
420 filter_builder.reset(CreateFilterBlockBuilder(
421 _ioptions, _moptions, table_options,
422 use_delta_encoding_for_index_values, p_index_builder_));
7c673cae
FG
423 }
424
425 for (auto& collector_factories : *int_tbl_prop_collector_factories) {
426 table_properties_collectors.emplace_back(
427 collector_factories->CreateIntTblPropCollector(column_family_id));
428 }
429 table_properties_collectors.emplace_back(
430 new BlockBasedTablePropertiesCollector(
431 table_options.index_type, table_options.whole_key_filtering,
11fdf7f2
TL
432 _moptions.prefix_extractor != nullptr));
433 if (table_options.verify_compression) {
434 verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
494da23a 435 compression_type));
11fdf7f2 436 }
7c673cae 437 }
11fdf7f2
TL
438
439 Rep(const Rep&) = delete;
440 Rep& operator=(const Rep&) = delete;
441
442 ~Rep() {}
7c673cae
FG
443};
444
445BlockBasedTableBuilder::BlockBasedTableBuilder(
11fdf7f2 446 const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
7c673cae
FG
447 const BlockBasedTableOptions& table_options,
448 const InternalKeyComparator& internal_comparator,
449 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
450 int_tbl_prop_collector_factories,
451 uint32_t column_family_id, WritableFileWriter* file,
452 const CompressionType compression_type,
494da23a
TL
453 const uint64_t sample_for_compression,
454 const CompressionOptions& compression_opts, const bool skip_filters,
11fdf7f2 455 const std::string& column_family_name, const uint64_t creation_time,
494da23a 456 const uint64_t oldest_key_time, const uint64_t target_file_size) {
7c673cae
FG
457 BlockBasedTableOptions sanitized_table_options(table_options);
458 if (sanitized_table_options.format_version == 0 &&
459 sanitized_table_options.checksum != kCRC32c) {
460 ROCKS_LOG_WARN(
461 ioptions.info_log,
462 "Silently converting format_version to 1 because checksum is "
463 "non-default");
464 // silently convert format_version to 1 to keep consistent with current
465 // behavior
466 sanitized_table_options.format_version = 1;
467 }
468
494da23a
TL
469 rep_ = new Rep(
470 ioptions, moptions, sanitized_table_options, internal_comparator,
471 int_tbl_prop_collector_factories, column_family_id, file,
472 compression_type, sample_for_compression, compression_opts, skip_filters,
473 column_family_name, creation_time, oldest_key_time, target_file_size);
7c673cae
FG
474
475 if (rep_->filter_builder != nullptr) {
476 rep_->filter_builder->StartBlock(0);
477 }
478 if (table_options.block_cache_compressed.get() != nullptr) {
479 BlockBasedTable::GenerateCachePrefix(
480 table_options.block_cache_compressed.get(), file->writable_file(),
481 &rep_->compressed_cache_key_prefix[0],
482 &rep_->compressed_cache_key_prefix_size);
483 }
484}
485
486BlockBasedTableBuilder::~BlockBasedTableBuilder() {
494da23a
TL
487 // Catch errors where caller forgot to call Finish()
488 assert(rep_->state == Rep::State::kClosed);
7c673cae
FG
489 delete rep_;
490}
491
492void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
493 Rep* r = rep_;
494da23a 494 assert(rep_->state != Rep::State::kClosed);
7c673cae
FG
495 if (!ok()) return;
496 ValueType value_type = ExtractValueType(key);
497 if (IsValueType(value_type)) {
494da23a
TL
498#ifndef NDEBUG
499 if (r->props.num_entries > r->props.num_range_deletions) {
7c673cae
FG
500 assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
501 }
494da23a 502#endif // NDEBUG
7c673cae
FG
503
504 auto should_flush = r->flush_block_policy->Update(key, value);
505 if (should_flush) {
506 assert(!r->data_block.empty());
507 Flush();
508
494da23a
TL
509 if (r->state == Rep::State::kBuffered &&
510 r->data_begin_offset > r->target_file_size) {
511 EnterUnbuffered();
512 }
513
7c673cae
FG
514 // Add item to index block.
515 // We do not emit the index entry for a block until we have seen the
516 // first key for the next data block. This allows us to use shorter
517 // keys in the index block. For example, consider a block boundary
518 // between the keys "the quick brown fox" and "the who". We can use
519 // "the r" as the key for the index block entry since it is >= all
520 // entries in the first block and < all entries in subsequent
521 // blocks.
494da23a 522 if (ok() && r->state == Rep::State::kUnbuffered) {
7c673cae
FG
523 r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
524 }
525 }
526
527 // Note: PartitionedFilterBlockBuilder requires key being added to filter
528 // builder after being added to index builder.
494da23a 529 if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
7c673cae
FG
530 r->filter_builder->Add(ExtractUserKey(key));
531 }
532
533 r->last_key.assign(key.data(), key.size());
534 r->data_block.Add(key, value);
494da23a
TL
535 if (r->state == Rep::State::kBuffered) {
536 // Buffer keys to be replayed during `Finish()` once compression
537 // dictionary has been finalized.
538 if (r->data_block_and_keys_buffers.empty() || should_flush) {
539 r->data_block_and_keys_buffers.emplace_back();
540 }
541 r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
542 } else {
543 r->index_builder->OnKeyAdded(key);
544 }
7c673cae
FG
545 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
546 r->table_properties_collectors,
547 r->ioptions.info_log);
548
549 } else if (value_type == kTypeRangeDeletion) {
7c673cae 550 r->range_del_block.Add(key, value);
7c673cae
FG
551 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
552 r->table_properties_collectors,
553 r->ioptions.info_log);
554 } else {
555 assert(false);
556 }
494da23a
TL
557
558 r->props.num_entries++;
559 r->props.raw_key_size += key.size();
560 r->props.raw_value_size += value.size();
561 if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
562 r->props.num_deletions++;
563 } else if (value_type == kTypeRangeDeletion) {
564 r->props.num_deletions++;
565 r->props.num_range_deletions++;
566 } else if (value_type == kTypeMerge) {
567 r->props.num_merge_operands++;
568 }
7c673cae
FG
569}
570
571void BlockBasedTableBuilder::Flush() {
572 Rep* r = rep_;
494da23a 573 assert(rep_->state != Rep::State::kClosed);
7c673cae
FG
574 if (!ok()) return;
575 if (r->data_block.empty()) return;
576 WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
7c673cae
FG
577}
578
579void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
580 BlockHandle* handle,
581 bool is_data_block) {
582 WriteBlock(block->Finish(), handle, is_data_block);
583 block->Reset();
584}
585
586void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
587 BlockHandle* handle,
588 bool is_data_block) {
589 // File format contains a sequence of blocks where each block has:
590 // block_data: uint8[n]
591 // type: uint8
592 // crc: uint32
593 assert(ok());
594 Rep* r = rep_;
595
494da23a
TL
596 auto type = r->compression_type;
597 uint64_t sample_for_compression = r->sample_for_compression;
7c673cae
FG
598 Slice block_contents;
599 bool abort_compression = false;
600
494da23a
TL
601 StopWatchNano timer(
602 r->ioptions.env,
603 ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
604
605 if (r->state == Rep::State::kBuffered) {
606 assert(is_data_block);
607 assert(!r->data_block_and_keys_buffers.empty());
608 r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
609 r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
610 return;
611 }
7c673cae
FG
612
613 if (raw_block_contents.size() < kCompressionSizeLimit) {
494da23a
TL
614 const CompressionDict* compression_dict;
615 if (!is_data_block || r->compression_dict == nullptr) {
616 compression_dict = &CompressionDict::GetEmptyDict();
11fdf7f2 617 } else {
494da23a 618 compression_dict = r->compression_dict.get();
7c673cae 619 }
494da23a
TL
620 assert(compression_dict != nullptr);
621 CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
622 *compression_dict, type,
623 sample_for_compression);
624
625 std::string sampled_output_fast;
626 std::string sampled_output_slow;
627 block_contents = CompressBlock(
628 raw_block_contents, compression_info, &type,
629 r->table_options.format_version, is_data_block /* do_sample */,
630 &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
631
632 // notify collectors on block add
633 NotifyCollectTableCollectorsOnBlockAdd(
634 r->table_properties_collectors, raw_block_contents.size(),
635 sampled_output_fast.size(), sampled_output_slow.size());
7c673cae
FG
636
637 // Some of the compression algorithms are known to be unreliable. If
638 // the verify_compression flag is set then try to de-compress the
639 // compressed data and compare to the input.
640 if (type != kNoCompression && r->table_options.verify_compression) {
641 // Retrieve the uncompressed contents into a new buffer
494da23a
TL
642 const UncompressionDict* verify_dict;
643 if (!is_data_block || r->verify_dict == nullptr) {
644 verify_dict = &UncompressionDict::GetEmptyDict();
645 } else {
646 verify_dict = r->verify_dict.get();
647 }
648 assert(verify_dict != nullptr);
7c673cae 649 BlockContents contents;
494da23a
TL
650 UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
651 r->compression_type);
7c673cae 652 Status stat = UncompressBlockContentsForCompressionType(
494da23a 653 uncompression_info, block_contents.data(), block_contents.size(),
11fdf7f2 654 &contents, r->table_options.format_version, r->ioptions);
7c673cae
FG
655
656 if (stat.ok()) {
657 bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
658 if (!compressed_ok) {
659 // The result of the compression was invalid. abort.
660 abort_compression = true;
661 ROCKS_LOG_ERROR(r->ioptions.info_log,
662 "Decompressed block did not match raw block");
663 r->status =
664 Status::Corruption("Decompressed block did not match raw block");
665 }
666 } else {
667 // Decompression reported an error. abort.
668 r->status = Status::Corruption("Could not decompress");
669 abort_compression = true;
670 }
671 }
672 } else {
673 // Block is too big to be compressed.
674 abort_compression = true;
675 }
676
677 // Abort compression if the block is too big, or did not pass
678 // verification.
679 if (abort_compression) {
680 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
681 type = kNoCompression;
682 block_contents = raw_block_contents;
11fdf7f2
TL
683 } else if (type != kNoCompression) {
684 if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
494da23a
TL
685 RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
686 timer.ElapsedNanos());
11fdf7f2 687 }
494da23a
TL
688 RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
689 raw_block_contents.size());
7c673cae 690 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
494da23a
TL
691 } else if (type != r->compression_type) {
692 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
7c673cae
FG
693 }
694
11fdf7f2 695 WriteRawBlock(block_contents, type, handle, is_data_block);
7c673cae 696 r->compressed_output.clear();
494da23a
TL
697 if (is_data_block) {
698 if (r->filter_builder != nullptr) {
699 r->filter_builder->StartBlock(r->offset);
700 }
701 r->props.data_size = r->offset;
702 ++r->props.num_data_blocks;
703 }
7c673cae
FG
704}
705
706void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
707 CompressionType type,
11fdf7f2
TL
708 BlockHandle* handle,
709 bool is_data_block) {
7c673cae
FG
710 Rep* r = rep_;
711 StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
712 handle->set_offset(r->offset);
713 handle->set_size(block_contents.size());
11fdf7f2 714 assert(r->status.ok());
7c673cae
FG
715 r->status = r->file->Append(block_contents);
716 if (r->status.ok()) {
717 char trailer[kBlockTrailerSize];
718 trailer[0] = type;
719 char* trailer_without_type = trailer + 1;
720 switch (r->table_options.checksum) {
721 case kNoChecksum:
11fdf7f2
TL
722 EncodeFixed32(trailer_without_type, 0);
723 break;
7c673cae
FG
724 case kCRC32c: {
725 auto crc = crc32c::Value(block_contents.data(), block_contents.size());
726 crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
727 EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
728 break;
729 }
730 case kxxHash: {
731 void* xxh = XXH32_init(0);
732 XXH32_update(xxh, block_contents.data(),
733 static_cast<uint32_t>(block_contents.size()));
734 XXH32_update(xxh, trailer, 1); // Extend to cover block type
735 EncodeFixed32(trailer_without_type, XXH32_digest(xxh));
736 break;
737 }
494da23a
TL
738 case kxxHash64: {
739 XXH64_state_t* const state = XXH64_createState();
740 XXH64_reset(state, 0);
741 XXH64_update(state, block_contents.data(),
742 static_cast<uint32_t>(block_contents.size()));
743 XXH64_update(state, trailer, 1); // Extend to cover block type
744 EncodeFixed32(
745 trailer_without_type,
746 static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
747 uint64_t{0xffffffff}));
748 XXH64_freeState(state);
749 break;
750 }
7c673cae
FG
751 }
752
11fdf7f2 753 assert(r->status.ok());
494da23a
TL
754 TEST_SYNC_POINT_CALLBACK(
755 "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
756 static_cast<char*>(trailer));
7c673cae
FG
757 r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
758 if (r->status.ok()) {
759 r->status = InsertBlockInCache(block_contents, type, handle);
760 }
761 if (r->status.ok()) {
762 r->offset += block_contents.size() + kBlockTrailerSize;
11fdf7f2
TL
763 if (r->table_options.block_align && is_data_block) {
764 size_t pad_bytes =
765 (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
766 (r->alignment - 1))) &
767 (r->alignment - 1);
768 r->status = r->file->Pad(pad_bytes);
769 if (r->status.ok()) {
770 r->offset += pad_bytes;
771 }
772 }
7c673cae
FG
773 }
774 }
775}
776
494da23a 777Status BlockBasedTableBuilder::status() const { return rep_->status; }
7c673cae 778
494da23a
TL
779static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
780 BlockContents* bc = reinterpret_cast<BlockContents*>(value);
781 delete bc;
7c673cae
FG
782}
783
784//
785// Make a copy of the block contents and insert into compressed block cache
786//
787Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
788 const CompressionType type,
789 const BlockHandle* handle) {
790 Rep* r = rep_;
791 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
792
793 if (type != kNoCompression && block_cache_compressed != nullptr) {
7c673cae
FG
794 size_t size = block_contents.size();
795
494da23a
TL
796 auto ubuf =
797 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
7c673cae
FG
798 memcpy(ubuf.get(), block_contents.data(), size);
799 ubuf[size] = type;
800
494da23a
TL
801 BlockContents* block_contents_to_cache =
802 new BlockContents(std::move(ubuf), size);
803#ifndef NDEBUG
804 block_contents_to_cache->is_raw_block = true;
805#endif // NDEBUG
7c673cae
FG
806
807 // make cache key by appending the file offset to the cache prefix id
808 char* end = EncodeVarint64(
494da23a
TL
809 r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
810 handle->offset());
811 Slice key(r->compressed_cache_key_prefix,
812 static_cast<size_t>(end - r->compressed_cache_key_prefix));
7c673cae
FG
813
814 // Insert into compressed block cache.
494da23a
TL
815 block_cache_compressed->Insert(
816 key, block_contents_to_cache,
817 block_contents_to_cache->ApproximateMemoryUsage(),
818 &DeleteCachedBlockContents);
7c673cae
FG
819
820 // Invalidate OS cache.
821 r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
822 }
823 return Status::OK();
824}
825
11fdf7f2
TL
826void BlockBasedTableBuilder::WriteFilterBlock(
827 MetaIndexBuilder* meta_index_builder) {
828 BlockHandle filter_block_handle;
829 bool empty_filter_block = (rep_->filter_builder == nullptr ||
830 rep_->filter_builder->NumAdded() == 0);
831 if (ok() && !empty_filter_block) {
7c673cae 832 Status s = Status::Incomplete();
11fdf7f2
TL
833 while (ok() && s.IsIncomplete()) {
834 Slice filter_content =
835 rep_->filter_builder->Finish(filter_block_handle, &s);
7c673cae 836 assert(s.ok() || s.IsIncomplete());
11fdf7f2 837 rep_->props.filter_size += filter_content.size();
7c673cae
FG
838 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
839 }
840 }
11fdf7f2
TL
841 if (ok() && !empty_filter_block) {
842 // Add mapping from "<filter_block_prefix>.Name" to location
843 // of filter data.
844 std::string key;
845 if (rep_->filter_builder->IsBlockBased()) {
846 key = BlockBasedTable::kFilterBlockPrefix;
847 } else {
848 key = rep_->table_options.partition_filters
849 ? BlockBasedTable::kPartitionedFilterBlockPrefix
850 : BlockBasedTable::kFullFilterBlockPrefix;
851 }
852 key.append(rep_->table_options.filter_policy->Name());
853 meta_index_builder->Add(key, filter_block_handle);
854 }
855}
7c673cae 856
11fdf7f2
TL
857void BlockBasedTableBuilder::WriteIndexBlock(
858 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
7c673cae 859 IndexBuilder::IndexBlocks index_blocks;
11fdf7f2 860 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
7c673cae
FG
861 if (index_builder_status.IsIncomplete()) {
862 // We we have more than one index partition then meta_blocks are not
863 // supported for the index. Currently meta_blocks are used only by
864 // HashIndexBuilder which is not multi-partition.
865 assert(index_blocks.meta_blocks.empty());
11fdf7f2
TL
866 } else if (ok() && !index_builder_status.ok()) {
867 rep_->status = index_builder_status;
7c673cae 868 }
11fdf7f2
TL
869 if (ok()) {
870 for (const auto& item : index_blocks.meta_blocks) {
871 BlockHandle block_handle;
872 WriteBlock(item.second, &block_handle, false /* is_data_block */);
873 if (!ok()) {
874 break;
875 }
876 meta_index_builder->Add(item.first, block_handle);
877 }
7c673cae 878 }
11fdf7f2
TL
879 if (ok()) {
880 if (rep_->table_options.enable_index_compression) {
881 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
882 } else {
883 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
884 index_block_handle);
885 }
886 }
887 // If there are more index partitions, finish them and write them out
888 Status s = index_builder_status;
889 while (ok() && s.IsIncomplete()) {
890 s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
891 if (!s.ok() && !s.IsIncomplete()) {
892 rep_->status = s;
893 return;
894 }
895 if (rep_->table_options.enable_index_compression) {
896 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
897 } else {
898 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
899 index_block_handle);
900 }
901 // The last index_block_handle will be for the partition index block
902 }
903}
7c673cae 904
11fdf7f2
TL
905void BlockBasedTableBuilder::WritePropertiesBlock(
906 MetaIndexBuilder* meta_index_builder) {
907 BlockHandle properties_block_handle;
7c673cae 908 if (ok()) {
11fdf7f2
TL
909 PropertyBlockBuilder property_block_builder;
910 rep_->props.column_family_id = rep_->column_family_id;
911 rep_->props.column_family_name = rep_->column_family_name;
912 rep_->props.filter_policy_name =
913 rep_->table_options.filter_policy != nullptr
914 ? rep_->table_options.filter_policy->Name()
915 : "";
916 rep_->props.index_size =
917 rep_->index_builder->IndexSize() + kBlockTrailerSize;
918 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
919 ? rep_->ioptions.user_comparator->Name()
920 : "nullptr";
921 rep_->props.merge_operator_name =
922 rep_->ioptions.merge_operator != nullptr
923 ? rep_->ioptions.merge_operator->Name()
924 : "nullptr";
925 rep_->props.compression_name =
494da23a
TL
926 CompressionTypeToString(rep_->compression_type);
927 rep_->props.compression_options =
928 CompressionOptionsToString(rep_->compression_opts);
11fdf7f2
TL
929 rep_->props.prefix_extractor_name =
930 rep_->moptions.prefix_extractor != nullptr
931 ? rep_->moptions.prefix_extractor->Name()
932 : "nullptr";
933
934 std::string property_collectors_names = "[";
935 for (size_t i = 0;
936 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
937 if (i != 0) {
938 property_collectors_names += ",";
7c673cae 939 }
11fdf7f2
TL
940 property_collectors_names +=
941 rep_->ioptions.table_properties_collector_factories[i]->Name();
942 }
943 property_collectors_names += "]";
944 rep_->props.property_collectors_names = property_collectors_names;
945 if (rep_->table_options.index_type ==
946 BlockBasedTableOptions::kTwoLevelIndexSearch) {
947 assert(rep_->p_index_builder_ != nullptr);
948 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
949 rep_->props.top_level_index_size =
950 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
951 }
952 rep_->props.index_key_is_user_key =
953 !rep_->index_builder->seperator_is_key_plus_seq();
954 rep_->props.index_value_is_delta_encoded =
955 rep_->use_delta_encoding_for_index_values;
956 rep_->props.creation_time = rep_->creation_time;
957 rep_->props.oldest_key_time = rep_->oldest_key_time;
958
959 // Add basic properties
960 property_block_builder.AddTableProperty(rep_->props);
961
962 // Add use collected properties
963 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
964 rep_->ioptions.info_log,
965 &property_block_builder);
966
967 WriteRawBlock(property_block_builder.Finish(), kNoCompression,
968 &properties_block_handle);
969 }
970 if (ok()) {
494da23a
TL
971#ifndef NDEBUG
972 {
973 uint64_t props_block_offset = properties_block_handle.offset();
974 uint64_t props_block_size = properties_block_handle.size();
975 TEST_SYNC_POINT_CALLBACK(
976 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
977 &props_block_offset);
978 TEST_SYNC_POINT_CALLBACK(
979 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
980 &props_block_size);
981 }
982#endif // !NDEBUG
11fdf7f2
TL
983 meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
984 }
985}
986
987void BlockBasedTableBuilder::WriteCompressionDictBlock(
988 MetaIndexBuilder* meta_index_builder) {
494da23a
TL
989 if (rep_->compression_dict != nullptr &&
990 rep_->compression_dict->GetRawDict().size()) {
11fdf7f2
TL
991 BlockHandle compression_dict_block_handle;
992 if (ok()) {
494da23a 993 WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
11fdf7f2 994 &compression_dict_block_handle);
494da23a
TL
995#ifndef NDEBUG
996 Slice compression_dict = rep_->compression_dict->GetRawDict();
997 TEST_SYNC_POINT_CALLBACK(
998 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
999 &compression_dict);
1000#endif // NDEBUG
11fdf7f2
TL
1001 }
1002 if (ok()) {
1003 meta_index_builder->Add(kCompressionDictBlock,
1004 compression_dict_block_handle);
7c673cae 1005 }
11fdf7f2
TL
1006 }
1007}
7c673cae 1008
11fdf7f2
TL
1009void BlockBasedTableBuilder::WriteRangeDelBlock(
1010 MetaIndexBuilder* meta_index_builder) {
1011 if (ok() && !rep_->range_del_block.empty()) {
1012 BlockHandle range_del_block_handle;
1013 WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1014 &range_del_block_handle);
1015 meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1016 }
1017}
7c673cae 1018
494da23a
TL
1019void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1020 BlockHandle& index_block_handle) {
1021 Rep* r = rep_;
1022 // No need to write out new footer if we're using default checksum.
1023 // We're writing legacy magic number because we want old versions of RocksDB
1024 // be able to read files generated with new release (just in case if
1025 // somebody wants to roll back after an upgrade)
1026 // TODO(icanadi) at some point in the future, when we're absolutely sure
1027 // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
1028 // number and always write new table files with new magic number
1029 bool legacy = (r->table_options.format_version == 0);
1030 // this is guaranteed by BlockBasedTableBuilder's constructor
1031 assert(r->table_options.checksum == kCRC32c ||
1032 r->table_options.format_version != 0);
1033 Footer footer(
1034 legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
1035 r->table_options.format_version);
1036 footer.set_metaindex_handle(metaindex_block_handle);
1037 footer.set_index_handle(index_block_handle);
1038 footer.set_checksum(r->table_options.checksum);
1039 std::string footer_encoding;
1040 footer.EncodeTo(&footer_encoding);
1041 assert(r->status.ok());
1042 r->status = r->file->Append(footer_encoding);
1043 if (r->status.ok()) {
1044 r->offset += footer_encoding.size();
1045 }
1046}
1047
1048void BlockBasedTableBuilder::EnterUnbuffered() {
1049 Rep* r = rep_;
1050 assert(r->state == Rep::State::kBuffered);
1051 r->state = Rep::State::kUnbuffered;
1052 const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
1053 ? r->compression_opts.zstd_max_train_bytes
1054 : r->compression_opts.max_dict_bytes;
1055 Random64 generator{r->creation_time};
1056 std::string compression_dict_samples;
1057 std::vector<size_t> compression_dict_sample_lens;
1058 if (!r->data_block_and_keys_buffers.empty()) {
1059 while (compression_dict_samples.size() < kSampleBytes) {
1060 size_t rand_idx =
1061 generator.Uniform(r->data_block_and_keys_buffers.size());
1062 size_t copy_len =
1063 std::min(kSampleBytes - compression_dict_samples.size(),
1064 r->data_block_and_keys_buffers[rand_idx].first.size());
1065 compression_dict_samples.append(
1066 r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
1067 compression_dict_sample_lens.emplace_back(copy_len);
1068 }
1069 }
1070
1071 // final data block flushed, now we can generate dictionary from the samples.
1072 // OK if compression_dict_samples is empty, we'll just get empty dictionary.
1073 std::string dict;
1074 if (r->compression_opts.zstd_max_train_bytes > 0) {
1075 dict = ZSTD_TrainDictionary(compression_dict_samples,
1076 compression_dict_sample_lens,
1077 r->compression_opts.max_dict_bytes);
1078 } else {
1079 dict = std::move(compression_dict_samples);
1080 }
1081 r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
1082 r->compression_opts.level));
1083 r->verify_dict.reset(new UncompressionDict(
1084 dict, r->compression_type == kZSTD ||
1085 r->compression_type == kZSTDNotFinalCompression));
1086
1087 for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
1088 const auto& data_block = r->data_block_and_keys_buffers[i].first;
1089 auto& keys = r->data_block_and_keys_buffers[i].second;
1090 assert(!data_block.empty());
1091 assert(!keys.empty());
1092
1093 for (const auto& key : keys) {
1094 if (r->filter_builder != nullptr) {
1095 r->filter_builder->Add(ExtractUserKey(key));
1096 }
1097 r->index_builder->OnKeyAdded(key);
1098 }
1099 WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
1100 if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
1101 Slice first_key_in_next_block =
1102 r->data_block_and_keys_buffers[i + 1].second.front();
1103 Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1104 r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
1105 r->pending_handle);
1106 }
1107 }
1108 r->data_block_and_keys_buffers.clear();
1109}
1110
11fdf7f2
TL
1111Status BlockBasedTableBuilder::Finish() {
1112 Rep* r = rep_;
494da23a 1113 assert(r->state != Rep::State::kClosed);
11fdf7f2
TL
1114 bool empty_data_block = r->data_block.empty();
1115 Flush();
494da23a
TL
1116 if (r->state == Rep::State::kBuffered) {
1117 EnterUnbuffered();
1118 }
11fdf7f2
TL
1119 // To make sure properties block is able to keep the accurate size of index
1120 // block, we will finish writing all index entries first.
1121 if (ok() && !empty_data_block) {
1122 r->index_builder->AddIndexEntry(
1123 &r->last_key, nullptr /* no next data block */, r->pending_handle);
1124 }
7c673cae 1125
494da23a 1126 // Write meta blocks, metaindex block and footer in the following order.
11fdf7f2
TL
1127 // 1. [meta block: filter]
1128 // 2. [meta block: index]
1129 // 3. [meta block: compression dictionary]
1130 // 4. [meta block: range deletion tombstone]
1131 // 5. [meta block: properties]
1132 // 6. [metaindex block]
494da23a 1133 // 7. Footer
11fdf7f2
TL
1134 BlockHandle metaindex_block_handle, index_block_handle;
1135 MetaIndexBuilder meta_index_builder;
1136 WriteFilterBlock(&meta_index_builder);
1137 WriteIndexBlock(&meta_index_builder, &index_block_handle);
1138 WriteCompressionDictBlock(&meta_index_builder);
1139 WriteRangeDelBlock(&meta_index_builder);
1140 WritePropertiesBlock(&meta_index_builder);
7c673cae
FG
1141 if (ok()) {
1142 // flush the meta index block
1143 WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
1144 &metaindex_block_handle);
7c673cae 1145 }
7c673cae 1146 if (ok()) {
494da23a 1147 WriteFooter(metaindex_block_handle, index_block_handle);
7c673cae 1148 }
494da23a 1149 r->state = Rep::State::kClosed;
7c673cae
FG
1150 return r->status;
1151}
1152
1153void BlockBasedTableBuilder::Abandon() {
494da23a
TL
1154 assert(rep_->state != Rep::State::kClosed);
1155 rep_->state = Rep::State::kClosed;
7c673cae
FG
1156}
1157
1158uint64_t BlockBasedTableBuilder::NumEntries() const {
1159 return rep_->props.num_entries;
1160}
1161
494da23a 1162uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
7c673cae
FG
1163
1164bool BlockBasedTableBuilder::NeedCompact() const {
1165 for (const auto& collector : rep_->table_properties_collectors) {
1166 if (collector->NeedCompact()) {
1167 return true;
1168 }
1169 }
1170 return false;
1171}
1172
1173TableProperties BlockBasedTableBuilder::GetTableProperties() const {
1174 TableProperties ret = rep_->props;
1175 for (const auto& collector : rep_->table_properties_collectors) {
1176 for (const auto& prop : collector->GetReadableProperties()) {
1177 ret.readable_properties.insert(prop);
1178 }
1179 collector->Finish(&ret.user_collected_properties);
1180 }
1181 return ret;
1182}
1183
1184const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
1185const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
1186const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
1187 "partitionedfilter.";
1188} // namespace rocksdb