#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
+#include "util/memory_allocator.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/xxhash.h"
// Until the index builder actually cuts the partition, we take the lower
// bound as the partition size.
assert(table_opt.block_size_deviation <= 100);
- auto partition_size = static_cast<uint32_t>(
- ((table_opt.metadata_block_size *
- (100 - table_opt.block_size_deviation)) + 99) / 100);
+ auto partition_size =
+ static_cast<uint32_t>(((table_opt.metadata_block_size *
+ (100 - table_opt.block_size_deviation)) +
+ 99) /
+ 100);
partition_size = std::max(partition_size, static_cast<uint32_t>(1));
return new PartitionedFilterBlockBuilder(
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
return compressed_size < raw_size - (raw_size / 8u);
}
-} // namespace
-
-// format_version is the block format as defined in include/rocksdb/table.h
-Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
- CompressionType* type, uint32_t format_version,
- std::string* compressed_output) {
- *type = compression_ctx.type();
- if (compression_ctx.type() == kNoCompression) {
- return raw;
- }
-
+bool CompressBlockInternal(const Slice& raw,
+ const CompressionInfo& compression_info,
+ uint32_t format_version,
+ std::string* compressed_output) {
// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
- switch (compression_ctx.type()) {
+ switch (compression_info.type()) {
case kSnappyCompression:
- if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
- compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
+ return Snappy_Compress(compression_info, raw.data(), raw.size(),
+ compressed_output);
case kZlibCompression:
- if (Zlib_Compress(
- compression_ctx,
- GetCompressFormatForVersion(kZlibCompression, format_version),
- raw.data(), raw.size(), compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
+ return Zlib_Compress(
+ compression_info,
+ GetCompressFormatForVersion(kZlibCompression, format_version),
+ raw.data(), raw.size(), compressed_output);
case kBZip2Compression:
- if (BZip2_Compress(
- compression_ctx,
- GetCompressFormatForVersion(kBZip2Compression, format_version),
- raw.data(), raw.size(), compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
+ return BZip2_Compress(
+ compression_info,
+ GetCompressFormatForVersion(kBZip2Compression, format_version),
+ raw.data(), raw.size(), compressed_output);
case kLZ4Compression:
- if (LZ4_Compress(
- compression_ctx,
- GetCompressFormatForVersion(kLZ4Compression, format_version),
- raw.data(), raw.size(), compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
+ return LZ4_Compress(
+ compression_info,
+ GetCompressFormatForVersion(kLZ4Compression, format_version),
+ raw.data(), raw.size(), compressed_output);
case kLZ4HCCompression:
- if (LZ4HC_Compress(
- compression_ctx,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version),
- raw.data(), raw.size(), compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
+ return LZ4HC_Compress(
+ compression_info,
+ GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+ raw.data(), raw.size(), compressed_output);
case kXpressCompression:
- if (XPRESS_Compress(raw.data(), raw.size(),
- compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break;
+ return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
case kZSTD:
case kZSTDNotFinalCompression:
- if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
- compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
- break; // fall back to no compression.
- default: {} // Do not recognize this compression type
+ return ZSTD_Compress(compression_info, raw.data(), raw.size(),
+ compressed_output);
+ default:
+ // Do not recognize this compression type
+ return false;
+ }
+}
+
+} // namespace
+
+// format_version is the block format as defined in include/rocksdb/table.h
+Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
+ CompressionType* type, uint32_t format_version,
+ bool do_sample, std::string* compressed_output,
+ std::string* sampled_output_fast,
+ std::string* sampled_output_slow) {
+ *type = info.type();
+
+ if (info.type() == kNoCompression && !info.SampleForCompression()) {
+ return raw;
}
- // Compression method is not supported, or not good compression ratio, so just
- // fall back to uncompressed form.
+  // If requested, we sample one in every N blocks with both a
+  // fast and a slow compression algorithm and report the stats.
+  // Users can use these stats to decide whether it is worthwhile
+  // enabling compression, and they also get a hint about which
+  // compression algorithm will be beneficial.
+ if (do_sample && info.SampleForCompression() &&
+ Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
+ sampled_output_fast && sampled_output_slow) {
+ // Sampling with a fast compression algorithm
+ if (LZ4_Supported() || Snappy_Supported()) {
+ CompressionType c =
+ LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
+ CompressionContext context(c);
+ CompressionOptions options;
+ CompressionInfo info_tmp(options, context,
+ CompressionDict::GetEmptyDict(), c,
+ info.SampleForCompression());
+
+ CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
+ }
+
+ // Sampling with a slow but high-compression algorithm
+ if (ZSTD_Supported() || Zlib_Supported()) {
+ CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
+ CompressionContext context(c);
+ CompressionOptions options;
+ CompressionInfo info_tmp(options, context,
+ CompressionDict::GetEmptyDict(), c,
+ info.SampleForCompression());
+ CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
+ }
+ }
+
+ // Actually compress the data
+ if (*type != kNoCompression) {
+ if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
+ GoodCompressionRatio(compressed_output->size(), raw.size())) {
+ return *compressed_output;
+ }
+ }
+
+ // Compression method is not supported, or not good
+ // compression ratio, so just fall back to uncompressed form.
*type = kNoCompression;
return raw;
}
whole_key_filtering_(whole_key_filtering),
prefix_filtering_(prefix_filtering) {}
- virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
- uint64_t /*file_size*/) override {
+ Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
+ uint64_t /*file_size*/) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
- virtual Status Finish(UserCollectedProperties* properties) override {
+ virtual void BlockAdd(uint64_t /* blockRawBytes */,
+ uint64_t /* blockCompressedBytesFast */,
+ uint64_t /* blockCompressedBytesSlow */) override {
+ // Intentionally left blank. No interest in collecting stats for
+ // blocks.
+ return;
+ }
+
+ Status Finish(UserCollectedProperties* properties) override {
std::string val;
PutFixed32(&val, static_cast<uint32_t>(index_type_));
properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
}
// The name of the properties collector can be used for debugging purpose.
- virtual const char* Name() const override {
+ const char* Name() const override {
return "BlockBasedTablePropertiesCollector";
}
- virtual UserCollectedProperties GetReadableProperties() const override {
+ UserCollectedProperties GetReadableProperties() const override {
// Intentionally left blank.
return UserCollectedProperties();
}
Status status;
size_t alignment;
BlockBuilder data_block;
+ // Buffers uncompressed data blocks and keys to replay later. Needed when
+ // compression dictionary is enabled so we can finalize the dictionary before
+ // compressing any data blocks.
+ // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
+ // blocks as it's redundant, but it's easier to implement for now.
+ std::vector<std::pair<std::string, std::vector<std::string>>>
+ data_block_and_keys_buffers;
BlockBuilder range_del_block;
InternalKeySliceTransform internal_prefix_transform;
PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_key;
- // Compression dictionary or nullptr
- const std::string* compression_dict;
+ CompressionType compression_type;
+ uint64_t sample_for_compression;
+ CompressionOptions compression_opts;
+ std::unique_ptr<CompressionDict> compression_dict;
CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx;
+ std::unique_ptr<UncompressionDict> verify_dict;
+
+ size_t data_begin_offset = 0;
+
TableProperties props;
- bool closed = false; // Either Finish() or Abandon() has been called.
+ // States of the builder.
+ //
+ // - `kBuffered`: This is the initial state where zero or more data blocks are
+ // accumulated uncompressed in-memory. From this state, call
+ // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
+ // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
+ // state.
+ //
+ // - `kUnbuffered`: This is the state when compression dictionary is finalized
+ // either because it wasn't enabled in the first place or it's been created
+ // from sampling previously buffered data. In this state, blocks are simply
+ // compressed/written out as they fill up. From this state, call `Finish()`
+ // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
+ // the partially created file.
+ //
+ // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
+ // called, so the table builder is no longer usable. We must be in this
+ // state by the time the destructor runs.
+ enum class State {
+ kBuffered,
+ kUnbuffered,
+ kClosed,
+ };
+ State state;
+
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
const std::string& column_family_name;
uint64_t creation_time = 0;
uint64_t oldest_key_time = 0;
+ const uint64_t target_file_size;
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
int_tbl_prop_collector_factories,
uint32_t _column_family_id, WritableFileWriter* f,
const CompressionType _compression_type,
- const CompressionOptions& _compression_opts,
- const std::string* _compression_dict, const bool skip_filters,
+ const uint64_t _sample_for_compression,
+ const CompressionOptions& _compression_opts, const bool skip_filters,
const std::string& _column_family_name, const uint64_t _creation_time,
- const uint64_t _oldest_key_time)
+ const uint64_t _oldest_key_time, const uint64_t _target_file_size)
: ioptions(_ioptions),
moptions(_moptions),
table_options(table_opt),
table_options.data_block_hash_table_util_ratio),
range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()),
- compression_dict(_compression_dict),
- compression_ctx(_compression_type, _compression_opts),
+ compression_type(_compression_type),
+ sample_for_compression(_sample_for_compression),
+ compression_opts(_compression_opts),
+ compression_dict(),
+ compression_ctx(_compression_type),
+ verify_dict(),
+ state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
+ : State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
compressed_cache_key_prefix_size(0),
column_family_id(_column_family_id),
column_family_name(_column_family_name),
creation_time(_creation_time),
- oldest_key_time(_oldest_key_time) {
+ oldest_key_time(_oldest_key_time),
+ target_file_size(_target_file_size) {
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
_moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
- compression_ctx.type()));
+ compression_type));
}
}
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
- const CompressionOptions& compression_opts,
- const std::string* compression_dict, const bool skip_filters,
+ const uint64_t sample_for_compression,
+ const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time,
- const uint64_t oldest_key_time) {
+ const uint64_t oldest_key_time, const uint64_t target_file_size) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
sanitized_table_options.format_version = 1;
}
- rep_ =
- new Rep(ioptions, moptions, sanitized_table_options, internal_comparator,
- int_tbl_prop_collector_factories, column_family_id, file,
- compression_type, compression_opts, compression_dict,
- skip_filters, column_family_name, creation_time, oldest_key_time);
+ rep_ = new Rep(
+ ioptions, moptions, sanitized_table_options, internal_comparator,
+ int_tbl_prop_collector_factories, column_family_id, file,
+ compression_type, sample_for_compression, compression_opts, skip_filters,
+ column_family_name, creation_time, oldest_key_time, target_file_size);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
}
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
- assert(rep_->closed); // Catch errors where caller forgot to call Finish()
+ // Catch errors where caller forgot to call Finish()
+ assert(rep_->state == Rep::State::kClosed);
delete rep_;
}
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
- assert(!r->closed);
+ assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
ValueType value_type = ExtractValueType(key);
if (IsValueType(value_type)) {
- if (r->props.num_entries > 0) {
+#ifndef NDEBUG
+ if (r->props.num_entries > r->props.num_range_deletions) {
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
}
+#endif // NDEBUG
auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
assert(!r->data_block.empty());
Flush();
+ if (r->state == Rep::State::kBuffered &&
+ r->data_begin_offset > r->target_file_size) {
+ EnterUnbuffered();
+ }
+
      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block. This allows us to use shorter
      // keys in the index block. For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who". We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
- if (ok()) {
+ if (ok() && r->state == Rep::State::kUnbuffered) {
r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
}
}
// Note: PartitionedFilterBlockBuilder requires key being added to filter
// builder after being added to index builder.
- if (r->filter_builder != nullptr) {
+ if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
r->filter_builder->Add(ExtractUserKey(key));
}
r->last_key.assign(key.data(), key.size());
r->data_block.Add(key, value);
- r->props.num_entries++;
- r->props.raw_key_size += key.size();
- r->props.raw_value_size += value.size();
-
- r->index_builder->OnKeyAdded(key);
+ if (r->state == Rep::State::kBuffered) {
+ // Buffer keys to be replayed during `Finish()` once compression
+ // dictionary has been finalized.
+ if (r->data_block_and_keys_buffers.empty() || should_flush) {
+ r->data_block_and_keys_buffers.emplace_back();
+ }
+ r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
+ } else {
+ r->index_builder->OnKeyAdded(key);
+ }
NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
r->table_properties_collectors,
r->ioptions.info_log);
} else if (value_type == kTypeRangeDeletion) {
r->range_del_block.Add(key, value);
- ++r->props.num_range_deletions;
- r->props.raw_key_size += key.size();
- r->props.raw_value_size += value.size();
NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
r->table_properties_collectors,
r->ioptions.info_log);
} else {
assert(false);
}
+
+ r->props.num_entries++;
+ r->props.raw_key_size += key.size();
+ r->props.raw_value_size += value.size();
+ if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
+ r->props.num_deletions++;
+ } else if (value_type == kTypeRangeDeletion) {
+ r->props.num_deletions++;
+ r->props.num_range_deletions++;
+ } else if (value_type == kTypeMerge) {
+ r->props.num_merge_operands++;
+ }
}
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
- assert(!r->closed);
+ assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
- if (r->filter_builder != nullptr) {
- r->filter_builder->StartBlock(r->offset);
- }
- r->props.data_size = r->offset;
- ++r->props.num_data_blocks;
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
assert(ok());
Rep* r = rep_;
- auto type = r->compression_ctx.type();
+ auto type = r->compression_type;
+ uint64_t sample_for_compression = r->sample_for_compression;
Slice block_contents;
bool abort_compression = false;
- StopWatchNano timer(r->ioptions.env,
- ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
+ StopWatchNano timer(
+ r->ioptions.env,
+ ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
+
+ if (r->state == Rep::State::kBuffered) {
+ assert(is_data_block);
+ assert(!r->data_block_and_keys_buffers.empty());
+ r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
+ r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
+ return;
+ }
if (raw_block_contents.size() < kCompressionSizeLimit) {
- Slice compression_dict;
- if (is_data_block && r->compression_dict && r->compression_dict->size()) {
- r->compression_ctx.dict() = *r->compression_dict;
- if (r->table_options.verify_compression) {
- assert(r->verify_ctx != nullptr);
- r->verify_ctx->dict() = *r->compression_dict;
- }
+ const CompressionDict* compression_dict;
+ if (!is_data_block || r->compression_dict == nullptr) {
+ compression_dict = &CompressionDict::GetEmptyDict();
} else {
- // Clear dictionary
- r->compression_ctx.dict() = Slice();
- if (r->table_options.verify_compression) {
- assert(r->verify_ctx != nullptr);
- r->verify_ctx->dict() = Slice();
- }
+ compression_dict = r->compression_dict.get();
}
-
- block_contents =
- CompressBlock(raw_block_contents, r->compression_ctx, &type,
- r->table_options.format_version, &r->compressed_output);
+ assert(compression_dict != nullptr);
+ CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
+ *compression_dict, type,
+ sample_for_compression);
+
+ std::string sampled_output_fast;
+ std::string sampled_output_slow;
+ block_contents = CompressBlock(
+ raw_block_contents, compression_info, &type,
+ r->table_options.format_version, is_data_block /* do_sample */,
+ &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
+
+ // notify collectors on block add
+ NotifyCollectTableCollectorsOnBlockAdd(
+ r->table_properties_collectors, raw_block_contents.size(),
+ sampled_output_fast.size(), sampled_output_slow.size());
// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the
// compressed data and compare to the input.
if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
+ const UncompressionDict* verify_dict;
+ if (!is_data_block || r->verify_dict == nullptr) {
+ verify_dict = &UncompressionDict::GetEmptyDict();
+ } else {
+ verify_dict = r->verify_dict.get();
+ }
+ assert(verify_dict != nullptr);
BlockContents contents;
+ UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
+ r->compression_type);
Status stat = UncompressBlockContentsForCompressionType(
- *r->verify_ctx, block_contents.data(), block_contents.size(),
+ uncompression_info, block_contents.data(), block_contents.size(),
&contents, r->table_options.format_version, r->ioptions);
if (stat.ok()) {
block_contents = raw_block_contents;
} else if (type != kNoCompression) {
if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
- MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
- timer.ElapsedNanos());
+ RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
+ timer.ElapsedNanos());
}
- MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
- raw_block_contents.size());
+ RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
+ raw_block_contents.size());
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
+ } else if (type != r->compression_type) {
+ RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
}
WriteRawBlock(block_contents, type, handle, is_data_block);
r->compressed_output.clear();
+ if (is_data_block) {
+ if (r->filter_builder != nullptr) {
+ r->filter_builder->StartBlock(r->offset);
+ }
+ r->props.data_size = r->offset;
+ ++r->props.num_data_blocks;
+ }
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
EncodeFixed32(trailer_without_type, XXH32_digest(xxh));
break;
}
+ case kxxHash64: {
+ XXH64_state_t* const state = XXH64_createState();
+ XXH64_reset(state, 0);
+ XXH64_update(state, block_contents.data(),
+ static_cast<uint32_t>(block_contents.size()));
+ XXH64_update(state, trailer, 1); // Extend to cover block type
+ EncodeFixed32(
+ trailer_without_type,
+ static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
+ uint64_t{0xffffffff}));
+ XXH64_freeState(state);
+ break;
+ }
}
assert(r->status.ok());
+ TEST_SYNC_POINT_CALLBACK(
+ "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
+ static_cast<char*>(trailer));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->status = InsertBlockInCache(block_contents, type, handle);
}
}
-Status BlockBasedTableBuilder::status() const {
- return rep_->status;
-}
+Status BlockBasedTableBuilder::status() const { return rep_->status; }
-static void DeleteCachedBlock(const Slice& /*key*/, void* value) {
-  Block* block = reinterpret_cast<Block*>(value);
-  delete block;
+// Cache deleter callback: frees the raw BlockContents object that was
+// inserted into the compressed block cache.
+static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
+  BlockContents* bc = reinterpret_cast<BlockContents*>(value);
+  delete bc;
 }
//
Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
if (type != kNoCompression && block_cache_compressed != nullptr) {
-
size_t size = block_contents.size();
- std::unique_ptr<char[]> ubuf(new char[size + 1]);
+ auto ubuf =
+ AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;
- BlockContents results(std::move(ubuf), size, true, type);
-
- Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber);
+ BlockContents* block_contents_to_cache =
+ new BlockContents(std::move(ubuf), size);
+#ifndef NDEBUG
+ block_contents_to_cache->is_raw_block = true;
+#endif // NDEBUG
// make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64(
- r->compressed_cache_key_prefix +
- r->compressed_cache_key_prefix_size,
- handle->offset());
- Slice key(r->compressed_cache_key_prefix, static_cast<size_t>
- (end - r->compressed_cache_key_prefix));
+ r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
+ handle->offset());
+ Slice key(r->compressed_cache_key_prefix,
+ static_cast<size_t>(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
- block_cache_compressed->Insert(key, block, block->ApproximateMemoryUsage(),
- &DeleteCachedBlock);
+ block_cache_compressed->Insert(
+ key, block_contents_to_cache,
+ block_contents_to_cache->ApproximateMemoryUsage(),
+ &DeleteCachedBlockContents);
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
? rep_->ioptions.merge_operator->Name()
: "nullptr";
rep_->props.compression_name =
- CompressionTypeToString(rep_->compression_ctx.type());
+ CompressionTypeToString(rep_->compression_type);
+ rep_->props.compression_options =
+ CompressionOptionsToString(rep_->compression_opts);
rep_->props.prefix_extractor_name =
rep_->moptions.prefix_extractor != nullptr
? rep_->moptions.prefix_extractor->Name()
&properties_block_handle);
}
if (ok()) {
+#ifndef NDEBUG
+ {
+ uint64_t props_block_offset = properties_block_handle.offset();
+ uint64_t props_block_size = properties_block_handle.size();
+ TEST_SYNC_POINT_CALLBACK(
+ "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
+ &props_block_offset);
+ TEST_SYNC_POINT_CALLBACK(
+ "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
+ &props_block_size);
+ }
+#endif // !NDEBUG
meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
}
}
void BlockBasedTableBuilder::WriteCompressionDictBlock(
MetaIndexBuilder* meta_index_builder) {
- if (rep_->compression_dict && rep_->compression_dict->size()) {
+ if (rep_->compression_dict != nullptr &&
+ rep_->compression_dict->GetRawDict().size()) {
BlockHandle compression_dict_block_handle;
if (ok()) {
- WriteRawBlock(*rep_->compression_dict, kNoCompression,
+ WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
&compression_dict_block_handle);
+#ifndef NDEBUG
+ Slice compression_dict = rep_->compression_dict->GetRawDict();
+ TEST_SYNC_POINT_CALLBACK(
+ "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
+ &compression_dict);
+#endif // NDEBUG
}
if (ok()) {
meta_index_builder->Add(kCompressionDictBlock,
}
}
+// Writes the table footer — metaindex/index block handles, checksum type,
+// and magic number — as the final bytes of the file. Any write error is
+// recorded in r->status; on success r->offset is advanced past the footer.
+void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
+                                         BlockHandle& index_block_handle) {
+  Rep* r = rep_;
+  // No need to write out new footer if we're using default checksum.
+  // We're writing legacy magic number because we want old versions of RocksDB
+  // be able to read files generated with new release (just in case if
+  // somebody wants to roll back after an upgrade)
+  // TODO(icanadi) at some point in the future, when we're absolutely sure
+  // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
+  // number and always write new table files with new magic number
+  bool legacy = (r->table_options.format_version == 0);
+  // this is guaranteed by BlockBasedTableBuilder's constructor
+  assert(r->table_options.checksum == kCRC32c ||
+         r->table_options.format_version != 0);
+  Footer footer(
+      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
+      r->table_options.format_version);
+  footer.set_metaindex_handle(metaindex_block_handle);
+  footer.set_index_handle(index_block_handle);
+  footer.set_checksum(r->table_options.checksum);
+  std::string footer_encoding;
+  footer.EncodeTo(&footer_encoding);
+  assert(r->status.ok());
+  r->status = r->file->Append(footer_encoding);
+  if (r->status.ok()) {
+    r->offset += footer_encoding.size();
+  }
+}
+
+// Transitions the builder from State::kBuffered to State::kUnbuffered:
+// builds the compression dictionary from samples of the buffered data
+// blocks, then replays every buffered block and its keys through the
+// filter/index builders and the normal block-writing path, and finally
+// releases the buffers.
+void BlockBasedTableBuilder::EnterUnbuffered() {
+  Rep* r = rep_;
+  assert(r->state == Rep::State::kBuffered);
+  r->state = Rep::State::kUnbuffered;
+  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
+                                  ? r->compression_opts.zstd_max_train_bytes
+                                  : r->compression_opts.max_dict_bytes;
+  Random64 generator{r->creation_time};
+  std::string compression_dict_samples;
+  std::vector<size_t> compression_dict_sample_lens;
+  if (!r->data_block_and_keys_buffers.empty()) {
+    // Randomly sample buffered blocks (with replacement) until kSampleBytes
+    // bytes are collected; per-sample lengths are kept for ZSTD training.
+    // NOTE(review): assumes at least one buffered block is non-empty when
+    // kSampleBytes > 0, otherwise this loop cannot terminate — TODO confirm.
+    while (compression_dict_samples.size() < kSampleBytes) {
+      size_t rand_idx =
+          generator.Uniform(r->data_block_and_keys_buffers.size());
+      size_t copy_len =
+          std::min(kSampleBytes - compression_dict_samples.size(),
+                   r->data_block_and_keys_buffers[rand_idx].first.size());
+      compression_dict_samples.append(
+          r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
+      compression_dict_sample_lens.emplace_back(copy_len);
+    }
+  }
+
+  // final data block flushed, now we can generate dictionary from the samples.
+  // OK if compression_dict_samples is empty, we'll just get empty dictionary.
+  std::string dict;
+  if (r->compression_opts.zstd_max_train_bytes > 0) {
+    dict = ZSTD_TrainDictionary(compression_dict_samples,
+                                compression_dict_sample_lens,
+                                r->compression_opts.max_dict_bytes);
+  } else {
+    dict = std::move(compression_dict_samples);
+  }
+  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
+                                                r->compression_opts.level));
+  // Matching uncompression dictionary so verify_compression can round-trip
+  // blocks compressed with the new dictionary.
+  r->verify_dict.reset(new UncompressionDict(
+      dict, r->compression_type == kZSTD ||
+            r->compression_type == kZSTDNotFinalCompression));
+
+  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
+    const auto& data_block = r->data_block_and_keys_buffers[i].first;
+    auto& keys = r->data_block_and_keys_buffers[i].second;
+    assert(!data_block.empty());
+    assert(!keys.empty());
+
+    // Replay the keys that were withheld from the filter/index builders
+    // while buffering.
+    for (const auto& key : keys) {
+      if (r->filter_builder != nullptr) {
+        r->filter_builder->Add(ExtractUserKey(key));
+      }
+      r->index_builder->OnKeyAdded(key);
+    }
+    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
+    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
+      // The index entry for block i uses the first key of block i + 1 as its
+      // upper bound; the last block's entry is emitted later during Finish().
+      Slice first_key_in_next_block =
+          r->data_block_and_keys_buffers[i + 1].second.front();
+      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
+                                      r->pending_handle);
+    }
+  }
+  r->data_block_and_keys_buffers.clear();
+}
+
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
+ assert(r->state != Rep::State::kClosed);
bool empty_data_block = r->data_block.empty();
Flush();
- assert(!r->closed);
- r->closed = true;
-
+ if (r->state == Rep::State::kBuffered) {
+ EnterUnbuffered();
+ }
// To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries first.
if (ok() && !empty_data_block) {
&r->last_key, nullptr /* no next data block */, r->pending_handle);
}
- // Write meta blocks and metaindex block with the following order.
+ // Write meta blocks, metaindex block and footer in the following order.
// 1. [meta block: filter]
// 2. [meta block: index]
// 3. [meta block: compression dictionary]
// 4. [meta block: range deletion tombstone]
// 5. [meta block: properties]
// 6. [metaindex block]
+ // 7. Footer
BlockHandle metaindex_block_handle, index_block_handle;
MetaIndexBuilder meta_index_builder;
WriteFilterBlock(&meta_index_builder);
WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
&metaindex_block_handle);
}
-
- // Write footer
if (ok()) {
- // No need to write out new footer if we're using default checksum.
- // We're writing legacy magic number because we want old versions of RocksDB
- // be able to read files generated with new release (just in case if
- // somebody wants to roll back after an upgrade)
- // TODO(icanadi) at some point in the future, when we're absolutely sure
- // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
- // number and always write new table files with new magic number
- bool legacy = (r->table_options.format_version == 0);
- // this is guaranteed by BlockBasedTableBuilder's constructor
- assert(r->table_options.checksum == kCRC32c ||
- r->table_options.format_version != 0);
- Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber
- : kBlockBasedTableMagicNumber,
- r->table_options.format_version);
- footer.set_metaindex_handle(metaindex_block_handle);
- footer.set_index_handle(index_block_handle);
- footer.set_checksum(r->table_options.checksum);
- std::string footer_encoding;
- footer.EncodeTo(&footer_encoding);
- assert(r->status.ok());
- r->status = r->file->Append(footer_encoding);
- if (r->status.ok()) {
- r->offset += footer_encoding.size();
- }
+ WriteFooter(metaindex_block_handle, index_block_handle);
}
-
+ r->state = Rep::State::kClosed;
return r->status;
}
void BlockBasedTableBuilder::Abandon() {
- Rep* r = rep_;
- assert(!r->closed);
- r->closed = true;
+ assert(rep_->state != Rep::State::kClosed);
+ rep_->state = Rep::State::kClosed;
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_->props.num_entries;
}
-uint64_t BlockBasedTableBuilder::FileSize() const {
- return rep_->offset;
-}
+uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) {