]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/table/block_based/block_based_table_builder.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / table / block_based / block_based_table_builder.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
f67539c2 10#include "table/block_based/block_based_table_builder.h"
7c673cae
FG
11
12#include <assert.h>
7c673cae 13#include <stdio.h>
20effc67 14#include <atomic>
7c673cae
FG
15#include <list>
16#include <map>
17#include <memory>
18#include <string>
19#include <unordered_map>
20#include <utility>
21
22#include "db/dbformat.h"
f67539c2 23#include "index_builder.h"
7c673cae
FG
24
25#include "rocksdb/cache.h"
26#include "rocksdb/comparator.h"
27#include "rocksdb/env.h"
7c673cae
FG
28#include "rocksdb/flush_block_policy.h"
29#include "rocksdb/merge_operator.h"
30#include "rocksdb/table.h"
31
f67539c2
TL
32#include "table/block_based/block.h"
33#include "table/block_based/block_based_filter_block.h"
34#include "table/block_based/block_based_table_factory.h"
35#include "table/block_based/block_based_table_reader.h"
36#include "table/block_based/block_builder.h"
37#include "table/block_based/filter_block.h"
38#include "table/block_based/filter_policy_internal.h"
39#include "table/block_based/full_filter_block.h"
40#include "table/block_based/partitioned_filter_block.h"
7c673cae 41#include "table/format.h"
7c673cae
FG
42#include "table/table_builder.h"
43
f67539c2 44#include "memory/memory_allocator.h"
7c673cae
FG
45#include "util/coding.h"
46#include "util/compression.h"
47#include "util/crc32c.h"
48#include "util/stop_watch.h"
11fdf7f2 49#include "util/string_util.h"
20effc67 50#include "util/work_queue.h"
7c673cae
FG
51#include "util/xxhash.h"
52
f67539c2 53namespace ROCKSDB_NAMESPACE {
7c673cae
FG
54
55extern const std::string kHashIndexPrefixesBlock;
56extern const std::string kHashIndexPrefixesMetadataBlock;
57
7c673cae
FG
58
59// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
60namespace {
61
62// Create a filter block builder based on its type.
63FilterBlockBuilder* CreateFilterBlockBuilder(
11fdf7f2 64 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
f67539c2 65 const FilterBuildingContext& context,
11fdf7f2 66 const bool use_delta_encoding_for_index_values,
7c673cae 67 PartitionedIndexBuilder* const p_index_builder) {
f67539c2 68 const BlockBasedTableOptions& table_opt = context.table_options;
7c673cae
FG
69 if (table_opt.filter_policy == nullptr) return nullptr;
70
71 FilterBitsBuilder* filter_bits_builder =
f67539c2 72 BloomFilterPolicy::GetBuilderFromContext(context);
7c673cae 73 if (filter_bits_builder == nullptr) {
11fdf7f2
TL
74 return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
75 table_opt);
7c673cae
FG
76 } else {
77 if (table_opt.partition_filters) {
78 assert(p_index_builder != nullptr);
11fdf7f2
TL
79 // Since after partition cut request from filter builder it takes time
80 // until index builder actully cuts the partition, we take the lower bound
81 // as partition size.
82 assert(table_opt.block_size_deviation <= 100);
494da23a
TL
83 auto partition_size =
84 static_cast<uint32_t>(((table_opt.metadata_block_size *
85 (100 - table_opt.block_size_deviation)) +
86 99) /
87 100);
11fdf7f2 88 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
7c673cae 89 return new PartitionedFilterBlockBuilder(
11fdf7f2 90 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
7c673cae 91 filter_bits_builder, table_opt.index_block_restart_interval,
11fdf7f2 92 use_delta_encoding_for_index_values, p_index_builder, partition_size);
7c673cae 93 } else {
11fdf7f2 94 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
7c673cae
FG
95 table_opt.whole_key_filtering,
96 filter_bits_builder);
97 }
98 }
99}
100
// Returns true when compression saved more than 12.5% (one eighth) of the raw
// size, i.e. when compressed_size is strictly below raw_size - raw_size / 8.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t largest_acceptable_bound = raw_size - (raw_size / 8u);
  return compressed_size < largest_acceptable_bound;
}
105
494da23a
TL
106} // namespace
107
108// format_version is the block format as defined in include/rocksdb/table.h
109Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
110 CompressionType* type, uint32_t format_version,
111 bool do_sample, std::string* compressed_output,
112 std::string* sampled_output_fast,
113 std::string* sampled_output_slow) {
20effc67
TL
114 assert(type);
115 assert(compressed_output);
116 assert(compressed_output->empty());
7c673cae 117
494da23a
TL
118 // If requested, we sample one in every N block with a
119 // fast and slow compression algorithm and report the stats.
120 // The users can use these stats to decide if it is worthwhile
121 // enabling compression and they also get a hint about which
122 // compression algorithm wil be beneficial.
123 if (do_sample && info.SampleForCompression() &&
20effc67
TL
124 Random::GetTLSInstance()->OneIn(
125 static_cast<int>(info.SampleForCompression()))) {
494da23a 126 // Sampling with a fast compression algorithm
20effc67 127 if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
494da23a
TL
128 CompressionType c =
129 LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
130 CompressionContext context(c);
131 CompressionOptions options;
132 CompressionInfo info_tmp(options, context,
133 CompressionDict::GetEmptyDict(), c,
134 info.SampleForCompression());
135
20effc67
TL
136 CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
137 sampled_output_fast);
494da23a
TL
138 }
139
140 // Sampling with a slow but high-compression algorithm
20effc67 141 if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
494da23a
TL
142 CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
143 CompressionContext context(c);
144 CompressionOptions options;
145 CompressionInfo info_tmp(options, context,
146 CompressionDict::GetEmptyDict(), c,
147 info.SampleForCompression());
20effc67
TL
148
149 CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
150 sampled_output_slow);
494da23a
TL
151 }
152 }
153
20effc67
TL
154 if (info.type() == kNoCompression) {
155 *type = kNoCompression;
156 return raw;
494da23a
TL
157 }
158
20effc67
TL
159 // Actually compress the data; if the compression method is not supported,
160 // or the compression fails etc., just fall back to uncompressed
161 if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
162 compressed_output)) {
163 *type = kNoCompression;
164 return raw;
165 }
166
167 // Check the compression ratio; if it's not good enough, just fall back to
168 // uncompressed
169 if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
170 *type = kNoCompression;
171 return raw;
172 }
173
174 *type = info.type();
175 return *compressed_output;
7c673cae
FG
176}
177
178// kBlockBasedTableMagicNumber was picked by running
179// echo rocksdb.table.block_based | sha1sum
180// and taking the leading 64 bits.
181// Please note that kBlockBasedTableMagicNumber may also be accessed by
182// other .cc files. For that reason it is declared extern in the header.
183// To get storage allocated for it, the constant must also be defined
184// without extern in exactly one place,
185// which is this file.
186const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
187// We also support reading and writing legacy block based table format (for
188// backwards compatibility)
189const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
190
191// A collector that collects properties of interest to block-based table.
192// For now this class looks heavy-weight since we only write one additional
193// property.
194// But in the foreseeable future, we will add more and more properties that are
195// specific to block-based table.
196class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
197 : public IntTblPropCollector {
198 public:
199 explicit BlockBasedTablePropertiesCollector(
200 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
201 bool prefix_filtering)
202 : index_type_(index_type),
203 whole_key_filtering_(whole_key_filtering),
204 prefix_filtering_(prefix_filtering) {}
205
494da23a
TL
206 Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
207 uint64_t /*file_size*/) override {
7c673cae
FG
208 // Intentionally left blank. Have no interest in collecting stats for
209 // individual key/value pairs.
210 return Status::OK();
211 }
212
494da23a
TL
213 virtual void BlockAdd(uint64_t /* blockRawBytes */,
214 uint64_t /* blockCompressedBytesFast */,
215 uint64_t /* blockCompressedBytesSlow */) override {
216 // Intentionally left blank. No interest in collecting stats for
217 // blocks.
218 return;
219 }
220
221 Status Finish(UserCollectedProperties* properties) override {
7c673cae
FG
222 std::string val;
223 PutFixed32(&val, static_cast<uint32_t>(index_type_));
224 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
225 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
226 whole_key_filtering_ ? kPropTrue : kPropFalse});
227 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
228 prefix_filtering_ ? kPropTrue : kPropFalse});
229 return Status::OK();
230 }
231
232 // The name of the properties collector can be used for debugging purpose.
494da23a 233 const char* Name() const override {
7c673cae
FG
234 return "BlockBasedTablePropertiesCollector";
235 }
236
494da23a 237 UserCollectedProperties GetReadableProperties() const override {
7c673cae
FG
238 // Intentionally left blank.
239 return UserCollectedProperties();
240 }
241
242 private:
243 BlockBasedTableOptions::IndexType index_type_;
244 bool whole_key_filtering_;
245 bool prefix_filtering_;
246};
247
248struct BlockBasedTableBuilder::Rep {
249 const ImmutableCFOptions ioptions;
11fdf7f2 250 const MutableCFOptions moptions;
7c673cae
FG
251 const BlockBasedTableOptions table_options;
252 const InternalKeyComparator& internal_comparator;
253 WritableFileWriter* file;
20effc67 254 std::atomic<uint64_t> offset;
11fdf7f2 255 size_t alignment;
7c673cae 256 BlockBuilder data_block;
494da23a
TL
257 // Buffers uncompressed data blocks and keys to replay later. Needed when
258 // compression dictionary is enabled so we can finalize the dictionary before
259 // compressing any data blocks.
260 // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
261 // blocks as it's redundant, but it's easier to implement for now.
262 std::vector<std::pair<std::string, std::vector<std::string>>>
263 data_block_and_keys_buffers;
7c673cae
FG
264 BlockBuilder range_del_block;
265
266 InternalKeySliceTransform internal_prefix_transform;
267 std::unique_ptr<IndexBuilder> index_builder;
11fdf7f2 268 PartitionedIndexBuilder* p_index_builder_ = nullptr;
7c673cae
FG
269
270 std::string last_key;
20effc67 271 const Slice* first_key_in_next_block = nullptr;
494da23a
TL
272 CompressionType compression_type;
273 uint64_t sample_for_compression;
274 CompressionOptions compression_opts;
275 std::unique_ptr<CompressionDict> compression_dict;
20effc67
TL
276 std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
277 std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
494da23a
TL
278 std::unique_ptr<UncompressionDict> verify_dict;
279
280 size_t data_begin_offset = 0;
281
7c673cae
FG
282 TableProperties props;
283
494da23a
TL
284 // States of the builder.
285 //
286 // - `kBuffered`: This is the initial state where zero or more data blocks are
287 // accumulated uncompressed in-memory. From this state, call
288 // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
289 // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
290 // state.
291 //
292 // - `kUnbuffered`: This is the state when compression dictionary is finalized
293 // either because it wasn't enabled in the first place or it's been created
294 // from sampling previously buffered data. In this state, blocks are simply
295 // compressed/written out as they fill up. From this state, call `Finish()`
296 // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
297 // the partially created file.
298 //
299 // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
300 // called, so the table builder is no longer usable. We must be in this
301 // state by the time the destructor runs.
302 enum class State {
303 kBuffered,
304 kUnbuffered,
305 kClosed,
306 };
307 State state;
308
11fdf7f2 309 const bool use_delta_encoding_for_index_values;
7c673cae
FG
310 std::unique_ptr<FilterBlockBuilder> filter_builder;
311 char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
312 size_t compressed_cache_key_prefix_size;
313
314 BlockHandle pending_handle; // Handle to add to index block
315
316 std::string compressed_output;
317 std::unique_ptr<FlushBlockPolicy> flush_block_policy;
f67539c2 318 int level_at_creation;
7c673cae
FG
319 uint32_t column_family_id;
320 const std::string& column_family_name;
11fdf7f2
TL
321 uint64_t creation_time = 0;
322 uint64_t oldest_key_time = 0;
494da23a 323 const uint64_t target_file_size;
f67539c2 324 uint64_t file_creation_time = 0;
7c673cae 325
20effc67
TL
326 // DB IDs
327 const std::string db_id;
328 const std::string db_session_id;
329 std::string db_host_id;
330
7c673cae
FG
331 std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
332
20effc67
TL
333 std::unique_ptr<ParallelCompressionRep> pc_rep;
334
335 uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
336 void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
337
338 bool IsParallelCompressionEnabled() const {
339 return compression_opts.parallel_threads > 1;
340 }
341
342 Status GetStatus() {
343 // We need to make modifications of status visible when status_ok is set
344 // to false, and this is ensured by status_mutex, so no special memory
345 // order for status_ok is required.
346 if (status_ok.load(std::memory_order_relaxed)) {
347 return Status::OK();
348 } else {
349 return CopyStatus();
350 }
351 }
352
353 Status CopyStatus() {
354 std::lock_guard<std::mutex> lock(status_mutex);
355 return status;
356 }
357
358 IOStatus GetIOStatus() {
359 // We need to make modifications of io_status visible when status_ok is set
360 // to false, and this is ensured by io_status_mutex, so no special memory
361 // order for io_status_ok is required.
362 if (io_status_ok.load(std::memory_order_relaxed)) {
363 return IOStatus::OK();
364 } else {
365 return CopyIOStatus();
366 }
367 }
368
369 IOStatus CopyIOStatus() {
370 std::lock_guard<std::mutex> lock(io_status_mutex);
371 return io_status;
372 }
373
374 // Never erase an existing status that is not OK.
375 void SetStatus(Status s) {
376 if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
377 // Locking is an overkill for non compression_opts.parallel_threads
378 // case but since it's unlikely that s is not OK, we take this cost
379 // to be simplicity.
380 std::lock_guard<std::mutex> lock(status_mutex);
381 status = s;
382 status_ok.store(false, std::memory_order_relaxed);
383 }
384 }
385
386 // Never erase an existing I/O status that is not OK.
387 void SetIOStatus(IOStatus ios) {
388 if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
389 // Locking is an overkill for non compression_opts.parallel_threads
390 // case but since it's unlikely that s is not OK, we take this cost
391 // to be simplicity.
392 std::lock_guard<std::mutex> lock(io_status_mutex);
393 io_status = ios;
394 io_status_ok.store(false, std::memory_order_relaxed);
395 }
396 }
397
11fdf7f2 398 Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
7c673cae
FG
399 const BlockBasedTableOptions& table_opt,
400 const InternalKeyComparator& icomparator,
401 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
402 int_tbl_prop_collector_factories,
403 uint32_t _column_family_id, WritableFileWriter* f,
404 const CompressionType _compression_type,
494da23a
TL
405 const uint64_t _sample_for_compression,
406 const CompressionOptions& _compression_opts, const bool skip_filters,
f67539c2
TL
407 const int _level_at_creation, const std::string& _column_family_name,
408 const uint64_t _creation_time, const uint64_t _oldest_key_time,
20effc67
TL
409 const uint64_t _target_file_size, const uint64_t _file_creation_time,
410 const std::string& _db_id, const std::string& _db_session_id)
7c673cae 411 : ioptions(_ioptions),
11fdf7f2 412 moptions(_moptions),
7c673cae
FG
413 table_options(table_opt),
414 internal_comparator(icomparator),
415 file(f),
20effc67 416 offset(0),
11fdf7f2
TL
417 alignment(table_options.block_align
418 ? std::min(table_options.block_size, kDefaultPageSize)
419 : 0),
7c673cae 420 data_block(table_options.block_restart_interval,
11fdf7f2
TL
421 table_options.use_delta_encoding,
422 false /* use_value_delta_encoding */,
423 icomparator.user_comparator()
424 ->CanKeysWithDifferentByteContentsBeEqual()
425 ? BlockBasedTableOptions::kDataBlockBinarySearch
426 : table_options.data_block_index_type,
427 table_options.data_block_hash_table_util_ratio),
428 range_del_block(1 /* block_restart_interval */),
429 internal_prefix_transform(_moptions.prefix_extractor.get()),
494da23a
TL
430 compression_type(_compression_type),
431 sample_for_compression(_sample_for_compression),
432 compression_opts(_compression_opts),
433 compression_dict(),
20effc67
TL
434 compression_ctxs(_compression_opts.parallel_threads),
435 verify_ctxs(_compression_opts.parallel_threads),
494da23a
TL
436 verify_dict(),
437 state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
438 : State::kUnbuffered),
11fdf7f2
TL
439 use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
440 !table_opt.block_align),
441 compressed_cache_key_prefix_size(0),
7c673cae
FG
442 flush_block_policy(
443 table_options.flush_block_policy_factory->NewFlushBlockPolicy(
444 table_options, data_block)),
f67539c2 445 level_at_creation(_level_at_creation),
7c673cae 446 column_family_id(_column_family_id),
11fdf7f2
TL
447 column_family_name(_column_family_name),
448 creation_time(_creation_time),
494da23a 449 oldest_key_time(_oldest_key_time),
f67539c2 450 target_file_size(_target_file_size),
20effc67
TL
451 file_creation_time(_file_creation_time),
452 db_id(_db_id),
453 db_session_id(_db_session_id),
454 db_host_id(ioptions.db_host_id),
455 status_ok(true),
456 io_status_ok(true) {
457 for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
458 compression_ctxs[i].reset(new CompressionContext(compression_type));
459 }
7c673cae
FG
460 if (table_options.index_type ==
461 BlockBasedTableOptions::kTwoLevelIndexSearch) {
11fdf7f2
TL
462 p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
463 &internal_comparator, use_delta_encoding_for_index_values,
464 table_options);
465 index_builder.reset(p_index_builder_);
7c673cae
FG
466 } else {
467 index_builder.reset(IndexBuilder::CreateIndexBuilder(
468 table_options.index_type, &internal_comparator,
11fdf7f2
TL
469 &this->internal_prefix_transform, use_delta_encoding_for_index_values,
470 table_options));
7c673cae
FG
471 }
472 if (skip_filters) {
473 filter_builder = nullptr;
474 } else {
f67539c2
TL
475 FilterBuildingContext context(table_options);
476 context.column_family_name = column_family_name;
477 context.compaction_style = ioptions.compaction_style;
478 context.level_at_creation = level_at_creation;
479 context.info_log = ioptions.info_log;
11fdf7f2 480 filter_builder.reset(CreateFilterBlockBuilder(
f67539c2
TL
481 ioptions, moptions, context, use_delta_encoding_for_index_values,
482 p_index_builder_));
7c673cae
FG
483 }
484
485 for (auto& collector_factories : *int_tbl_prop_collector_factories) {
486 table_properties_collectors.emplace_back(
487 collector_factories->CreateIntTblPropCollector(column_family_id));
488 }
489 table_properties_collectors.emplace_back(
490 new BlockBasedTablePropertiesCollector(
491 table_options.index_type, table_options.whole_key_filtering,
11fdf7f2
TL
492 _moptions.prefix_extractor != nullptr));
493 if (table_options.verify_compression) {
20effc67
TL
494 for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
495 verify_ctxs[i].reset(new UncompressionContext(compression_type));
496 }
497 }
498
499 if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) {
500 ROCKS_LOG_INFO(ioptions.info_log, "db_host_id property will not be set");
11fdf7f2 501 }
7c673cae 502 }
11fdf7f2
TL
503
504 Rep(const Rep&) = delete;
505 Rep& operator=(const Rep&) = delete;
506
20effc67
TL
507 private:
508 // Synchronize status & io_status accesses across threads from main thread,
509 // compression thread and write thread in parallel compression.
510 std::mutex status_mutex;
511 std::atomic<bool> status_ok;
512 Status status;
513 std::mutex io_status_mutex;
514 std::atomic<bool> io_status_ok;
515 IOStatus io_status;
516};
517
518struct BlockBasedTableBuilder::ParallelCompressionRep {
519 // Keys is a wrapper of vector of strings avoiding
520 // releasing string memories during vector clear()
521 // in order to save memory allocation overhead
522 class Keys {
523 public:
524 Keys() : keys_(kKeysInitSize), size_(0) {}
525 void PushBack(const Slice& key) {
526 if (size_ == keys_.size()) {
527 keys_.emplace_back(key.data(), key.size());
528 } else {
529 keys_[size_].assign(key.data(), key.size());
530 }
531 size_++;
532 }
533 void SwapAssign(std::vector<std::string>& keys) {
534 size_ = keys.size();
535 std::swap(keys_, keys);
536 }
537 void Clear() { size_ = 0; }
538 size_t Size() { return size_; }
539 std::string& Back() { return keys_[size_ - 1]; }
540 std::string& operator[](size_t idx) {
541 assert(idx < size_);
542 return keys_[idx];
543 }
544
545 private:
546 const size_t kKeysInitSize = 32;
547 std::vector<std::string> keys_;
548 size_t size_;
549 };
550 std::unique_ptr<Keys> curr_block_keys;
551
552 class BlockRepSlot;
553
554 // BlockRep instances are fetched from and recycled to
555 // block_rep_pool during parallel compression.
556 struct BlockRep {
557 Slice contents;
558 Slice compressed_contents;
559 std::unique_ptr<std::string> data;
560 std::unique_ptr<std::string> compressed_data;
561 CompressionType compression_type;
562 std::unique_ptr<std::string> first_key_in_next_block;
563 std::unique_ptr<Keys> keys;
564 std::unique_ptr<BlockRepSlot> slot;
565 Status status;
566 };
567 // Use a vector of BlockRep as a buffer for a determined number
568 // of BlockRep structures. All data referenced by pointers in
569 // BlockRep will be freed when this vector is destructed.
570 typedef std::vector<BlockRep> BlockRepBuffer;
571 BlockRepBuffer block_rep_buf;
572 // Use a thread-safe queue for concurrent access from block
573 // building thread and writer thread.
574 typedef WorkQueue<BlockRep*> BlockRepPool;
575 BlockRepPool block_rep_pool;
576
577 // Use BlockRepSlot to keep block order in write thread.
578 // slot_ will pass references to BlockRep
579 class BlockRepSlot {
580 public:
581 BlockRepSlot() : slot_(1) {}
582 template <typename T>
583 void Fill(T&& rep) {
584 slot_.push(std::forward<T>(rep));
585 };
586 void Take(BlockRep*& rep) { slot_.pop(rep); }
587
588 private:
589 // slot_ will pass references to BlockRep in block_rep_buf,
590 // and those references are always valid before the destruction of
591 // block_rep_buf.
592 WorkQueue<BlockRep*> slot_;
593 };
594
595 // Compression queue will pass references to BlockRep in block_rep_buf,
596 // and those references are always valid before the destruction of
597 // block_rep_buf.
598 typedef WorkQueue<BlockRep*> CompressQueue;
599 CompressQueue compress_queue;
600 std::vector<port::Thread> compress_thread_pool;
601
602 // Write queue will pass references to BlockRep::slot in block_rep_buf,
603 // and those references are always valid before the corresponding
604 // BlockRep::slot is destructed, which is before the destruction of
605 // block_rep_buf.
606 typedef WorkQueue<BlockRepSlot*> WriteQueue;
607 WriteQueue write_queue;
608 std::unique_ptr<port::Thread> write_thread;
609
610 // Estimate output file size when parallel compression is enabled. This is
611 // necessary because compression & flush are no longer synchronized,
612 // and BlockBasedTableBuilder::FileSize() is no longer accurate.
613 // memory_order_relaxed suffices because accurate statistics is not required.
614 class FileSizeEstimator {
615 public:
616 explicit FileSizeEstimator()
617 : raw_bytes_compressed(0),
618 raw_bytes_curr_block(0),
619 raw_bytes_curr_block_set(false),
620 raw_bytes_inflight(0),
621 blocks_inflight(0),
622 curr_compression_ratio(0),
623 estimated_file_size(0) {}
624
625 // Estimate file size when a block is about to be emitted to
626 // compression thread
627 void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
628 uint64_t new_raw_bytes_inflight =
629 raw_bytes_inflight.fetch_add(raw_block_size,
630 std::memory_order_relaxed) +
631 raw_block_size;
632
633 uint64_t new_blocks_inflight =
634 blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
635
636 estimated_file_size.store(
637 curr_file_size +
638 static_cast<uint64_t>(
639 static_cast<double>(new_raw_bytes_inflight) *
640 curr_compression_ratio.load(std::memory_order_relaxed)) +
641 new_blocks_inflight * kBlockTrailerSize,
642 std::memory_order_relaxed);
643 }
644
645 // Estimate file size when a block is already reaped from
646 // compression thread
647 void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
648 assert(raw_bytes_curr_block_set);
649
650 uint64_t new_raw_bytes_compressed =
651 raw_bytes_compressed + raw_bytes_curr_block;
652 assert(new_raw_bytes_compressed > 0);
653
654 curr_compression_ratio.store(
655 (curr_compression_ratio.load(std::memory_order_relaxed) *
656 raw_bytes_compressed +
657 compressed_block_size) /
658 static_cast<double>(new_raw_bytes_compressed),
659 std::memory_order_relaxed);
660 raw_bytes_compressed = new_raw_bytes_compressed;
661
662 uint64_t new_raw_bytes_inflight =
663 raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
664 std::memory_order_relaxed) -
665 raw_bytes_curr_block;
666
667 uint64_t new_blocks_inflight =
668 blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
669
670 estimated_file_size.store(
671 curr_file_size +
672 static_cast<uint64_t>(
673 static_cast<double>(new_raw_bytes_inflight) *
674 curr_compression_ratio.load(std::memory_order_relaxed)) +
675 new_blocks_inflight * kBlockTrailerSize,
676 std::memory_order_relaxed);
677
678 raw_bytes_curr_block_set = false;
679 }
680
681 void SetEstimatedFileSize(uint64_t size) {
682 estimated_file_size.store(size, std::memory_order_relaxed);
683 }
684
685 uint64_t GetEstimatedFileSize() {
686 return estimated_file_size.load(std::memory_order_relaxed);
687 }
688
689 void SetCurrBlockRawSize(uint64_t size) {
690 raw_bytes_curr_block = size;
691 raw_bytes_curr_block_set = true;
692 }
693
694 private:
695 // Raw bytes compressed so far.
696 uint64_t raw_bytes_compressed;
697 // Size of current block being appended.
698 uint64_t raw_bytes_curr_block;
699 // Whether raw_bytes_curr_block has been set for next
700 // ReapBlock call.
701 bool raw_bytes_curr_block_set;
702 // Raw bytes under compression and not appended yet.
703 std::atomic<uint64_t> raw_bytes_inflight;
704 // Number of blocks under compression and not appended yet.
705 std::atomic<uint64_t> blocks_inflight;
706 // Current compression ratio, maintained by BGWorkWriteRawBlock.
707 std::atomic<double> curr_compression_ratio;
708 // Estimated SST file size.
709 std::atomic<uint64_t> estimated_file_size;
710 };
711 FileSizeEstimator file_size_estimator;
712
713 // Facilities used for waiting first block completion. Need to Wait for
714 // the completion of first block compression and flush to get a non-zero
715 // compression ratio.
716 std::atomic<bool> first_block_processed;
717 std::condition_variable first_block_cond;
718 std::mutex first_block_mutex;
719
720 explicit ParallelCompressionRep(uint32_t parallel_threads)
721 : curr_block_keys(new Keys()),
722 block_rep_buf(parallel_threads),
723 block_rep_pool(parallel_threads),
724 compress_queue(parallel_threads),
725 write_queue(parallel_threads),
726 first_block_processed(false) {
727 for (uint32_t i = 0; i < parallel_threads; i++) {
728 block_rep_buf[i].contents = Slice();
729 block_rep_buf[i].compressed_contents = Slice();
730 block_rep_buf[i].data.reset(new std::string());
731 block_rep_buf[i].compressed_data.reset(new std::string());
732 block_rep_buf[i].compression_type = CompressionType();
733 block_rep_buf[i].first_key_in_next_block.reset(new std::string());
734 block_rep_buf[i].keys.reset(new Keys());
735 block_rep_buf[i].slot.reset(new BlockRepSlot());
736 block_rep_buf[i].status = Status::OK();
737 block_rep_pool.push(&block_rep_buf[i]);
738 }
739 }
740
741 ~ParallelCompressionRep() { block_rep_pool.finish(); }
742
743 // Make a block prepared to be emitted to compression thread
744 // Used in non-buffered mode
745 BlockRep* PrepareBlock(CompressionType compression_type,
746 const Slice* first_key_in_next_block,
747 BlockBuilder* data_block) {
748 BlockRep* block_rep =
749 PrepareBlockInternal(compression_type, first_key_in_next_block);
750 assert(block_rep != nullptr);
751 data_block->SwapAndReset(*(block_rep->data));
752 block_rep->contents = *(block_rep->data);
753 std::swap(block_rep->keys, curr_block_keys);
754 curr_block_keys->Clear();
755 return block_rep;
756 }
757
758 // Used in EnterUnbuffered
759 BlockRep* PrepareBlock(CompressionType compression_type,
760 const Slice* first_key_in_next_block,
761 std::string* data_block,
762 std::vector<std::string>* keys) {
763 BlockRep* block_rep =
764 PrepareBlockInternal(compression_type, first_key_in_next_block);
765 assert(block_rep != nullptr);
766 std::swap(*(block_rep->data), *data_block);
767 block_rep->contents = *(block_rep->data);
768 block_rep->keys->SwapAssign(*keys);
769 return block_rep;
770 }
771
772 // Emit a block to compression thread
773 void EmitBlock(BlockRep* block_rep) {
774 assert(block_rep != nullptr);
775 assert(block_rep->status.ok());
776 if (!write_queue.push(block_rep->slot.get())) {
777 return;
778 }
779 if (!compress_queue.push(block_rep)) {
780 return;
781 }
782
783 if (!first_block_processed.load(std::memory_order_relaxed)) {
784 std::unique_lock<std::mutex> lock(first_block_mutex);
785 first_block_cond.wait(lock, [this] {
786 return first_block_processed.load(std::memory_order_relaxed);
787 });
788 }
789 }
790
791 // Reap a block from compression thread
792 void ReapBlock(BlockRep* block_rep) {
793 assert(block_rep != nullptr);
794 block_rep->compressed_data->clear();
795 block_rep_pool.push(block_rep);
796
797 if (!first_block_processed.load(std::memory_order_relaxed)) {
798 std::lock_guard<std::mutex> lock(first_block_mutex);
799 first_block_processed.store(true, std::memory_order_relaxed);
800 first_block_cond.notify_one();
801 }
802 }
803
804 private:
805 BlockRep* PrepareBlockInternal(CompressionType compression_type,
806 const Slice* first_key_in_next_block) {
807 BlockRep* block_rep = nullptr;
808 block_rep_pool.pop(block_rep);
809 assert(block_rep != nullptr);
810
811 assert(block_rep->data);
812
813 block_rep->compression_type = compression_type;
814
815 if (first_key_in_next_block == nullptr) {
816 block_rep->first_key_in_next_block.reset(nullptr);
817 } else {
818 block_rep->first_key_in_next_block->assign(
819 first_key_in_next_block->data(), first_key_in_next_block->size());
820 }
821
822 return block_rep;
823 }
7c673cae
FG
824};
825
826BlockBasedTableBuilder::BlockBasedTableBuilder(
11fdf7f2 827 const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
7c673cae
FG
828 const BlockBasedTableOptions& table_options,
829 const InternalKeyComparator& internal_comparator,
830 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
831 int_tbl_prop_collector_factories,
832 uint32_t column_family_id, WritableFileWriter* file,
833 const CompressionType compression_type,
494da23a
TL
834 const uint64_t sample_for_compression,
835 const CompressionOptions& compression_opts, const bool skip_filters,
f67539c2
TL
836 const std::string& column_family_name, const int level_at_creation,
837 const uint64_t creation_time, const uint64_t oldest_key_time,
20effc67
TL
838 const uint64_t target_file_size, const uint64_t file_creation_time,
839 const std::string& db_id, const std::string& db_session_id) {
7c673cae
FG
840 BlockBasedTableOptions sanitized_table_options(table_options);
841 if (sanitized_table_options.format_version == 0 &&
842 sanitized_table_options.checksum != kCRC32c) {
843 ROCKS_LOG_WARN(
844 ioptions.info_log,
845 "Silently converting format_version to 1 because checksum is "
846 "non-default");
847 // silently convert format_version to 1 to keep consistent with current
848 // behavior
849 sanitized_table_options.format_version = 1;
850 }
851
20effc67
TL
852 rep_ = new Rep(
853 ioptions, moptions, sanitized_table_options, internal_comparator,
854 int_tbl_prop_collector_factories, column_family_id, file,
855 compression_type, sample_for_compression, compression_opts, skip_filters,
856 level_at_creation, column_family_name, creation_time, oldest_key_time,
857 target_file_size, file_creation_time, db_id, db_session_id);
7c673cae
FG
858
859 if (rep_->filter_builder != nullptr) {
860 rep_->filter_builder->StartBlock(0);
861 }
862 if (table_options.block_cache_compressed.get() != nullptr) {
20effc67 863 BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
7c673cae
FG
864 table_options.block_cache_compressed.get(), file->writable_file(),
865 &rep_->compressed_cache_key_prefix[0],
866 &rep_->compressed_cache_key_prefix_size);
867 }
20effc67
TL
868
869 if (rep_->IsParallelCompressionEnabled()) {
870 StartParallelCompression();
871 }
7c673cae
FG
872}
873
874BlockBasedTableBuilder::~BlockBasedTableBuilder() {
494da23a
TL
875 // Catch errors where caller forgot to call Finish()
876 assert(rep_->state == Rep::State::kClosed);
7c673cae
FG
877 delete rep_;
878}
879
// Adds one key/value entry to the table being built.
//
// The value type is extracted from `key`:
//  * For point entries (IsValueType), the flush policy is consulted first and
//    the current data block is flushed if requested; the entry is then
//    appended to the data block and, depending on the builder state, fed to
//    the index/filter builders or buffered for later replay.
//  * kTypeRangeDeletion entries are appended to the range-deletion block.
//  * Any other type trips an assert.
// Table-property collectors are notified for both accepted kinds, and the
// entry/size/deletion/merge counters in `props` are updated at the end.
//
// Does nothing if the builder status is already not ok.
880void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
881 Rep* r = rep_;
494da23a 882 assert(rep_->state != Rep::State::kClosed);
7c673cae
FG
883 if (!ok()) return;
884 ValueType value_type = ExtractValueType(key);
885 if (IsValueType(value_type)) {
494da23a
TL
// Debug-only ordering check: once at least one non-range-deletion entry has
// been added, every new key must compare greater than the previous one.
886#ifndef NDEBUG
887 if (r->props.num_entries > r->props.num_range_deletions) {
7c673cae
FG
888 assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
889 }
20effc67 890#endif // !NDEBUG
7c673cae
FG
891
892 auto should_flush = r->flush_block_policy->Update(key, value);
893 if (should_flush) {
894 assert(!r->data_block.empty());
// Record a pointer to the incoming key before flushing; Flush() passes it on
// to the parallel-compression path as the first key of the next block.
20effc67 895 r->first_key_in_next_block = &key;
7c673cae
FG
896 Flush();
897
// Leave buffered mode once the buffered data exceeds the target file size
// (only when a non-zero target file size is configured).
20effc67 898 if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
494da23a
TL
899 r->data_begin_offset > r->target_file_size) {
900 EnterUnbuffered();
901 }
902
7c673cae
FG
903 // Add item to index block.
904 // We do not emit the index entry for a block until we have seen the
905 // first key for the next data block. This allows us to use shorter
906 // keys in the index block. For example, consider a block boundary
907 // between the keys "the quick brown fox" and "the who". We can use
908 // "the r" as the key for the index block entry since it is >= all
909 // entries in the first block and < all entries in subsequent
910 // blocks.
494da23a 911 if (ok() && r->state == Rep::State::kUnbuffered) {
20effc67
TL
// With parallel compression no index entry is added here; only the key list
// of the block that was just handed off is cleared.
912 if (r->IsParallelCompressionEnabled()) {
913 r->pc_rep->curr_block_keys->Clear();
914 } else {
915 r->index_builder->AddIndexEntry(&r->last_key, &key,
916 r->pending_handle);
917 }
7c673cae
FG
918 }
919 }
920
921 // Note: PartitionedFilterBlockBuilder requires key being added to filter
922 // builder after being added to index builder.
20effc67
TL
923 if (r->state == Rep::State::kUnbuffered) {
924 if (r->IsParallelCompressionEnabled()) {
// Parallel path: just remember the key for the current block.
925 r->pc_rep->curr_block_keys->PushBack(key);
926 } else {
927 if (r->filter_builder != nullptr) {
928 size_t ts_sz =
929 r->internal_comparator.user_comparator()->timestamp_size();
930 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
931 }
932 }
7c673cae
FG
933 }
934
935 r->last_key.assign(key.data(), key.size());
936 r->data_block.Add(key, value);
494da23a
TL
937 if (r->state == Rep::State::kBuffered) {
938 // Buffer keys to be replayed during `Finish()` once compression
939 // dictionary has been finalized.
// A new (block, keys) buffer entry is opened for the first key ever buffered
// and whenever the previous block was just flushed.
940 if (r->data_block_and_keys_buffers.empty() || should_flush) {
941 r->data_block_and_keys_buffers.emplace_back();
942 }
943 r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
944 } else {
20effc67
TL
945 if (!r->IsParallelCompressionEnabled()) {
946 r->index_builder->OnKeyAdded(key);
947 }
494da23a 948 }
20effc67
TL
949 // TODO offset passed in is not accurate for parallel compression case
950 NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
7c673cae
FG
951 r->table_properties_collectors,
952 r->ioptions.info_log);
953
954 } else if (value_type == kTypeRangeDeletion) {
7c673cae 955 r->range_del_block.Add(key, value);
20effc67
TL
956 // TODO offset passed in is not accurate for parallel compression case
957 NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
7c673cae
FG
958 r->table_properties_collectors,
959 r->ioptions.info_log);
960 } else {
961 assert(false);
962 }
494da23a
TL
963
// Bookkeeping for table properties. A range deletion is counted both as a
// deletion and as a range deletion.
964 r->props.num_entries++;
965 r->props.raw_key_size += key.size();
966 r->props.raw_value_size += value.size();
967 if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
968 r->props.num_deletions++;
969 } else if (value_type == kTypeRangeDeletion) {
970 r->props.num_deletions++;
971 r->props.num_range_deletions++;
972 } else if (value_type == kTypeMerge) {
973 r->props.num_merge_operands++;
974 }
7c673cae
FG
975}
976
977void BlockBasedTableBuilder::Flush() {
978 Rep* r = rep_;
494da23a 979 assert(rep_->state != Rep::State::kClosed);
7c673cae
FG
980 if (!ok()) return;
981 if (r->data_block.empty()) return;
20effc67
TL
982 if (r->IsParallelCompressionEnabled() &&
983 r->state == Rep::State::kUnbuffered) {
984 r->data_block.Finish();
985 ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
986 r->compression_type, r->first_key_in_next_block, &(r->data_block));
987 assert(block_rep != nullptr);
988 r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
989 r->get_offset());
990 r->pc_rep->EmitBlock(block_rep);
991 } else {
992 WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
993 }
7c673cae
FG
994}
995
996void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
997 BlockHandle* handle,
998 bool is_data_block) {
999 WriteBlock(block->Finish(), handle, is_data_block);
1000 block->Reset();
1001}
1002
1003void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
1004 BlockHandle* handle,
1005 bool is_data_block) {
20effc67
TL
1006 Rep* r = rep_;
1007 Slice block_contents;
1008 CompressionType type;
1009 if (r->state == Rep::State::kBuffered) {
1010 assert(is_data_block);
1011 assert(!r->data_block_and_keys_buffers.empty());
1012 r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
1013 r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
1014 return;
1015 }
1016 Status compress_status;
1017 CompressAndVerifyBlock(raw_block_contents, is_data_block,
1018 *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
1019 &(r->compressed_output), &(block_contents), &type,
1020 &compress_status);
1021 r->SetStatus(compress_status);
1022 if (!ok()) {
1023 return;
1024 }
1025 WriteRawBlock(block_contents, type, handle, is_data_block);
1026 r->compressed_output.clear();
1027 if (is_data_block) {
1028 if (r->filter_builder != nullptr) {
1029 r->filter_builder->StartBlock(r->get_offset());
1030 }
1031 r->props.data_size = r->get_offset();
1032 ++r->props.num_data_blocks;
1033 }
1034}
1035
1036void BlockBasedTableBuilder::BGWorkCompression(
1037 const CompressionContext& compression_ctx,
1038 UncompressionContext* verify_ctx) {
1039 ParallelCompressionRep::BlockRep* block_rep = nullptr;
1040 while (rep_->pc_rep->compress_queue.pop(block_rep)) {
1041 assert(block_rep != nullptr);
1042 CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
1043 compression_ctx, verify_ctx,
1044 block_rep->compressed_data.get(),
1045 &block_rep->compressed_contents,
1046 &(block_rep->compression_type), &block_rep->status);
1047 block_rep->slot->Fill(block_rep);
1048 }
1049}
1050
// Compresses one block and, if enabled, verifies the result by decompressing.
//
// Inputs:
//   raw_block_contents  uncompressed block bytes
//   is_data_block       selects dictionary use and sampling (see below)
//   compression_ctx     compression context to use
//   verify_ctx          uncompression context used for verification
// Outputs:
//   compressed_output   buffer handed to CompressBlock() for the result
//   block_contents      the bytes to write: either the CompressBlock() result
//                       or, when compression is aborted, the raw contents
//   type                compression type that applies to *block_contents
//   out_status          set to Corruption when verification fails; not
//                       written otherwise
//
// Compression is attempted only when the builder status is ok and the block
// is smaller than kCompressionSizeLimit. Dictionaries are used for data
// blocks only; other blocks use the empty dictionary.
1051void BlockBasedTableBuilder::CompressAndVerifyBlock(
1052 const Slice& raw_block_contents, bool is_data_block,
1053 const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
1054 std::string* compressed_output, Slice* block_contents,
1055 CompressionType* type, Status* out_status) {
7c673cae
FG
1056 // File format contains a sequence of blocks where each block has:
1057 // block_data: uint8[n]
1058 // type: uint8
1059 // crc: uint32
7c673cae 1060 Rep* r = rep_;
20effc67
TL
// With parallel compression the status may already be bad when a worker
// gets here, so it is only asserted ok in the non-parallel case.
1061 bool is_status_ok = ok();
1062 if (!r->IsParallelCompressionEnabled()) {
1063 assert(is_status_ok);
1064 }
7c673cae 1065
20effc67 1066 *type = r->compression_type;
494da23a 1067 uint64_t sample_for_compression = r->sample_for_compression;
7c673cae
FG
1068 bool abort_compression = false;
1069
494da23a
TL
1070 StopWatchNano timer(
1071 r->ioptions.env,
1072 ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
1073
20effc67 1074 if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
494da23a
TL
1075 const CompressionDict* compression_dict;
1076 if (!is_data_block || r->compression_dict == nullptr) {
1077 compression_dict = &CompressionDict::GetEmptyDict();
11fdf7f2 1078 } else {
494da23a 1079 compression_dict = r->compression_dict.get();
7c673cae 1080 }
494da23a 1081 assert(compression_dict != nullptr);
20effc67
TL
1082 CompressionInfo compression_info(r->compression_opts, compression_ctx,
1083 *compression_dict, *type,
494da23a
TL
1084 sample_for_compression);
1085
1086 std::string sampled_output_fast;
1087 std::string sampled_output_slow;
// CompressBlock() receives `type` by pointer and may change it; the checks
// below compare *type against kNoCompression and r->compression_type.
20effc67
TL
1088 *block_contents = CompressBlock(
1089 raw_block_contents, compression_info, type,
494da23a 1090 r->table_options.format_version, is_data_block /* do_sample */,
20effc67 1091 compressed_output, &sampled_output_fast, &sampled_output_slow);
494da23a
TL
1092
1093 // notify collectors on block add
1094 NotifyCollectTableCollectorsOnBlockAdd(
1095 r->table_properties_collectors, raw_block_contents.size(),
1096 sampled_output_fast.size(), sampled_output_slow.size());
7c673cae
FG
1097
1098 // Some of the compression algorithms are known to be unreliable. If
1099 // the verify_compression flag is set then try to de-compress the
1100 // compressed data and compare to the input.
20effc67 1101 if (*type != kNoCompression && r->table_options.verify_compression) {
7c673cae 1102 // Retrieve the uncompressed contents into a new buffer
494da23a
TL
1103 const UncompressionDict* verify_dict;
1104 if (!is_data_block || r->verify_dict == nullptr) {
1105 verify_dict = &UncompressionDict::GetEmptyDict();
1106 } else {
1107 verify_dict = r->verify_dict.get();
1108 }
1109 assert(verify_dict != nullptr);
7c673cae 1110 BlockContents contents;
20effc67 1111 UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
494da23a 1112 r->compression_type);
7c673cae 1113 Status stat = UncompressBlockContentsForCompressionType(
20effc67 1114 uncompression_info, block_contents->data(), block_contents->size(),
11fdf7f2 1115 &contents, r->table_options.format_version, r->ioptions);
7c673cae
FG
1116
1117 if (stat.ok()) {
1118 bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
1119 if (!compressed_ok) {
1120 // The result of the compression was invalid. abort.
1121 abort_compression = true;
1122 ROCKS_LOG_ERROR(r->ioptions.info_log,
1123 "Decompressed block did not match raw block");
20effc67 1124 *out_status =
7c673cae
FG
1125 Status::Corruption("Decompressed block did not match raw block");
1126 }
1127 } else {
1128 // Decompression reported an error. abort.
20effc67
TL
1129 *out_status = Status::Corruption(std::string("Could not decompress: ") +
1130 stat.getState());
7c673cae
FG
1131 abort_compression = true;
1132 }
1133 }
1134 } else {
// Note: this branch is also taken when the builder status was not ok on
// entry, not only when the block exceeds kCompressionSizeLimit.
1135 // Block is too big to be compressed.
1136 abort_compression = true;
1137 }
1138
1139 // Abort compression if the block is too big, or did not pass
1140 // verification.
// Statistics: an aborted block, or one for which CompressBlock() returned a
// type different from the configured one, counts as "not compressed"; a
// block that ends up compressed records time, input bytes and a tick.
1141 if (abort_compression) {
1142 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
20effc67
TL
1143 *type = kNoCompression;
1144 *block_contents = raw_block_contents;
1145 } else if (*type != kNoCompression) {
11fdf7f2 1146 if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
494da23a
TL
1147 RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
1148 timer.ElapsedNanos());
11fdf7f2 1149 }
494da23a
TL
1150 RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
1151 raw_block_contents.size());
7c673cae 1152 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
20effc67 1153 } else if (*type != r->compression_type) {
494da23a 1154 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
7c673cae 1155 }
7c673cae
FG
1156}
1157
// Appends an already-encoded block plus its trailer to the file.
//
// `handle` receives the block's offset (the current file offset) and size
// (the size of `block_contents`, trailer excluded). The trailer is
// kBlockTrailerSize bytes: byte 0 holds `type`, followed by a fixed 32-bit
// checksum computed over the block contents and that type byte using the
// configured checksum kind.
//
// After a successful write the block is offered to the compressed block
// cache, the tracked file offset is advanced, data blocks are padded when
// `block_align` is set, and (with parallel compression) the file size
// estimator is updated. I/O failures are recorded through SetIOStatus() and
// mirrored into the builder status.
1158void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
1159 CompressionType type,
11fdf7f2
TL
1160 BlockHandle* handle,
1161 bool is_data_block) {
7c673cae 1162 Rep* r = rep_;
20effc67
TL
1163 Status s = Status::OK();
1164 IOStatus io_s = IOStatus::OK();
7c673cae 1165 StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
20effc67 1166 handle->set_offset(r->get_offset());
7c673cae 1167 handle->set_size(block_contents.size());
20effc67
TL
1168 assert(status().ok());
1169 assert(io_status().ok());
1170 io_s = r->file->Append(block_contents);
1171 if (io_s.ok()) {
7c673cae
FG
1172 char trailer[kBlockTrailerSize];
1173 trailer[0] = type;
// checksum stays 0 for kNoChecksum.
20effc67 1174 uint32_t checksum = 0;
7c673cae
FG
1175 switch (r->table_options.checksum) {
1176 case kNoChecksum:
11fdf7f2 1177 break;
7c673cae 1178 case kCRC32c: {
20effc67
TL
1179 uint32_t crc =
1180 crc32c::Value(block_contents.data(), block_contents.size());
1181 // Extend to cover compression type
1182 crc = crc32c::Extend(crc, trailer, 1);
1183 checksum = crc32c::Mask(crc);
7c673cae
FG
1184 break;
1185 }
1186 case kxxHash: {
f67539c2
TL
1187 XXH32_state_t* const state = XXH32_createState();
1188 XXH32_reset(state, 0);
20effc67
TL
1189 XXH32_update(state, block_contents.data(), block_contents.size());
1190 // Extend to cover compression type
1191 XXH32_update(state, trailer, 1);
1192 checksum = XXH32_digest(state);
f67539c2 1193 XXH32_freeState(state);
7c673cae
FG
1194 break;
1195 }
494da23a
TL
1196 case kxxHash64: {
1197 XXH64_state_t* const state = XXH64_createState();
1198 XXH64_reset(state, 0);
20effc67
TL
1199 XXH64_update(state, block_contents.data(), block_contents.size());
1200 // Extend to cover compression type
1201 XXH64_update(state, trailer, 1);
// Only the low 32 bits of the 64-bit digest are stored.
1202 checksum = Lower32of64(XXH64_digest(state));
494da23a
TL
1203 XXH64_freeState(state);
1204 break;
1205 }
20effc67
TL
1206 default:
1207 assert(false);
1208 break;
7c673cae 1209 }
20effc67
TL
1210 EncodeFixed32(trailer + 1, checksum);
1211 assert(io_s.ok());
494da23a
TL
1212 TEST_SYNC_POINT_CALLBACK(
1213 "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
1214 static_cast<char*>(trailer));
20effc67
TL
1215 io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
1216 if (io_s.ok()) {
1217 assert(s.ok());
1218 s = InsertBlockInCache(block_contents, type, handle);
1219 if (!s.ok()) {
1220 r->SetStatus(s);
1221 }
1222 } else {
1223 r->SetIOStatus(io_s);
7c673cae 1224 }
// The tracked offset only advances when both the write and the cache
// insertion succeeded.
20effc67
TL
1225 if (s.ok() && io_s.ok()) {
1226 r->set_offset(r->get_offset() + block_contents.size() +
1227 kBlockTrailerSize);
11fdf7f2
TL
1228 if (r->table_options.block_align && is_data_block) {
// Bytes needed to round (block + trailer) up to the next multiple of
// r->alignment; the mask arithmetic assumes the alignment is a power of
// two -- TODO confirm where r->alignment is set.
1229 size_t pad_bytes =
1230 (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
1231 (r->alignment - 1))) &
1232 (r->alignment - 1);
20effc67
TL
1233 io_s = r->file->Pad(pad_bytes);
1234 if (io_s.ok()) {
1235 r->set_offset(r->get_offset() + pad_bytes);
1236 } else {
1237 r->SetIOStatus(io_s);
1238 }
1239 }
1240 if (r->IsParallelCompressionEnabled()) {
1241 if (is_data_block) {
1242 r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
1243 r->get_offset());
1244 } else {
1245 r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
11fdf7f2
TL
1246 }
1247 }
7c673cae 1248 }
20effc67
TL
1249 } else {
1250 r->SetIOStatus(io_s);
1251 }
// Mirror an I/O failure into the general status unless the cache insertion
// already produced a non-ok status of its own.
1252 if (!io_s.ok() && s.ok()) {
1253 r->SetStatus(io_s);
7c673cae
FG
1254 }
1255}
1256
20effc67
TL
1257void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
1258 Rep* r = rep_;
1259 ParallelCompressionRep::BlockRepSlot* slot = nullptr;
1260 ParallelCompressionRep::BlockRep* block_rep = nullptr;
1261 while (r->pc_rep->write_queue.pop(slot)) {
1262 assert(slot != nullptr);
1263 slot->Take(block_rep);
1264 assert(block_rep != nullptr);
1265 if (!block_rep->status.ok()) {
1266 r->SetStatus(block_rep->status);
1267 // Reap block so that blocked Flush() can finish
1268 // if there is one, and Flush() will notice !ok() next time.
1269 block_rep->status = Status::OK();
1270 r->pc_rep->ReapBlock(block_rep);
1271 continue;
1272 }
1273
1274 for (size_t i = 0; i < block_rep->keys->Size(); i++) {
1275 auto& key = (*block_rep->keys)[i];
1276 if (r->filter_builder != nullptr) {
1277 size_t ts_sz =
1278 r->internal_comparator.user_comparator()->timestamp_size();
1279 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1280 }
1281 r->index_builder->OnKeyAdded(key);
1282 }
1283
1284 r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
1285 WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
1286 &r->pending_handle, true /* is_data_block*/);
1287 if (!ok()) {
1288 break;
1289 }
1290
1291 if (r->filter_builder != nullptr) {
1292 r->filter_builder->StartBlock(r->get_offset());
1293 }
1294 r->props.data_size = r->get_offset();
1295 ++r->props.num_data_blocks;
1296
1297 if (block_rep->first_key_in_next_block == nullptr) {
1298 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
1299 r->pending_handle);
1300 } else {
1301 Slice first_key_in_next_block =
1302 Slice(*block_rep->first_key_in_next_block);
1303 r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
1304 &first_key_in_next_block,
1305 r->pending_handle);
1306 }
1307
1308 r->pc_rep->ReapBlock(block_rep);
1309 }
1310}
1311
1312void BlockBasedTableBuilder::StartParallelCompression() {
1313 rep_->pc_rep.reset(
1314 new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
1315 rep_->pc_rep->compress_thread_pool.reserve(
1316 rep_->compression_opts.parallel_threads);
1317 for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
1318 rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
1319 BGWorkCompression(*(rep_->compression_ctxs[i]),
1320 rep_->verify_ctxs[i].get());
1321 });
1322 }
1323 rep_->pc_rep->write_thread.reset(
1324 new port::Thread([this] { BGWorkWriteRawBlock(); }));
1325}
1326
1327void BlockBasedTableBuilder::StopParallelCompression() {
1328 rep_->pc_rep->compress_queue.finish();
1329 for (auto& thread : rep_->pc_rep->compress_thread_pool) {
1330 thread.join();
1331 }
1332 rep_->pc_rep->write_queue.finish();
1333 rep_->pc_rep->write_thread->join();
1334}
1335
1336Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
1337
1338IOStatus BlockBasedTableBuilder::io_status() const {
1339 return rep_->GetIOStatus();
1340}
7c673cae 1341
494da23a
TL
1342static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
1343 BlockContents* bc = reinterpret_cast<BlockContents*>(value);
1344 delete bc;
7c673cae
FG
1345}
1346
1347//
1348// Make a copy of the block contents and insert into compressed block cache
1349//
1350Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
1351 const CompressionType type,
1352 const BlockHandle* handle) {
1353 Rep* r = rep_;
1354 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
1355
1356 if (type != kNoCompression && block_cache_compressed != nullptr) {
7c673cae
FG
1357 size_t size = block_contents.size();
1358
494da23a
TL
1359 auto ubuf =
1360 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
7c673cae
FG
1361 memcpy(ubuf.get(), block_contents.data(), size);
1362 ubuf[size] = type;
1363
494da23a
TL
1364 BlockContents* block_contents_to_cache =
1365 new BlockContents(std::move(ubuf), size);
1366#ifndef NDEBUG
1367 block_contents_to_cache->is_raw_block = true;
1368#endif // NDEBUG
7c673cae
FG
1369
1370 // make cache key by appending the file offset to the cache prefix id
1371 char* end = EncodeVarint64(
494da23a
TL
1372 r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
1373 handle->offset());
1374 Slice key(r->compressed_cache_key_prefix,
1375 static_cast<size_t>(end - r->compressed_cache_key_prefix));
7c673cae
FG
1376
1377 // Insert into compressed block cache.
20effc67
TL
1378 // How should we deal with compressed cache full?
1379 block_cache_compressed
1380 ->Insert(key, block_contents_to_cache,
1381 block_contents_to_cache->ApproximateMemoryUsage(),
1382 &DeleteCachedBlockContents)
1383 .PermitUncheckedError();
7c673cae
FG
1384
1385 // Invalidate OS cache.
20effc67
TL
1386 r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
1387 .PermitUncheckedError();
7c673cae
FG
1388 }
1389 return Status::OK();
1390}
1391
11fdf7f2
TL
1392void BlockBasedTableBuilder::WriteFilterBlock(
1393 MetaIndexBuilder* meta_index_builder) {
1394 BlockHandle filter_block_handle;
1395 bool empty_filter_block = (rep_->filter_builder == nullptr ||
1396 rep_->filter_builder->NumAdded() == 0);
1397 if (ok() && !empty_filter_block) {
7c673cae 1398 Status s = Status::Incomplete();
11fdf7f2
TL
1399 while (ok() && s.IsIncomplete()) {
1400 Slice filter_content =
1401 rep_->filter_builder->Finish(filter_block_handle, &s);
7c673cae 1402 assert(s.ok() || s.IsIncomplete());
11fdf7f2 1403 rep_->props.filter_size += filter_content.size();
7c673cae
FG
1404 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
1405 }
1406 }
11fdf7f2
TL
1407 if (ok() && !empty_filter_block) {
1408 // Add mapping from "<filter_block_prefix>.Name" to location
1409 // of filter data.
1410 std::string key;
1411 if (rep_->filter_builder->IsBlockBased()) {
1412 key = BlockBasedTable::kFilterBlockPrefix;
1413 } else {
1414 key = rep_->table_options.partition_filters
1415 ? BlockBasedTable::kPartitionedFilterBlockPrefix
1416 : BlockBasedTable::kFullFilterBlockPrefix;
1417 }
1418 key.append(rep_->table_options.filter_policy->Name());
1419 meta_index_builder->Add(key, filter_block_handle);
1420 }
1421}
7c673cae 1422
11fdf7f2
TL
1423void BlockBasedTableBuilder::WriteIndexBlock(
1424 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
7c673cae 1425 IndexBuilder::IndexBlocks index_blocks;
11fdf7f2 1426 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
7c673cae
FG
1427 if (index_builder_status.IsIncomplete()) {
1428 // We we have more than one index partition then meta_blocks are not
1429 // supported for the index. Currently meta_blocks are used only by
1430 // HashIndexBuilder which is not multi-partition.
1431 assert(index_blocks.meta_blocks.empty());
11fdf7f2 1432 } else if (ok() && !index_builder_status.ok()) {
20effc67 1433 rep_->SetStatus(index_builder_status);
7c673cae 1434 }
11fdf7f2
TL
1435 if (ok()) {
1436 for (const auto& item : index_blocks.meta_blocks) {
1437 BlockHandle block_handle;
1438 WriteBlock(item.second, &block_handle, false /* is_data_block */);
1439 if (!ok()) {
1440 break;
1441 }
1442 meta_index_builder->Add(item.first, block_handle);
1443 }
7c673cae 1444 }
11fdf7f2
TL
1445 if (ok()) {
1446 if (rep_->table_options.enable_index_compression) {
1447 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
1448 } else {
1449 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
1450 index_block_handle);
1451 }
1452 }
1453 // If there are more index partitions, finish them and write them out
1454 Status s = index_builder_status;
1455 while (ok() && s.IsIncomplete()) {
1456 s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
1457 if (!s.ok() && !s.IsIncomplete()) {
20effc67 1458 rep_->SetStatus(s);
11fdf7f2
TL
1459 return;
1460 }
1461 if (rep_->table_options.enable_index_compression) {
1462 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
1463 } else {
1464 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
1465 index_block_handle);
1466 }
1467 // The last index_block_handle will be for the partition index block
1468 }
1469}
7c673cae 1470
11fdf7f2
TL
1471void BlockBasedTableBuilder::WritePropertiesBlock(
1472 MetaIndexBuilder* meta_index_builder) {
1473 BlockHandle properties_block_handle;
7c673cae 1474 if (ok()) {
11fdf7f2
TL
1475 PropertyBlockBuilder property_block_builder;
1476 rep_->props.column_family_id = rep_->column_family_id;
1477 rep_->props.column_family_name = rep_->column_family_name;
1478 rep_->props.filter_policy_name =
1479 rep_->table_options.filter_policy != nullptr
1480 ? rep_->table_options.filter_policy->Name()
1481 : "";
1482 rep_->props.index_size =
1483 rep_->index_builder->IndexSize() + kBlockTrailerSize;
1484 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
1485 ? rep_->ioptions.user_comparator->Name()
1486 : "nullptr";
1487 rep_->props.merge_operator_name =
1488 rep_->ioptions.merge_operator != nullptr
1489 ? rep_->ioptions.merge_operator->Name()
1490 : "nullptr";
1491 rep_->props.compression_name =
494da23a
TL
1492 CompressionTypeToString(rep_->compression_type);
1493 rep_->props.compression_options =
1494 CompressionOptionsToString(rep_->compression_opts);
11fdf7f2
TL
1495 rep_->props.prefix_extractor_name =
1496 rep_->moptions.prefix_extractor != nullptr
1497 ? rep_->moptions.prefix_extractor->Name()
1498 : "nullptr";
1499
1500 std::string property_collectors_names = "[";
1501 for (size_t i = 0;
1502 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
1503 if (i != 0) {
1504 property_collectors_names += ",";
7c673cae 1505 }
11fdf7f2
TL
1506 property_collectors_names +=
1507 rep_->ioptions.table_properties_collector_factories[i]->Name();
1508 }
1509 property_collectors_names += "]";
1510 rep_->props.property_collectors_names = property_collectors_names;
1511 if (rep_->table_options.index_type ==
1512 BlockBasedTableOptions::kTwoLevelIndexSearch) {
1513 assert(rep_->p_index_builder_ != nullptr);
1514 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
1515 rep_->props.top_level_index_size =
1516 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
1517 }
1518 rep_->props.index_key_is_user_key =
1519 !rep_->index_builder->seperator_is_key_plus_seq();
1520 rep_->props.index_value_is_delta_encoded =
1521 rep_->use_delta_encoding_for_index_values;
1522 rep_->props.creation_time = rep_->creation_time;
1523 rep_->props.oldest_key_time = rep_->oldest_key_time;
f67539c2 1524 rep_->props.file_creation_time = rep_->file_creation_time;
20effc67
TL
1525 rep_->props.db_id = rep_->db_id;
1526 rep_->props.db_session_id = rep_->db_session_id;
1527 rep_->props.db_host_id = rep_->db_host_id;
11fdf7f2
TL
1528
1529 // Add basic properties
1530 property_block_builder.AddTableProperty(rep_->props);
1531
1532 // Add use collected properties
1533 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
1534 rep_->ioptions.info_log,
1535 &property_block_builder);
1536
1537 WriteRawBlock(property_block_builder.Finish(), kNoCompression,
1538 &properties_block_handle);
1539 }
1540 if (ok()) {
494da23a
TL
1541#ifndef NDEBUG
1542 {
1543 uint64_t props_block_offset = properties_block_handle.offset();
1544 uint64_t props_block_size = properties_block_handle.size();
1545 TEST_SYNC_POINT_CALLBACK(
1546 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
1547 &props_block_offset);
1548 TEST_SYNC_POINT_CALLBACK(
1549 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
1550 &props_block_size);
1551 }
1552#endif // !NDEBUG
11fdf7f2
TL
1553 meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
1554 }
1555}
1556
1557void BlockBasedTableBuilder::WriteCompressionDictBlock(
1558 MetaIndexBuilder* meta_index_builder) {
494da23a
TL
1559 if (rep_->compression_dict != nullptr &&
1560 rep_->compression_dict->GetRawDict().size()) {
11fdf7f2
TL
1561 BlockHandle compression_dict_block_handle;
1562 if (ok()) {
494da23a 1563 WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
11fdf7f2 1564 &compression_dict_block_handle);
494da23a
TL
1565#ifndef NDEBUG
1566 Slice compression_dict = rep_->compression_dict->GetRawDict();
1567 TEST_SYNC_POINT_CALLBACK(
1568 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1569 &compression_dict);
1570#endif // NDEBUG
11fdf7f2
TL
1571 }
1572 if (ok()) {
1573 meta_index_builder->Add(kCompressionDictBlock,
1574 compression_dict_block_handle);
7c673cae 1575 }
11fdf7f2
TL
1576 }
1577}
7c673cae 1578
11fdf7f2
TL
1579void BlockBasedTableBuilder::WriteRangeDelBlock(
1580 MetaIndexBuilder* meta_index_builder) {
1581 if (ok() && !rep_->range_del_block.empty()) {
1582 BlockHandle range_del_block_handle;
1583 WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1584 &range_del_block_handle);
1585 meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1586 }
1587}
7c673cae 1588
494da23a
TL
1589void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1590 BlockHandle& index_block_handle) {
1591 Rep* r = rep_;
1592 // No need to write out new footer if we're using default checksum.
1593 // We're writing legacy magic number because we want old versions of RocksDB
1594 // be able to read files generated with new release (just in case if
1595 // somebody wants to roll back after an upgrade)
1596 // TODO(icanadi) at some point in the future, when we're absolutely sure
1597 // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
1598 // number and always write new table files with new magic number
1599 bool legacy = (r->table_options.format_version == 0);
1600 // this is guaranteed by BlockBasedTableBuilder's constructor
1601 assert(r->table_options.checksum == kCRC32c ||
1602 r->table_options.format_version != 0);
1603 Footer footer(
1604 legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
1605 r->table_options.format_version);
1606 footer.set_metaindex_handle(metaindex_block_handle);
1607 footer.set_index_handle(index_block_handle);
1608 footer.set_checksum(r->table_options.checksum);
1609 std::string footer_encoding;
1610 footer.EncodeTo(&footer_encoding);
20effc67
TL
1611 assert(ok());
1612 IOStatus ios = r->file->Append(footer_encoding);
1613 if (ios.ok()) {
1614 r->set_offset(r->get_offset() + footer_encoding.size());
1615 } else {
1616 r->SetIOStatus(ios);
1617 r->SetStatus(ios);
494da23a
TL
1618 }
1619}
1620
1621void BlockBasedTableBuilder::EnterUnbuffered() {
1622 Rep* r = rep_;
1623 assert(r->state == Rep::State::kBuffered);
1624 r->state = Rep::State::kUnbuffered;
1625 const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
1626 ? r->compression_opts.zstd_max_train_bytes
1627 : r->compression_opts.max_dict_bytes;
1628 Random64 generator{r->creation_time};
1629 std::string compression_dict_samples;
1630 std::vector<size_t> compression_dict_sample_lens;
1631 if (!r->data_block_and_keys_buffers.empty()) {
1632 while (compression_dict_samples.size() < kSampleBytes) {
1633 size_t rand_idx =
f67539c2
TL
1634 static_cast<size_t>(
1635 generator.Uniform(r->data_block_and_keys_buffers.size()));
494da23a
TL
1636 size_t copy_len =
1637 std::min(kSampleBytes - compression_dict_samples.size(),
1638 r->data_block_and_keys_buffers[rand_idx].first.size());
1639 compression_dict_samples.append(
1640 r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
1641 compression_dict_sample_lens.emplace_back(copy_len);
1642 }
1643 }
1644
1645 // final data block flushed, now we can generate dictionary from the samples.
1646 // OK if compression_dict_samples is empty, we'll just get empty dictionary.
1647 std::string dict;
1648 if (r->compression_opts.zstd_max_train_bytes > 0) {
1649 dict = ZSTD_TrainDictionary(compression_dict_samples,
1650 compression_dict_sample_lens,
1651 r->compression_opts.max_dict_bytes);
1652 } else {
1653 dict = std::move(compression_dict_samples);
1654 }
1655 r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
1656 r->compression_opts.level));
1657 r->verify_dict.reset(new UncompressionDict(
1658 dict, r->compression_type == kZSTD ||
1659 r->compression_type == kZSTDNotFinalCompression));
1660
1661 for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
20effc67 1662 auto& data_block = r->data_block_and_keys_buffers[i].first;
494da23a
TL
1663 auto& keys = r->data_block_and_keys_buffers[i].second;
1664 assert(!data_block.empty());
1665 assert(!keys.empty());
1666
20effc67
TL
1667 if (r->IsParallelCompressionEnabled()) {
1668 Slice first_key_in_next_block;
1669 const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1670 if (i + 1 < r->data_block_and_keys_buffers.size()) {
1671 first_key_in_next_block =
1672 r->data_block_and_keys_buffers[i + 1].second.front();
1673 } else {
1674 first_key_in_next_block_ptr = r->first_key_in_next_block;
1675 }
1676
1677 ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
1678 r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
1679 assert(block_rep != nullptr);
1680 r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
1681 r->get_offset());
1682 r->pc_rep->EmitBlock(block_rep);
1683 } else {
1684 for (const auto& key : keys) {
1685 if (r->filter_builder != nullptr) {
1686 size_t ts_sz =
1687 r->internal_comparator.user_comparator()->timestamp_size();
1688 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1689 }
1690 r->index_builder->OnKeyAdded(key);
1691 }
1692 WriteBlock(Slice(data_block), &r->pending_handle,
1693 true /* is_data_block */);
1694 if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
1695 Slice first_key_in_next_block =
1696 r->data_block_and_keys_buffers[i + 1].second.front();
1697 Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1698 r->index_builder->AddIndexEntry(
1699 &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
494da23a 1700 }
494da23a
TL
1701 }
1702 }
1703 r->data_block_and_keys_buffers.clear();
1704}
1705
11fdf7f2
TL
1706Status BlockBasedTableBuilder::Finish() {
1707 Rep* r = rep_;
494da23a 1708 assert(r->state != Rep::State::kClosed);
11fdf7f2 1709 bool empty_data_block = r->data_block.empty();
20effc67 1710 r->first_key_in_next_block = nullptr;
11fdf7f2 1711 Flush();
494da23a
TL
1712 if (r->state == Rep::State::kBuffered) {
1713 EnterUnbuffered();
1714 }
20effc67
TL
1715 if (r->IsParallelCompressionEnabled()) {
1716 StopParallelCompression();
1717#ifndef NDEBUG
1718 for (const auto& br : r->pc_rep->block_rep_buf) {
1719 assert(br.status.ok());
1720 }
1721#endif // !NDEBUG
1722 } else {
1723 // To make sure properties block is able to keep the accurate size of index
1724 // block, we will finish writing all index entries first.
1725 if (ok() && !empty_data_block) {
1726 r->index_builder->AddIndexEntry(
1727 &r->last_key, nullptr /* no next data block */, r->pending_handle);
1728 }
11fdf7f2 1729 }
7c673cae 1730
494da23a 1731 // Write meta blocks, metaindex block and footer in the following order.
11fdf7f2
TL
1732 // 1. [meta block: filter]
1733 // 2. [meta block: index]
1734 // 3. [meta block: compression dictionary]
1735 // 4. [meta block: range deletion tombstone]
1736 // 5. [meta block: properties]
1737 // 6. [metaindex block]
494da23a 1738 // 7. Footer
11fdf7f2
TL
1739 BlockHandle metaindex_block_handle, index_block_handle;
1740 MetaIndexBuilder meta_index_builder;
1741 WriteFilterBlock(&meta_index_builder);
1742 WriteIndexBlock(&meta_index_builder, &index_block_handle);
1743 WriteCompressionDictBlock(&meta_index_builder);
1744 WriteRangeDelBlock(&meta_index_builder);
1745 WritePropertiesBlock(&meta_index_builder);
7c673cae
FG
1746 if (ok()) {
1747 // flush the meta index block
1748 WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
1749 &metaindex_block_handle);
7c673cae 1750 }
7c673cae 1751 if (ok()) {
494da23a 1752 WriteFooter(metaindex_block_handle, index_block_handle);
7c673cae 1753 }
494da23a 1754 r->state = Rep::State::kClosed;
20effc67
TL
1755 r->SetStatus(r->CopyIOStatus());
1756 Status ret_status = r->CopyStatus();
1757 assert(!ret_status.ok() || io_status().ok());
1758 return ret_status;
7c673cae
FG
1759}
1760
1761void BlockBasedTableBuilder::Abandon() {
494da23a 1762 assert(rep_->state != Rep::State::kClosed);
20effc67
TL
1763 if (rep_->IsParallelCompressionEnabled()) {
1764 StopParallelCompression();
1765 }
494da23a 1766 rep_->state = Rep::State::kClosed;
20effc67
TL
1767 rep_->CopyStatus().PermitUncheckedError();
1768 rep_->CopyIOStatus().PermitUncheckedError();
7c673cae
FG
1769}
1770
1771uint64_t BlockBasedTableBuilder::NumEntries() const {
1772 return rep_->props.num_entries;
1773}
1774
20effc67
TL
1775bool BlockBasedTableBuilder::IsEmpty() const {
1776 return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
1777}
1778
494da23a 1779uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
7c673cae 1780
20effc67
TL
1781uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
1782 if (rep_->IsParallelCompressionEnabled()) {
1783 // Use compression ratio so far and inflight raw bytes to estimate
1784 // final SST size.
1785 return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
1786 } else {
1787 return FileSize();
1788 }
1789}
1790
7c673cae
FG
1791bool BlockBasedTableBuilder::NeedCompact() const {
1792 for (const auto& collector : rep_->table_properties_collectors) {
1793 if (collector->NeedCompact()) {
1794 return true;
1795 }
1796 }
1797 return false;
1798}
1799
1800TableProperties BlockBasedTableBuilder::GetTableProperties() const {
1801 TableProperties ret = rep_->props;
1802 for (const auto& collector : rep_->table_properties_collectors) {
1803 for (const auto& prop : collector->GetReadableProperties()) {
1804 ret.readable_properties.insert(prop);
1805 }
20effc67 1806 collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
7c673cae
FG
1807 }
1808 return ret;
1809}
1810
20effc67
TL
1811std::string BlockBasedTableBuilder::GetFileChecksum() const {
1812 if (rep_->file != nullptr) {
1813 return rep_->file->GetFileChecksum();
1814 } else {
1815 return kUnknownFileChecksum;
1816 }
1817}
1818
f67539c2
TL
1819const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
1820 if (rep_->file != nullptr) {
1821 return rep_->file->GetFileChecksumFuncName();
1822 } else {
20effc67 1823 return kUnknownFileChecksumFuncName;
f67539c2
TL
1824 }
1825}
1826
7c673cae
FG
1827const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
1828const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
1829const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
1830 "partitionedfilter.";
f67539c2 1831} // namespace ROCKSDB_NAMESPACE