]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/table/block_based_table_builder.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / table / block_based_table_builder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/block_based_table_builder.h"
11
12 #include <assert.h>
13 #include <stdio.h>
14
15 #include <list>
16 #include <map>
17 #include <memory>
18 #include <string>
19 #include <unordered_map>
20 #include <utility>
21
22 #include "db/dbformat.h"
23
24 #include "rocksdb/cache.h"
25 #include "rocksdb/comparator.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/filter_policy.h"
28 #include "rocksdb/flush_block_policy.h"
29 #include "rocksdb/merge_operator.h"
30 #include "rocksdb/table.h"
31
32 #include "table/block.h"
33 #include "table/block_based_filter_block.h"
34 #include "table/block_based_table_factory.h"
35 #include "table/block_based_table_reader.h"
36 #include "table/block_builder.h"
37 #include "table/filter_block.h"
38 #include "table/format.h"
39 #include "table/full_filter_block.h"
40 #include "table/table_builder.h"
41
42 #include "util/coding.h"
43 #include "util/compression.h"
44 #include "util/crc32c.h"
45 #include "util/stop_watch.h"
46 #include "util/string_util.h"
47 #include "util/xxhash.h"
48
49 #include "table/index_builder.h"
50 #include "table/partitioned_filter_block.h"
51
52 namespace rocksdb {
53
54 extern const std::string kHashIndexPrefixesBlock;
55 extern const std::string kHashIndexPrefixesMetadataBlock;
56
57 typedef BlockBasedTableOptions::IndexType IndexType;
58
59 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
60 namespace {
61
62 // Create a filter block builder based on its type.
63 FilterBlockBuilder* CreateFilterBlockBuilder(
64 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
65 const BlockBasedTableOptions& table_opt,
66 const bool use_delta_encoding_for_index_values,
67 PartitionedIndexBuilder* const p_index_builder) {
68 if (table_opt.filter_policy == nullptr) return nullptr;
69
70 FilterBitsBuilder* filter_bits_builder =
71 table_opt.filter_policy->GetFilterBitsBuilder();
72 if (filter_bits_builder == nullptr) {
73 return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
74 table_opt);
75 } else {
76 if (table_opt.partition_filters) {
77 assert(p_index_builder != nullptr);
78 // Since after partition cut request from filter builder it takes time
79 // until index builder actully cuts the partition, we take the lower bound
80 // as partition size.
81 assert(table_opt.block_size_deviation <= 100);
82 auto partition_size = static_cast<uint32_t>(
83 ((table_opt.metadata_block_size *
84 (100 - table_opt.block_size_deviation)) + 99) / 100);
85 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
86 return new PartitionedFilterBlockBuilder(
87 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
88 filter_bits_builder, table_opt.index_block_restart_interval,
89 use_delta_encoding_for_index_values, p_index_builder, partition_size);
90 } else {
91 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
92 table_opt.whole_key_filtering,
93 filter_bits_builder);
94 }
95 }
96 }
97
// Returns true when compression saved more than one eighth (12.5%) of the raw
// size, i.e. when compressed_size is strictly below raw_size - raw_size / 8
// (integer division).
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t largest_acceptable = raw_size - (raw_size / 8u);
  return compressed_size < largest_acceptable;
}
102
103 } // namespace
104
105 // format_version is the block format as defined in include/rocksdb/table.h
106 Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
107 CompressionType* type, uint32_t format_version,
108 std::string* compressed_output) {
109 *type = compression_ctx.type();
110 if (compression_ctx.type() == kNoCompression) {
111 return raw;
112 }
113
114 // Will return compressed block contents if (1) the compression method is
115 // supported in this platform and (2) the compression rate is "good enough".
116 switch (compression_ctx.type()) {
117 case kSnappyCompression:
118 if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
119 compressed_output) &&
120 GoodCompressionRatio(compressed_output->size(), raw.size())) {
121 return *compressed_output;
122 }
123 break; // fall back to no compression.
124 case kZlibCompression:
125 if (Zlib_Compress(
126 compression_ctx,
127 GetCompressFormatForVersion(kZlibCompression, format_version),
128 raw.data(), raw.size(), compressed_output) &&
129 GoodCompressionRatio(compressed_output->size(), raw.size())) {
130 return *compressed_output;
131 }
132 break; // fall back to no compression.
133 case kBZip2Compression:
134 if (BZip2_Compress(
135 compression_ctx,
136 GetCompressFormatForVersion(kBZip2Compression, format_version),
137 raw.data(), raw.size(), compressed_output) &&
138 GoodCompressionRatio(compressed_output->size(), raw.size())) {
139 return *compressed_output;
140 }
141 break; // fall back to no compression.
142 case kLZ4Compression:
143 if (LZ4_Compress(
144 compression_ctx,
145 GetCompressFormatForVersion(kLZ4Compression, format_version),
146 raw.data(), raw.size(), compressed_output) &&
147 GoodCompressionRatio(compressed_output->size(), raw.size())) {
148 return *compressed_output;
149 }
150 break; // fall back to no compression.
151 case kLZ4HCCompression:
152 if (LZ4HC_Compress(
153 compression_ctx,
154 GetCompressFormatForVersion(kLZ4HCCompression, format_version),
155 raw.data(), raw.size(), compressed_output) &&
156 GoodCompressionRatio(compressed_output->size(), raw.size())) {
157 return *compressed_output;
158 }
159 break; // fall back to no compression.
160 case kXpressCompression:
161 if (XPRESS_Compress(raw.data(), raw.size(),
162 compressed_output) &&
163 GoodCompressionRatio(compressed_output->size(), raw.size())) {
164 return *compressed_output;
165 }
166 break;
167 case kZSTD:
168 case kZSTDNotFinalCompression:
169 if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
170 compressed_output) &&
171 GoodCompressionRatio(compressed_output->size(), raw.size())) {
172 return *compressed_output;
173 }
174 break; // fall back to no compression.
175 default: {} // Do not recognize this compression type
176 }
177
178 // Compression method is not supported, or not good compression ratio, so just
179 // fall back to uncompressed form.
180 *type = kNoCompression;
181 return raw;
182 }
183
// kBlockBasedTableMagicNumber is the leading 64 bits of the output of
//   echo rocksdb.table.block_based | sha1sum
// Other .cc files may access this constant as well, which is why the header
// declares it extern; this is the one non-extern definition that allocates
// its storage.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;

// Magic number of the legacy block based table format, which is still read
// and written for backwards compatibility.
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
196
197 // A collector that collects properties of interest to block-based table.
198 // For now this class looks heavy-weight since we only write one additional
199 // property.
200 // But in the foreseeable future, we will add more and more properties that are
201 // specific to block-based table.
202 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
203 : public IntTblPropCollector {
204 public:
205 explicit BlockBasedTablePropertiesCollector(
206 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
207 bool prefix_filtering)
208 : index_type_(index_type),
209 whole_key_filtering_(whole_key_filtering),
210 prefix_filtering_(prefix_filtering) {}
211
212 virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
213 uint64_t /*file_size*/) override {
214 // Intentionally left blank. Have no interest in collecting stats for
215 // individual key/value pairs.
216 return Status::OK();
217 }
218
219 virtual Status Finish(UserCollectedProperties* properties) override {
220 std::string val;
221 PutFixed32(&val, static_cast<uint32_t>(index_type_));
222 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
223 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
224 whole_key_filtering_ ? kPropTrue : kPropFalse});
225 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
226 prefix_filtering_ ? kPropTrue : kPropFalse});
227 return Status::OK();
228 }
229
230 // The name of the properties collector can be used for debugging purpose.
231 virtual const char* Name() const override {
232 return "BlockBasedTablePropertiesCollector";
233 }
234
235 virtual UserCollectedProperties GetReadableProperties() const override {
236 // Intentionally left blank.
237 return UserCollectedProperties();
238 }
239
240 private:
241 BlockBasedTableOptions::IndexType index_type_;
242 bool whole_key_filtering_;
243 bool prefix_filtering_;
244 };
245
246 struct BlockBasedTableBuilder::Rep {
247 const ImmutableCFOptions ioptions;
248 const MutableCFOptions moptions;
249 const BlockBasedTableOptions table_options;
250 const InternalKeyComparator& internal_comparator;
251 WritableFileWriter* file;
252 uint64_t offset = 0;
253 Status status;
254 size_t alignment;
255 BlockBuilder data_block;
256 BlockBuilder range_del_block;
257
258 InternalKeySliceTransform internal_prefix_transform;
259 std::unique_ptr<IndexBuilder> index_builder;
260 PartitionedIndexBuilder* p_index_builder_ = nullptr;
261
262 std::string last_key;
263 // Compression dictionary or nullptr
264 const std::string* compression_dict;
265 CompressionContext compression_ctx;
266 std::unique_ptr<UncompressionContext> verify_ctx;
267 TableProperties props;
268
269 bool closed = false; // Either Finish() or Abandon() has been called.
270 const bool use_delta_encoding_for_index_values;
271 std::unique_ptr<FilterBlockBuilder> filter_builder;
272 char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
273 size_t compressed_cache_key_prefix_size;
274
275 BlockHandle pending_handle; // Handle to add to index block
276
277 std::string compressed_output;
278 std::unique_ptr<FlushBlockPolicy> flush_block_policy;
279 uint32_t column_family_id;
280 const std::string& column_family_name;
281 uint64_t creation_time = 0;
282 uint64_t oldest_key_time = 0;
283
284 std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
285
286 Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
287 const BlockBasedTableOptions& table_opt,
288 const InternalKeyComparator& icomparator,
289 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
290 int_tbl_prop_collector_factories,
291 uint32_t _column_family_id, WritableFileWriter* f,
292 const CompressionType _compression_type,
293 const CompressionOptions& _compression_opts,
294 const std::string* _compression_dict, const bool skip_filters,
295 const std::string& _column_family_name, const uint64_t _creation_time,
296 const uint64_t _oldest_key_time)
297 : ioptions(_ioptions),
298 moptions(_moptions),
299 table_options(table_opt),
300 internal_comparator(icomparator),
301 file(f),
302 alignment(table_options.block_align
303 ? std::min(table_options.block_size, kDefaultPageSize)
304 : 0),
305 data_block(table_options.block_restart_interval,
306 table_options.use_delta_encoding,
307 false /* use_value_delta_encoding */,
308 icomparator.user_comparator()
309 ->CanKeysWithDifferentByteContentsBeEqual()
310 ? BlockBasedTableOptions::kDataBlockBinarySearch
311 : table_options.data_block_index_type,
312 table_options.data_block_hash_table_util_ratio),
313 range_del_block(1 /* block_restart_interval */),
314 internal_prefix_transform(_moptions.prefix_extractor.get()),
315 compression_dict(_compression_dict),
316 compression_ctx(_compression_type, _compression_opts),
317 use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
318 !table_opt.block_align),
319 compressed_cache_key_prefix_size(0),
320 flush_block_policy(
321 table_options.flush_block_policy_factory->NewFlushBlockPolicy(
322 table_options, data_block)),
323 column_family_id(_column_family_id),
324 column_family_name(_column_family_name),
325 creation_time(_creation_time),
326 oldest_key_time(_oldest_key_time) {
327 if (table_options.index_type ==
328 BlockBasedTableOptions::kTwoLevelIndexSearch) {
329 p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
330 &internal_comparator, use_delta_encoding_for_index_values,
331 table_options);
332 index_builder.reset(p_index_builder_);
333 } else {
334 index_builder.reset(IndexBuilder::CreateIndexBuilder(
335 table_options.index_type, &internal_comparator,
336 &this->internal_prefix_transform, use_delta_encoding_for_index_values,
337 table_options));
338 }
339 if (skip_filters) {
340 filter_builder = nullptr;
341 } else {
342 filter_builder.reset(CreateFilterBlockBuilder(
343 _ioptions, _moptions, table_options,
344 use_delta_encoding_for_index_values, p_index_builder_));
345 }
346
347 for (auto& collector_factories : *int_tbl_prop_collector_factories) {
348 table_properties_collectors.emplace_back(
349 collector_factories->CreateIntTblPropCollector(column_family_id));
350 }
351 table_properties_collectors.emplace_back(
352 new BlockBasedTablePropertiesCollector(
353 table_options.index_type, table_options.whole_key_filtering,
354 _moptions.prefix_extractor != nullptr));
355 if (table_options.verify_compression) {
356 verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
357 compression_ctx.type()));
358 }
359 }
360
361 Rep(const Rep&) = delete;
362 Rep& operator=(const Rep&) = delete;
363
364 ~Rep() {}
365 };
366
367 BlockBasedTableBuilder::BlockBasedTableBuilder(
368 const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
369 const BlockBasedTableOptions& table_options,
370 const InternalKeyComparator& internal_comparator,
371 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
372 int_tbl_prop_collector_factories,
373 uint32_t column_family_id, WritableFileWriter* file,
374 const CompressionType compression_type,
375 const CompressionOptions& compression_opts,
376 const std::string* compression_dict, const bool skip_filters,
377 const std::string& column_family_name, const uint64_t creation_time,
378 const uint64_t oldest_key_time) {
379 BlockBasedTableOptions sanitized_table_options(table_options);
380 if (sanitized_table_options.format_version == 0 &&
381 sanitized_table_options.checksum != kCRC32c) {
382 ROCKS_LOG_WARN(
383 ioptions.info_log,
384 "Silently converting format_version to 1 because checksum is "
385 "non-default");
386 // silently convert format_version to 1 to keep consistent with current
387 // behavior
388 sanitized_table_options.format_version = 1;
389 }
390
391 rep_ =
392 new Rep(ioptions, moptions, sanitized_table_options, internal_comparator,
393 int_tbl_prop_collector_factories, column_family_id, file,
394 compression_type, compression_opts, compression_dict,
395 skip_filters, column_family_name, creation_time, oldest_key_time);
396
397 if (rep_->filter_builder != nullptr) {
398 rep_->filter_builder->StartBlock(0);
399 }
400 if (table_options.block_cache_compressed.get() != nullptr) {
401 BlockBasedTable::GenerateCachePrefix(
402 table_options.block_cache_compressed.get(), file->writable_file(),
403 &rep_->compressed_cache_key_prefix[0],
404 &rep_->compressed_cache_key_prefix_size);
405 }
406 }
407
408 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
409 assert(rep_->closed); // Catch errors where caller forgot to call Finish()
410 delete rep_;
411 }
412
413 void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
414 Rep* r = rep_;
415 assert(!r->closed);
416 if (!ok()) return;
417 ValueType value_type = ExtractValueType(key);
418 if (IsValueType(value_type)) {
419 if (r->props.num_entries > 0) {
420 assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
421 }
422
423 auto should_flush = r->flush_block_policy->Update(key, value);
424 if (should_flush) {
425 assert(!r->data_block.empty());
426 Flush();
427
428 // Add item to index block.
429 // We do not emit the index entry for a block until we have seen the
430 // first key for the next data block. This allows us to use shorter
431 // keys in the index block. For example, consider a block boundary
432 // between the keys "the quick brown fox" and "the who". We can use
433 // "the r" as the key for the index block entry since it is >= all
434 // entries in the first block and < all entries in subsequent
435 // blocks.
436 if (ok()) {
437 r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
438 }
439 }
440
441 // Note: PartitionedFilterBlockBuilder requires key being added to filter
442 // builder after being added to index builder.
443 if (r->filter_builder != nullptr) {
444 r->filter_builder->Add(ExtractUserKey(key));
445 }
446
447 r->last_key.assign(key.data(), key.size());
448 r->data_block.Add(key, value);
449 r->props.num_entries++;
450 r->props.raw_key_size += key.size();
451 r->props.raw_value_size += value.size();
452
453 r->index_builder->OnKeyAdded(key);
454 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
455 r->table_properties_collectors,
456 r->ioptions.info_log);
457
458 } else if (value_type == kTypeRangeDeletion) {
459 r->range_del_block.Add(key, value);
460 ++r->props.num_range_deletions;
461 r->props.raw_key_size += key.size();
462 r->props.raw_value_size += value.size();
463 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
464 r->table_properties_collectors,
465 r->ioptions.info_log);
466 } else {
467 assert(false);
468 }
469 }
470
471 void BlockBasedTableBuilder::Flush() {
472 Rep* r = rep_;
473 assert(!r->closed);
474 if (!ok()) return;
475 if (r->data_block.empty()) return;
476 WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
477 if (r->filter_builder != nullptr) {
478 r->filter_builder->StartBlock(r->offset);
479 }
480 r->props.data_size = r->offset;
481 ++r->props.num_data_blocks;
482 }
483
484 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
485 BlockHandle* handle,
486 bool is_data_block) {
487 WriteBlock(block->Finish(), handle, is_data_block);
488 block->Reset();
489 }
490
491 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
492 BlockHandle* handle,
493 bool is_data_block) {
494 // File format contains a sequence of blocks where each block has:
495 // block_data: uint8[n]
496 // type: uint8
497 // crc: uint32
498 assert(ok());
499 Rep* r = rep_;
500
501 auto type = r->compression_ctx.type();
502 Slice block_contents;
503 bool abort_compression = false;
504
505 StopWatchNano timer(r->ioptions.env,
506 ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
507
508 if (raw_block_contents.size() < kCompressionSizeLimit) {
509 Slice compression_dict;
510 if (is_data_block && r->compression_dict && r->compression_dict->size()) {
511 r->compression_ctx.dict() = *r->compression_dict;
512 if (r->table_options.verify_compression) {
513 assert(r->verify_ctx != nullptr);
514 r->verify_ctx->dict() = *r->compression_dict;
515 }
516 } else {
517 // Clear dictionary
518 r->compression_ctx.dict() = Slice();
519 if (r->table_options.verify_compression) {
520 assert(r->verify_ctx != nullptr);
521 r->verify_ctx->dict() = Slice();
522 }
523 }
524
525 block_contents =
526 CompressBlock(raw_block_contents, r->compression_ctx, &type,
527 r->table_options.format_version, &r->compressed_output);
528
529 // Some of the compression algorithms are known to be unreliable. If
530 // the verify_compression flag is set then try to de-compress the
531 // compressed data and compare to the input.
532 if (type != kNoCompression && r->table_options.verify_compression) {
533 // Retrieve the uncompressed contents into a new buffer
534 BlockContents contents;
535 Status stat = UncompressBlockContentsForCompressionType(
536 *r->verify_ctx, block_contents.data(), block_contents.size(),
537 &contents, r->table_options.format_version, r->ioptions);
538
539 if (stat.ok()) {
540 bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
541 if (!compressed_ok) {
542 // The result of the compression was invalid. abort.
543 abort_compression = true;
544 ROCKS_LOG_ERROR(r->ioptions.info_log,
545 "Decompressed block did not match raw block");
546 r->status =
547 Status::Corruption("Decompressed block did not match raw block");
548 }
549 } else {
550 // Decompression reported an error. abort.
551 r->status = Status::Corruption("Could not decompress");
552 abort_compression = true;
553 }
554 }
555 } else {
556 // Block is too big to be compressed.
557 abort_compression = true;
558 }
559
560 // Abort compression if the block is too big, or did not pass
561 // verification.
562 if (abort_compression) {
563 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
564 type = kNoCompression;
565 block_contents = raw_block_contents;
566 } else if (type != kNoCompression) {
567 if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
568 MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
569 timer.ElapsedNanos());
570 }
571 MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
572 raw_block_contents.size());
573 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
574 }
575
576 WriteRawBlock(block_contents, type, handle, is_data_block);
577 r->compressed_output.clear();
578 }
579
580 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
581 CompressionType type,
582 BlockHandle* handle,
583 bool is_data_block) {
584 Rep* r = rep_;
585 StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
586 handle->set_offset(r->offset);
587 handle->set_size(block_contents.size());
588 assert(r->status.ok());
589 r->status = r->file->Append(block_contents);
590 if (r->status.ok()) {
591 char trailer[kBlockTrailerSize];
592 trailer[0] = type;
593 char* trailer_without_type = trailer + 1;
594 switch (r->table_options.checksum) {
595 case kNoChecksum:
596 EncodeFixed32(trailer_without_type, 0);
597 break;
598 case kCRC32c: {
599 auto crc = crc32c::Value(block_contents.data(), block_contents.size());
600 crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
601 EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
602 break;
603 }
604 case kxxHash: {
605 void* xxh = XXH32_init(0);
606 XXH32_update(xxh, block_contents.data(),
607 static_cast<uint32_t>(block_contents.size()));
608 XXH32_update(xxh, trailer, 1); // Extend to cover block type
609 EncodeFixed32(trailer_without_type, XXH32_digest(xxh));
610 break;
611 }
612 }
613
614 assert(r->status.ok());
615 r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
616 if (r->status.ok()) {
617 r->status = InsertBlockInCache(block_contents, type, handle);
618 }
619 if (r->status.ok()) {
620 r->offset += block_contents.size() + kBlockTrailerSize;
621 if (r->table_options.block_align && is_data_block) {
622 size_t pad_bytes =
623 (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
624 (r->alignment - 1))) &
625 (r->alignment - 1);
626 r->status = r->file->Pad(pad_bytes);
627 if (r->status.ok()) {
628 r->offset += pad_bytes;
629 }
630 }
631 }
632 }
633 }
634
635 Status BlockBasedTableBuilder::status() const {
636 return rep_->status;
637 }
638
639 static void DeleteCachedBlock(const Slice& /*key*/, void* value) {
640 Block* block = reinterpret_cast<Block*>(value);
641 delete block;
642 }
643
644 //
645 // Make a copy of the block contents and insert into compressed block cache
646 //
647 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
648 const CompressionType type,
649 const BlockHandle* handle) {
650 Rep* r = rep_;
651 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
652
653 if (type != kNoCompression && block_cache_compressed != nullptr) {
654
655 size_t size = block_contents.size();
656
657 std::unique_ptr<char[]> ubuf(new char[size + 1]);
658 memcpy(ubuf.get(), block_contents.data(), size);
659 ubuf[size] = type;
660
661 BlockContents results(std::move(ubuf), size, true, type);
662
663 Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber);
664
665 // make cache key by appending the file offset to the cache prefix id
666 char* end = EncodeVarint64(
667 r->compressed_cache_key_prefix +
668 r->compressed_cache_key_prefix_size,
669 handle->offset());
670 Slice key(r->compressed_cache_key_prefix, static_cast<size_t>
671 (end - r->compressed_cache_key_prefix));
672
673 // Insert into compressed block cache.
674 block_cache_compressed->Insert(key, block, block->ApproximateMemoryUsage(),
675 &DeleteCachedBlock);
676
677 // Invalidate OS cache.
678 r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
679 }
680 return Status::OK();
681 }
682
683 void BlockBasedTableBuilder::WriteFilterBlock(
684 MetaIndexBuilder* meta_index_builder) {
685 BlockHandle filter_block_handle;
686 bool empty_filter_block = (rep_->filter_builder == nullptr ||
687 rep_->filter_builder->NumAdded() == 0);
688 if (ok() && !empty_filter_block) {
689 Status s = Status::Incomplete();
690 while (ok() && s.IsIncomplete()) {
691 Slice filter_content =
692 rep_->filter_builder->Finish(filter_block_handle, &s);
693 assert(s.ok() || s.IsIncomplete());
694 rep_->props.filter_size += filter_content.size();
695 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
696 }
697 }
698 if (ok() && !empty_filter_block) {
699 // Add mapping from "<filter_block_prefix>.Name" to location
700 // of filter data.
701 std::string key;
702 if (rep_->filter_builder->IsBlockBased()) {
703 key = BlockBasedTable::kFilterBlockPrefix;
704 } else {
705 key = rep_->table_options.partition_filters
706 ? BlockBasedTable::kPartitionedFilterBlockPrefix
707 : BlockBasedTable::kFullFilterBlockPrefix;
708 }
709 key.append(rep_->table_options.filter_policy->Name());
710 meta_index_builder->Add(key, filter_block_handle);
711 }
712 }
713
714 void BlockBasedTableBuilder::WriteIndexBlock(
715 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
716 IndexBuilder::IndexBlocks index_blocks;
717 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
718 if (index_builder_status.IsIncomplete()) {
719 // We we have more than one index partition then meta_blocks are not
720 // supported for the index. Currently meta_blocks are used only by
721 // HashIndexBuilder which is not multi-partition.
722 assert(index_blocks.meta_blocks.empty());
723 } else if (ok() && !index_builder_status.ok()) {
724 rep_->status = index_builder_status;
725 }
726 if (ok()) {
727 for (const auto& item : index_blocks.meta_blocks) {
728 BlockHandle block_handle;
729 WriteBlock(item.second, &block_handle, false /* is_data_block */);
730 if (!ok()) {
731 break;
732 }
733 meta_index_builder->Add(item.first, block_handle);
734 }
735 }
736 if (ok()) {
737 if (rep_->table_options.enable_index_compression) {
738 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
739 } else {
740 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
741 index_block_handle);
742 }
743 }
744 // If there are more index partitions, finish them and write them out
745 Status s = index_builder_status;
746 while (ok() && s.IsIncomplete()) {
747 s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
748 if (!s.ok() && !s.IsIncomplete()) {
749 rep_->status = s;
750 return;
751 }
752 if (rep_->table_options.enable_index_compression) {
753 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
754 } else {
755 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
756 index_block_handle);
757 }
758 // The last index_block_handle will be for the partition index block
759 }
760 }
761
762 void BlockBasedTableBuilder::WritePropertiesBlock(
763 MetaIndexBuilder* meta_index_builder) {
764 BlockHandle properties_block_handle;
765 if (ok()) {
766 PropertyBlockBuilder property_block_builder;
767 rep_->props.column_family_id = rep_->column_family_id;
768 rep_->props.column_family_name = rep_->column_family_name;
769 rep_->props.filter_policy_name =
770 rep_->table_options.filter_policy != nullptr
771 ? rep_->table_options.filter_policy->Name()
772 : "";
773 rep_->props.index_size =
774 rep_->index_builder->IndexSize() + kBlockTrailerSize;
775 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
776 ? rep_->ioptions.user_comparator->Name()
777 : "nullptr";
778 rep_->props.merge_operator_name =
779 rep_->ioptions.merge_operator != nullptr
780 ? rep_->ioptions.merge_operator->Name()
781 : "nullptr";
782 rep_->props.compression_name =
783 CompressionTypeToString(rep_->compression_ctx.type());
784 rep_->props.prefix_extractor_name =
785 rep_->moptions.prefix_extractor != nullptr
786 ? rep_->moptions.prefix_extractor->Name()
787 : "nullptr";
788
789 std::string property_collectors_names = "[";
790 for (size_t i = 0;
791 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
792 if (i != 0) {
793 property_collectors_names += ",";
794 }
795 property_collectors_names +=
796 rep_->ioptions.table_properties_collector_factories[i]->Name();
797 }
798 property_collectors_names += "]";
799 rep_->props.property_collectors_names = property_collectors_names;
800 if (rep_->table_options.index_type ==
801 BlockBasedTableOptions::kTwoLevelIndexSearch) {
802 assert(rep_->p_index_builder_ != nullptr);
803 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
804 rep_->props.top_level_index_size =
805 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
806 }
807 rep_->props.index_key_is_user_key =
808 !rep_->index_builder->seperator_is_key_plus_seq();
809 rep_->props.index_value_is_delta_encoded =
810 rep_->use_delta_encoding_for_index_values;
811 rep_->props.creation_time = rep_->creation_time;
812 rep_->props.oldest_key_time = rep_->oldest_key_time;
813
814 // Add basic properties
815 property_block_builder.AddTableProperty(rep_->props);
816
817 // Add use collected properties
818 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
819 rep_->ioptions.info_log,
820 &property_block_builder);
821
822 WriteRawBlock(property_block_builder.Finish(), kNoCompression,
823 &properties_block_handle);
824 }
825 if (ok()) {
826 meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
827 }
828 }
829
830 void BlockBasedTableBuilder::WriteCompressionDictBlock(
831 MetaIndexBuilder* meta_index_builder) {
832 if (rep_->compression_dict && rep_->compression_dict->size()) {
833 BlockHandle compression_dict_block_handle;
834 if (ok()) {
835 WriteRawBlock(*rep_->compression_dict, kNoCompression,
836 &compression_dict_block_handle);
837 }
838 if (ok()) {
839 meta_index_builder->Add(kCompressionDictBlock,
840 compression_dict_block_handle);
841 }
842 }
843 }
844
845 void BlockBasedTableBuilder::WriteRangeDelBlock(
846 MetaIndexBuilder* meta_index_builder) {
847 if (ok() && !rep_->range_del_block.empty()) {
848 BlockHandle range_del_block_handle;
849 WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
850 &range_del_block_handle);
851 meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
852 }
853 }
854
855 Status BlockBasedTableBuilder::Finish() {
856 Rep* r = rep_;
857 bool empty_data_block = r->data_block.empty();
858 Flush();
859 assert(!r->closed);
860 r->closed = true;
861
862 // To make sure properties block is able to keep the accurate size of index
863 // block, we will finish writing all index entries first.
864 if (ok() && !empty_data_block) {
865 r->index_builder->AddIndexEntry(
866 &r->last_key, nullptr /* no next data block */, r->pending_handle);
867 }
868
869 // Write meta blocks and metaindex block with the following order.
870 // 1. [meta block: filter]
871 // 2. [meta block: index]
872 // 3. [meta block: compression dictionary]
873 // 4. [meta block: range deletion tombstone]
874 // 5. [meta block: properties]
875 // 6. [metaindex block]
876 BlockHandle metaindex_block_handle, index_block_handle;
877 MetaIndexBuilder meta_index_builder;
878 WriteFilterBlock(&meta_index_builder);
879 WriteIndexBlock(&meta_index_builder, &index_block_handle);
880 WriteCompressionDictBlock(&meta_index_builder);
881 WriteRangeDelBlock(&meta_index_builder);
882 WritePropertiesBlock(&meta_index_builder);
883 if (ok()) {
884 // flush the meta index block
885 WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
886 &metaindex_block_handle);
887 }
888
889 // Write footer
890 if (ok()) {
891 // No need to write out new footer if we're using default checksum.
892 // We're writing legacy magic number because we want old versions of RocksDB
893 // be able to read files generated with new release (just in case if
894 // somebody wants to roll back after an upgrade)
895 // TODO(icanadi) at some point in the future, when we're absolutely sure
896 // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
897 // number and always write new table files with new magic number
898 bool legacy = (r->table_options.format_version == 0);
899 // this is guaranteed by BlockBasedTableBuilder's constructor
900 assert(r->table_options.checksum == kCRC32c ||
901 r->table_options.format_version != 0);
902 Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber
903 : kBlockBasedTableMagicNumber,
904 r->table_options.format_version);
905 footer.set_metaindex_handle(metaindex_block_handle);
906 footer.set_index_handle(index_block_handle);
907 footer.set_checksum(r->table_options.checksum);
908 std::string footer_encoding;
909 footer.EncodeTo(&footer_encoding);
910 assert(r->status.ok());
911 r->status = r->file->Append(footer_encoding);
912 if (r->status.ok()) {
913 r->offset += footer_encoding.size();
914 }
915 }
916
917 return r->status;
918 }
919
920 void BlockBasedTableBuilder::Abandon() {
921 Rep* r = rep_;
922 assert(!r->closed);
923 r->closed = true;
924 }
925
926 uint64_t BlockBasedTableBuilder::NumEntries() const {
927 return rep_->props.num_entries;
928 }
929
930 uint64_t BlockBasedTableBuilder::FileSize() const {
931 return rep_->offset;
932 }
933
934 bool BlockBasedTableBuilder::NeedCompact() const {
935 for (const auto& collector : rep_->table_properties_collectors) {
936 if (collector->NeedCompact()) {
937 return true;
938 }
939 }
940 return false;
941 }
942
943 TableProperties BlockBasedTableBuilder::GetTableProperties() const {
944 TableProperties ret = rep_->props;
945 for (const auto& collector : rep_->table_properties_collectors) {
946 for (const auto& prop : collector->GetReadableProperties()) {
947 ret.readable_properties.insert(prop);
948 }
949 collector->Finish(&ret.user_collected_properties);
950 }
951 return ret;
952 }
953
954 const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
955 const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
956 const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
957 "partitionedfilter.";
958 } // namespace rocksdb