#include <assert.h>
#include <stdio.h>
+#include <atomic>
#include <list>
#include <map>
#include <memory>
#include "util/crc32c.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
+#include "util/work_queue.h"
#include "util/xxhash.h"
namespace ROCKSDB_NAMESPACE {
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
-typedef BlockBasedTableOptions::IndexType IndexType;
// Without an anonymous namespace here, we fail the -Wmissing-prototypes warning
namespace {
return compressed_size < raw_size - (raw_size / 8u);
}
-bool CompressBlockInternal(const Slice& raw,
- const CompressionInfo& compression_info,
- uint32_t format_version,
- std::string* compressed_output) {
- // Will return compressed block contents if (1) the compression method is
- // supported in this platform and (2) the compression rate is "good enough".
- switch (compression_info.type()) {
- case kSnappyCompression:
- return Snappy_Compress(compression_info, raw.data(), raw.size(),
- compressed_output);
- case kZlibCompression:
- return Zlib_Compress(
- compression_info,
- GetCompressFormatForVersion(kZlibCompression, format_version),
- raw.data(), raw.size(), compressed_output);
- case kBZip2Compression:
- return BZip2_Compress(
- compression_info,
- GetCompressFormatForVersion(kBZip2Compression, format_version),
- raw.data(), raw.size(), compressed_output);
- case kLZ4Compression:
- return LZ4_Compress(
- compression_info,
- GetCompressFormatForVersion(kLZ4Compression, format_version),
- raw.data(), raw.size(), compressed_output);
- case kLZ4HCCompression:
- return LZ4HC_Compress(
- compression_info,
- GetCompressFormatForVersion(kLZ4HCCompression, format_version),
- raw.data(), raw.size(), compressed_output);
- case kXpressCompression:
- return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
- case kZSTD:
- case kZSTDNotFinalCompression:
- return ZSTD_Compress(compression_info, raw.data(), raw.size(),
- compressed_output);
- default:
- // Do not recognize this compression type
- return false;
- }
-}
-
} // namespace
// format_version is the block format as defined in include/rocksdb/table.h
bool do_sample, std::string* compressed_output,
std::string* sampled_output_fast,
std::string* sampled_output_slow) {
- *type = info.type();
-
- if (info.type() == kNoCompression && !info.SampleForCompression()) {
- return raw;
- }
+ assert(type);
+ assert(compressed_output);
+ assert(compressed_output->empty());
  // If requested, we sample one in every N blocks with a
  // fast and slow compression algorithm and report the stats.
  // The users can use these stats to decide if it is worthwhile
  // enabling compression and they also get a hint about which
  // compression algorithm will be beneficial.
if (do_sample && info.SampleForCompression() &&
- Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
- sampled_output_fast && sampled_output_slow) {
+ Random::GetTLSInstance()->OneIn(
+ static_cast<int>(info.SampleForCompression()))) {
// Sampling with a fast compression algorithm
- if (LZ4_Supported() || Snappy_Supported()) {
+ if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
CompressionType c =
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
CompressionContext context(c);
CompressionDict::GetEmptyDict(), c,
info.SampleForCompression());
- CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
+ CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+ sampled_output_fast);
}
// Sampling with a slow but high-compression algorithm
- if (ZSTD_Supported() || Zlib_Supported()) {
+ if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
CompressionContext context(c);
CompressionOptions options;
CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c,
info.SampleForCompression());
- CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
+
+ CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+ sampled_output_slow);
}
}
- // Actually compress the data
- if (*type != kNoCompression) {
- if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
- GoodCompressionRatio(compressed_output->size(), raw.size())) {
- return *compressed_output;
- }
+ if (info.type() == kNoCompression) {
+ *type = kNoCompression;
+ return raw;
}
- // Compression method is not supported, or not good
- // compression ratio, so just fall back to uncompressed form.
- *type = kNoCompression;
- return raw;
+  // Actually compress the data; if the compression method is not supported
+  // or the compression fails, just fall back to the uncompressed form.
+ if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
+ compressed_output)) {
+ *type = kNoCompression;
+ return raw;
+ }
+
+  // Check the compression ratio; if it is not good enough, just fall back
+  // to the uncompressed form.
+ if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
+ *type = kNoCompression;
+ return raw;
+ }
+
+ *type = info.type();
+ return *compressed_output;
}
// kBlockBasedTableMagicNumber was picked by running
const BlockBasedTableOptions table_options;
const InternalKeyComparator& internal_comparator;
WritableFileWriter* file;
- uint64_t offset = 0;
- Status status;
+ std::atomic<uint64_t> offset;
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks and keys to replay later. Needed when
PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_key;
+ const Slice* first_key_in_next_block = nullptr;
CompressionType compression_type;
uint64_t sample_for_compression;
CompressionOptions compression_opts;
std::unique_ptr<CompressionDict> compression_dict;
- CompressionContext compression_ctx;
- std::unique_ptr<UncompressionContext> verify_ctx;
+ std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
+ std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
std::unique_ptr<UncompressionDict> verify_dict;
size_t data_begin_offset = 0;
const uint64_t target_file_size;
uint64_t file_creation_time = 0;
+ // DB IDs
+ const std::string db_id;
+ const std::string db_session_id;
+ std::string db_host_id;
+
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
+ std::unique_ptr<ParallelCompressionRep> pc_rep;
+
+ uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
+ void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }
+
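+  // Parallel compression is enabled when
+  // CompressionOptions::parallel_threads is greater than one.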
+ bool IsParallelCompressionEnabled() const {
+ return compression_opts.parallel_threads > 1;
+ }
+
+ Status GetStatus() {
+    // Modifications to status must be visible when status_ok is set to
+    // false; this is ensured by status_mutex, so no special memory order
+    // for status_ok is required.
+ if (status_ok.load(std::memory_order_relaxed)) {
+ return Status::OK();
+ } else {
+ return CopyStatus();
+ }
+ }
+
+ Status CopyStatus() {
+ std::lock_guard<std::mutex> lock(status_mutex);
+ return status;
+ }
+
+ IOStatus GetIOStatus() {
+    // Modifications to io_status must be visible when io_status_ok is set
+    // to false; this is ensured by io_status_mutex, so no special memory
+    // order for io_status_ok is required.
+ if (io_status_ok.load(std::memory_order_relaxed)) {
+ return IOStatus::OK();
+ } else {
+ return CopyIOStatus();
+ }
+ }
+
+ IOStatus CopyIOStatus() {
+ std::lock_guard<std::mutex> lock(io_status_mutex);
+ return io_status;
+ }
+
+ // Never erase an existing status that is not OK.
+ void SetStatus(Status s) {
+ if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill when compression_opts.parallel_threads == 1,
+      // but since it is unlikely that s is not OK, we accept this cost for
+      // simplicity.
+ std::lock_guard<std::mutex> lock(status_mutex);
+ status = s;
+ status_ok.store(false, std::memory_order_relaxed);
+ }
+ }
+
+ // Never erase an existing I/O status that is not OK.
+ void SetIOStatus(IOStatus ios) {
+ if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
+      // Locking is overkill when compression_opts.parallel_threads == 1,
+      // but since it is unlikely that ios is not OK, we accept this cost
+      // for simplicity.
+ std::lock_guard<std::mutex> lock(io_status_mutex);
+ io_status = ios;
+ io_status_ok.store(false, std::memory_order_relaxed);
+ }
+ }
+
Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
const BlockBasedTableOptions& table_opt,
const InternalKeyComparator& icomparator,
const CompressionOptions& _compression_opts, const bool skip_filters,
const int _level_at_creation, const std::string& _column_family_name,
const uint64_t _creation_time, const uint64_t _oldest_key_time,
- const uint64_t _target_file_size, const uint64_t _file_creation_time)
+ const uint64_t _target_file_size, const uint64_t _file_creation_time,
+ const std::string& _db_id, const std::string& _db_session_id)
: ioptions(_ioptions),
moptions(_moptions),
table_options(table_opt),
internal_comparator(icomparator),
file(f),
+ offset(0),
alignment(table_options.block_align
? std::min(table_options.block_size, kDefaultPageSize)
: 0),
sample_for_compression(_sample_for_compression),
compression_opts(_compression_opts),
compression_dict(),
- compression_ctx(_compression_type),
+ compression_ctxs(_compression_opts.parallel_threads),
+ verify_ctxs(_compression_opts.parallel_threads),
verify_dict(),
state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
: State::kUnbuffered),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size),
- file_creation_time(_file_creation_time) {
+ file_creation_time(_file_creation_time),
+ db_id(_db_id),
+ db_session_id(_db_session_id),
+ db_host_id(ioptions.db_host_id),
+ status_ok(true),
+ io_status_ok(true) {
+ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+ compression_ctxs[i].reset(new CompressionContext(compression_type));
+ }
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
table_options.index_type, table_options.whole_key_filtering,
_moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) {
- verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
- compression_type));
+ for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
+ verify_ctxs[i].reset(new UncompressionContext(compression_type));
+ }
+ }
+
+ if (!ReifyDbHostIdProperty(ioptions.env, &db_host_id).ok()) {
+ ROCKS_LOG_INFO(ioptions.info_log, "db_host_id property will not be set");
}
}
Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;
- ~Rep() {}
+ private:
+ // Synchronize status & io_status accesses across threads from main thread,
+ // compression thread and write thread in parallel compression.
+ std::mutex status_mutex;
+ std::atomic<bool> status_ok;
+ Status status;
+ std::mutex io_status_mutex;
+ std::atomic<bool> io_status_ok;
+ IOStatus io_status;
+};
+
+struct BlockBasedTableBuilder::ParallelCompressionRep {
+  // Keys is a wrapper around a vector of strings that avoids freeing the
+  // string memory on Clear(), saving memory allocation overhead when the
+  // vector is reused.
+ class Keys {
+ public:
+ Keys() : keys_(kKeysInitSize), size_(0) {}
+ void PushBack(const Slice& key) {
+ if (size_ == keys_.size()) {
+ keys_.emplace_back(key.data(), key.size());
+ } else {
+ keys_[size_].assign(key.data(), key.size());
+ }
+ size_++;
+ }
+ void SwapAssign(std::vector<std::string>& keys) {
+ size_ = keys.size();
+ std::swap(keys_, keys);
+ }
+ void Clear() { size_ = 0; }
+ size_t Size() { return size_; }
+ std::string& Back() { return keys_[size_ - 1]; }
+ std::string& operator[](size_t idx) {
+ assert(idx < size_);
+ return keys_[idx];
+ }
+
+ private:
+ const size_t kKeysInitSize = 32;
+ std::vector<std::string> keys_;
+ size_t size_;
+ };
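+  // Keys of the data block currently being built; swapped into a BlockRep
+  // when the block is emitted for compression.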
+ std::unique_ptr<Keys> curr_block_keys;
+
+ class BlockRepSlot;
+
+ // BlockRep instances are fetched from and recycled to
+ // block_rep_pool during parallel compression.
+ struct BlockRep {
+ Slice contents;
+ Slice compressed_contents;
+ std::unique_ptr<std::string> data;
+ std::unique_ptr<std::string> compressed_data;
+ CompressionType compression_type;
+ std::unique_ptr<std::string> first_key_in_next_block;
+ std::unique_ptr<Keys> keys;
+ std::unique_ptr<BlockRepSlot> slot;
+ Status status;
+ };
+  // Use a vector of BlockRep as a buffer for a fixed number of BlockRep
+  // structures. All data referenced by pointers in a BlockRep is freed
+  // when this vector is destructed.
+ typedef std::vector<BlockRep> BlockRepBuffer;
+ BlockRepBuffer block_rep_buf;
+ // Use a thread-safe queue for concurrent access from block
+ // building thread and writer thread.
+ typedef WorkQueue<BlockRep*> BlockRepPool;
+ BlockRepPool block_rep_pool;
+
+  // BlockRepSlot keeps blocks in order in the write thread: slot_ passes a
+  // reference to a single BlockRep from the compression thread to the
+  // write thread.
+ class BlockRepSlot {
+ public:
+ BlockRepSlot() : slot_(1) {}
+ template <typename T>
+ void Fill(T&& rep) {
+ slot_.push(std::forward<T>(rep));
+    }
+ void Take(BlockRep*& rep) { slot_.pop(rep); }
+
+ private:
+ // slot_ will pass references to BlockRep in block_rep_buf,
+ // and those references are always valid before the destruction of
+ // block_rep_buf.
+ WorkQueue<BlockRep*> slot_;
+ };
+
+ // Compression queue will pass references to BlockRep in block_rep_buf,
+ // and those references are always valid before the destruction of
+ // block_rep_buf.
+ typedef WorkQueue<BlockRep*> CompressQueue;
+ CompressQueue compress_queue;
+ std::vector<port::Thread> compress_thread_pool;
+
+ // Write queue will pass references to BlockRep::slot in block_rep_buf,
+ // and those references are always valid before the corresponding
+ // BlockRep::slot is destructed, which is before the destruction of
+ // block_rep_buf.
+ typedef WorkQueue<BlockRepSlot*> WriteQueue;
+ WriteQueue write_queue;
+ std::unique_ptr<port::Thread> write_thread;
+
+ // Estimate output file size when parallel compression is enabled. This is
+ // necessary because compression & flush are no longer synchronized,
+ // and BlockBasedTableBuilder::FileSize() is no longer accurate.
+  // memory_order_relaxed suffices because accurate statistics are not
+  // required.
+ class FileSizeEstimator {
+ public:
+ explicit FileSizeEstimator()
+ : raw_bytes_compressed(0),
+ raw_bytes_curr_block(0),
+ raw_bytes_curr_block_set(false),
+ raw_bytes_inflight(0),
+ blocks_inflight(0),
+ curr_compression_ratio(0),
+ estimated_file_size(0) {}
+
+ // Estimate file size when a block is about to be emitted to
+ // compression thread
+ void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
+ uint64_t new_raw_bytes_inflight =
+ raw_bytes_inflight.fetch_add(raw_block_size,
+ std::memory_order_relaxed) +
+ raw_block_size;
+
+ uint64_t new_blocks_inflight =
+ blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
+
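+      // Estimated size = bytes already written + expected compressed size
+      // of all inflight raw bytes (inflight bytes * current ratio) + one
+      // block trailer per inflight block.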
+ estimated_file_size.store(
+ curr_file_size +
+ static_cast<uint64_t>(
+ static_cast<double>(new_raw_bytes_inflight) *
+ curr_compression_ratio.load(std::memory_order_relaxed)) +
+ new_blocks_inflight * kBlockTrailerSize,
+ std::memory_order_relaxed);
+ }
+
+ // Estimate file size when a block is already reaped from
+ // compression thread
+ void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
+ assert(raw_bytes_curr_block_set);
+
+ uint64_t new_raw_bytes_compressed =
+ raw_bytes_compressed + raw_bytes_curr_block;
+ assert(new_raw_bytes_compressed > 0);
+
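+      // Update the running compression ratio as a weighted average:
+      // (compressed bytes so far + this block's compressed size) /
+      // (raw bytes so far + this block's raw size).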
+ curr_compression_ratio.store(
+ (curr_compression_ratio.load(std::memory_order_relaxed) *
+ raw_bytes_compressed +
+ compressed_block_size) /
+ static_cast<double>(new_raw_bytes_compressed),
+ std::memory_order_relaxed);
+ raw_bytes_compressed = new_raw_bytes_compressed;
+
+ uint64_t new_raw_bytes_inflight =
+ raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
+ std::memory_order_relaxed) -
+ raw_bytes_curr_block;
+
+ uint64_t new_blocks_inflight =
+ blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+ estimated_file_size.store(
+ curr_file_size +
+ static_cast<uint64_t>(
+ static_cast<double>(new_raw_bytes_inflight) *
+ curr_compression_ratio.load(std::memory_order_relaxed)) +
+ new_blocks_inflight * kBlockTrailerSize,
+ std::memory_order_relaxed);
+
+ raw_bytes_curr_block_set = false;
+ }
+
+ void SetEstimatedFileSize(uint64_t size) {
+ estimated_file_size.store(size, std::memory_order_relaxed);
+ }
+
+ uint64_t GetEstimatedFileSize() {
+ return estimated_file_size.load(std::memory_order_relaxed);
+ }
+
+ void SetCurrBlockRawSize(uint64_t size) {
+ raw_bytes_curr_block = size;
+ raw_bytes_curr_block_set = true;
+ }
+
+ private:
+ // Raw bytes compressed so far.
+ uint64_t raw_bytes_compressed;
+ // Size of current block being appended.
+ uint64_t raw_bytes_curr_block;
+ // Whether raw_bytes_curr_block has been set for next
+ // ReapBlock call.
+ bool raw_bytes_curr_block_set;
+ // Raw bytes under compression and not appended yet.
+ std::atomic<uint64_t> raw_bytes_inflight;
+ // Number of blocks under compression and not appended yet.
+ std::atomic<uint64_t> blocks_inflight;
+ // Current compression ratio, maintained by BGWorkWriteRawBlock.
+ std::atomic<double> curr_compression_ratio;
+ // Estimated SST file size.
+ std::atomic<uint64_t> estimated_file_size;
+ };
+ FileSizeEstimator file_size_estimator;
+
+  // Facilities for waiting on first-block completion. We need to wait for
+  // the first block to be compressed and flushed before a non-zero
+  // compression ratio is available.
+ std::atomic<bool> first_block_processed;
+ std::condition_variable first_block_cond;
+ std::mutex first_block_mutex;
+
+ explicit ParallelCompressionRep(uint32_t parallel_threads)
+ : curr_block_keys(new Keys()),
+ block_rep_buf(parallel_threads),
+ block_rep_pool(parallel_threads),
+ compress_queue(parallel_threads),
+ write_queue(parallel_threads),
+ first_block_processed(false) {
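+    // Pre-allocate one BlockRep per thread and park them all in the pool;
+    // PrepareBlock()/ReapBlock() recycle them, so the number of inflight
+    // blocks (and the memory they hold) is bounded by parallel_threads.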
+ for (uint32_t i = 0; i < parallel_threads; i++) {
+ block_rep_buf[i].contents = Slice();
+ block_rep_buf[i].compressed_contents = Slice();
+ block_rep_buf[i].data.reset(new std::string());
+ block_rep_buf[i].compressed_data.reset(new std::string());
+ block_rep_buf[i].compression_type = CompressionType();
+ block_rep_buf[i].first_key_in_next_block.reset(new std::string());
+ block_rep_buf[i].keys.reset(new Keys());
+ block_rep_buf[i].slot.reset(new BlockRepSlot());
+ block_rep_buf[i].status = Status::OK();
+ block_rep_pool.push(&block_rep_buf[i]);
+ }
+ }
+
+ ~ParallelCompressionRep() { block_rep_pool.finish(); }
+
+  // Prepare a block to be emitted to the compression thread.
+  // Used in non-buffered mode.
+ BlockRep* PrepareBlock(CompressionType compression_type,
+ const Slice* first_key_in_next_block,
+ BlockBuilder* data_block) {
+ BlockRep* block_rep =
+ PrepareBlockInternal(compression_type, first_key_in_next_block);
+ assert(block_rep != nullptr);
+ data_block->SwapAndReset(*(block_rep->data));
+ block_rep->contents = *(block_rep->data);
+ std::swap(block_rep->keys, curr_block_keys);
+ curr_block_keys->Clear();
+ return block_rep;
+ }
+
+ // Used in EnterUnbuffered
+ BlockRep* PrepareBlock(CompressionType compression_type,
+ const Slice* first_key_in_next_block,
+ std::string* data_block,
+ std::vector<std::string>* keys) {
+ BlockRep* block_rep =
+ PrepareBlockInternal(compression_type, first_key_in_next_block);
+ assert(block_rep != nullptr);
+ std::swap(*(block_rep->data), *data_block);
+ block_rep->contents = *(block_rep->data);
+ block_rep->keys->SwapAssign(*keys);
+ return block_rep;
+ }
+
+ // Emit a block to compression thread
+ void EmitBlock(BlockRep* block_rep) {
+ assert(block_rep != nullptr);
+ assert(block_rep->status.ok());
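+    // Reserve the block's position in the write queue first; the write
+    // thread consumes slots in emission order, and a compression worker
+    // fills each slot once the block is compressed.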
+ if (!write_queue.push(block_rep->slot.get())) {
+ return;
+ }
+ if (!compress_queue.push(block_rep)) {
+ return;
+ }
+
+ if (!first_block_processed.load(std::memory_order_relaxed)) {
+ std::unique_lock<std::mutex> lock(first_block_mutex);
+ first_block_cond.wait(lock, [this] {
+ return first_block_processed.load(std::memory_order_relaxed);
+ });
+ }
+ }
+
+ // Reap a block from compression thread
+ void ReapBlock(BlockRep* block_rep) {
+ assert(block_rep != nullptr);
+ block_rep->compressed_data->clear();
+ block_rep_pool.push(block_rep);
+
+ if (!first_block_processed.load(std::memory_order_relaxed)) {
+ std::lock_guard<std::mutex> lock(first_block_mutex);
+ first_block_processed.store(true, std::memory_order_relaxed);
+ first_block_cond.notify_one();
+ }
+ }
+
+ private:
+ BlockRep* PrepareBlockInternal(CompressionType compression_type,
+ const Slice* first_key_in_next_block) {
+ BlockRep* block_rep = nullptr;
+ block_rep_pool.pop(block_rep);
+ assert(block_rep != nullptr);
+
+ assert(block_rep->data);
+
+ block_rep->compression_type = compression_type;
+
+ if (first_key_in_next_block == nullptr) {
+ block_rep->first_key_in_next_block.reset(nullptr);
+ } else {
+ block_rep->first_key_in_next_block->assign(
+ first_key_in_next_block->data(), first_key_in_next_block->size());
+ }
+
+ return block_rep;
+ }
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const int level_at_creation,
const uint64_t creation_time, const uint64_t oldest_key_time,
- const uint64_t target_file_size, const uint64_t file_creation_time) {
+ const uint64_t target_file_size, const uint64_t file_creation_time,
+ const std::string& db_id, const std::string& db_session_id) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
sanitized_table_options.format_version = 1;
}
- rep_ = new Rep(ioptions, moptions, sanitized_table_options,
- internal_comparator, int_tbl_prop_collector_factories,
- column_family_id, file, compression_type,
- sample_for_compression, compression_opts, skip_filters,
- level_at_creation, column_family_name, creation_time,
- oldest_key_time, target_file_size, file_creation_time);
+ rep_ = new Rep(
+ ioptions, moptions, sanitized_table_options, internal_comparator,
+ int_tbl_prop_collector_factories, column_family_id, file,
+ compression_type, sample_for_compression, compression_opts, skip_filters,
+ level_at_creation, column_family_name, creation_time, oldest_key_time,
+ target_file_size, file_creation_time, db_id, db_session_id);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
}
if (table_options.block_cache_compressed.get() != nullptr) {
- BlockBasedTable::GenerateCachePrefix(
+ BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
table_options.block_cache_compressed.get(), file->writable_file(),
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size);
}
+
+ if (rep_->IsParallelCompressionEnabled()) {
+ StartParallelCompression();
+ }
}
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
if (r->props.num_entries > r->props.num_range_deletions) {
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
}
-#endif // NDEBUG
+#endif // !NDEBUG
auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
assert(!r->data_block.empty());
+ r->first_key_in_next_block = &key;
Flush();
- if (r->state == Rep::State::kBuffered &&
+ if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
r->data_begin_offset > r->target_file_size) {
EnterUnbuffered();
}
// entries in the first block and < all entries in subsequent
// blocks.
if (ok() && r->state == Rep::State::kUnbuffered) {
- r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
+ if (r->IsParallelCompressionEnabled()) {
+ r->pc_rep->curr_block_keys->Clear();
+ } else {
+ r->index_builder->AddIndexEntry(&r->last_key, &key,
+ r->pending_handle);
+ }
}
}
  // Note: PartitionedFilterBlockBuilder requires keys to be added to the
  // filter builder after being added to the index builder.
- if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
- size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
- r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+ if (r->state == Rep::State::kUnbuffered) {
+ if (r->IsParallelCompressionEnabled()) {
+ r->pc_rep->curr_block_keys->PushBack(key);
+ } else {
+ if (r->filter_builder != nullptr) {
+ size_t ts_sz =
+ r->internal_comparator.user_comparator()->timestamp_size();
+ r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+ }
+ }
}
r->last_key.assign(key.data(), key.size());
}
r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
} else {
- r->index_builder->OnKeyAdded(key);
+ if (!r->IsParallelCompressionEnabled()) {
+ r->index_builder->OnKeyAdded(key);
+ }
}
- NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
+    // TODO: the offset passed in is not accurate in the parallel
+    // compression case
+ NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.info_log);
} else if (value_type == kTypeRangeDeletion) {
r->range_del_block.Add(key, value);
- NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
+    // TODO: the offset passed in is not accurate in the parallel
+    // compression case
+ NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.info_log);
} else {
assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
if (r->data_block.empty()) return;
- WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+ if (r->IsParallelCompressionEnabled() &&
+ r->state == Rep::State::kUnbuffered) {
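+    // Hand the finished data block to a compression worker instead of
+    // compressing and writing it inline on this thread.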
+ r->data_block.Finish();
+ ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+ r->compression_type, r->first_key_in_next_block, &(r->data_block));
+ assert(block_rep != nullptr);
+ r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+ r->get_offset());
+ r->pc_rep->EmitBlock(block_rep);
+ } else {
+ WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+ }
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle,
bool is_data_block) {
+ Rep* r = rep_;
+ Slice block_contents;
+ CompressionType type;
+ if (r->state == Rep::State::kBuffered) {
+ assert(is_data_block);
+ assert(!r->data_block_and_keys_buffers.empty());
+ r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
+ r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
+ return;
+ }
+ Status compress_status;
+ CompressAndVerifyBlock(raw_block_contents, is_data_block,
+ *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
+ &(r->compressed_output), &(block_contents), &type,
+ &compress_status);
+ r->SetStatus(compress_status);
+ if (!ok()) {
+ return;
+ }
+ WriteRawBlock(block_contents, type, handle, is_data_block);
+ r->compressed_output.clear();
+ if (is_data_block) {
+ if (r->filter_builder != nullptr) {
+ r->filter_builder->StartBlock(r->get_offset());
+ }
+ r->props.data_size = r->get_offset();
+ ++r->props.num_data_blocks;
+ }
+}
+
+void BlockBasedTableBuilder::BGWorkCompression(
+ const CompressionContext& compression_ctx,
+ UncompressionContext* verify_ctx) {
+ ParallelCompressionRep::BlockRep* block_rep = nullptr;
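+  // Worker loop: pop raw blocks until the compression queue is finished,
+  // compress each one, then fill the block's slot so the write thread can
+  // proceed in emission order.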
+ while (rep_->pc_rep->compress_queue.pop(block_rep)) {
+ assert(block_rep != nullptr);
+    CompressAndVerifyBlock(block_rep->contents, true /* is_data_block */,
+ compression_ctx, verify_ctx,
+ block_rep->compressed_data.get(),
+ &block_rep->compressed_contents,
+ &(block_rep->compression_type), &block_rep->status);
+ block_rep->slot->Fill(block_rep);
+ }
+}
+
+void BlockBasedTableBuilder::CompressAndVerifyBlock(
+ const Slice& raw_block_contents, bool is_data_block,
+ const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
+ std::string* compressed_output, Slice* block_contents,
+ CompressionType* type, Status* out_status) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
- assert(ok());
Rep* r = rep_;
+ bool is_status_ok = ok();
+ if (!r->IsParallelCompressionEnabled()) {
+ assert(is_status_ok);
+ }
- auto type = r->compression_type;
+ *type = r->compression_type;
uint64_t sample_for_compression = r->sample_for_compression;
- Slice block_contents;
bool abort_compression = false;
StopWatchNano timer(
r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
- if (r->state == Rep::State::kBuffered) {
- assert(is_data_block);
- assert(!r->data_block_and_keys_buffers.empty());
- r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
- r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
- return;
- }
-
- if (raw_block_contents.size() < kCompressionSizeLimit) {
+ if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
const CompressionDict* compression_dict;
if (!is_data_block || r->compression_dict == nullptr) {
compression_dict = &CompressionDict::GetEmptyDict();
compression_dict = r->compression_dict.get();
}
assert(compression_dict != nullptr);
- CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
- *compression_dict, type,
+ CompressionInfo compression_info(r->compression_opts, compression_ctx,
+ *compression_dict, *type,
sample_for_compression);
std::string sampled_output_fast;
std::string sampled_output_slow;
- block_contents = CompressBlock(
- raw_block_contents, compression_info, &type,
+ *block_contents = CompressBlock(
+ raw_block_contents, compression_info, type,
r->table_options.format_version, is_data_block /* do_sample */,
- &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
+ compressed_output, &sampled_output_fast, &sampled_output_slow);
// notify collectors on block add
NotifyCollectTableCollectorsOnBlockAdd(
// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the
// compressed data and compare to the input.
- if (type != kNoCompression && r->table_options.verify_compression) {
+ if (*type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
const UncompressionDict* verify_dict;
if (!is_data_block || r->verify_dict == nullptr) {
}
assert(verify_dict != nullptr);
BlockContents contents;
- UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
+ UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
r->compression_type);
Status stat = UncompressBlockContentsForCompressionType(
- uncompression_info, block_contents.data(), block_contents.size(),
+ uncompression_info, block_contents->data(), block_contents->size(),
&contents, r->table_options.format_version, r->ioptions);
if (stat.ok()) {
abort_compression = true;
ROCKS_LOG_ERROR(r->ioptions.info_log,
"Decompressed block did not match raw block");
- r->status =
+ *out_status =
Status::Corruption("Decompressed block did not match raw block");
}
} else {
// Decompression reported an error. abort.
- r->status = Status::Corruption("Could not decompress");
+ *out_status = Status::Corruption(std::string("Could not decompress: ") +
+ stat.getState());
abort_compression = true;
}
}
// verification.
if (abort_compression) {
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
- type = kNoCompression;
- block_contents = raw_block_contents;
- } else if (type != kNoCompression) {
+ *type = kNoCompression;
+ *block_contents = raw_block_contents;
+ } else if (*type != kNoCompression) {
if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
raw_block_contents.size());
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
- } else if (type != r->compression_type) {
+ } else if (*type != r->compression_type) {
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
}
-
- WriteRawBlock(block_contents, type, handle, is_data_block);
- r->compressed_output.clear();
- if (is_data_block) {
- if (r->filter_builder != nullptr) {
- r->filter_builder->StartBlock(r->offset);
- }
- r->props.data_size = r->offset;
- ++r->props.num_data_blocks;
- }
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
BlockHandle* handle,
bool is_data_block) {
Rep* r = rep_;
+ Status s = Status::OK();
+ IOStatus io_s = IOStatus::OK();
StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
- handle->set_offset(r->offset);
+ handle->set_offset(r->get_offset());
handle->set_size(block_contents.size());
- assert(r->status.ok());
- r->status = r->file->Append(block_contents);
- if (r->status.ok()) {
+ assert(status().ok());
+ assert(io_status().ok());
+ io_s = r->file->Append(block_contents);
+ if (io_s.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
- char* trailer_without_type = trailer + 1;
+ uint32_t checksum = 0;
switch (r->table_options.checksum) {
case kNoChecksum:
- EncodeFixed32(trailer_without_type, 0);
break;
case kCRC32c: {
- auto crc = crc32c::Value(block_contents.data(), block_contents.size());
- crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
- EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
+ uint32_t crc =
+ crc32c::Value(block_contents.data(), block_contents.size());
+ // Extend to cover compression type
+ crc = crc32c::Extend(crc, trailer, 1);
+ checksum = crc32c::Mask(crc);
break;
}
case kxxHash: {
XXH32_state_t* const state = XXH32_createState();
XXH32_reset(state, 0);
- XXH32_update(state, block_contents.data(),
- static_cast<uint32_t>(block_contents.size()));
- XXH32_update(state, trailer, 1); // Extend to cover block type
- EncodeFixed32(trailer_without_type, XXH32_digest(state));
+ XXH32_update(state, block_contents.data(), block_contents.size());
+ // Extend to cover compression type
+ XXH32_update(state, trailer, 1);
+ checksum = XXH32_digest(state);
XXH32_freeState(state);
break;
}
case kxxHash64: {
XXH64_state_t* const state = XXH64_createState();
XXH64_reset(state, 0);
- XXH64_update(state, block_contents.data(),
- static_cast<uint32_t>(block_contents.size()));
- XXH64_update(state, trailer, 1); // Extend to cover block type
- EncodeFixed32(
- trailer_without_type,
- static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
- uint64_t{0xffffffff}));
+ XXH64_update(state, block_contents.data(), block_contents.size());
+ // Extend to cover compression type
+ XXH64_update(state, trailer, 1);
+ checksum = Lower32of64(XXH64_digest(state));
XXH64_freeState(state);
break;
}
+ default:
+ assert(false);
+ break;
}
-
- assert(r->status.ok());
+ EncodeFixed32(trailer + 1, checksum);
+ assert(io_s.ok());
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
static_cast<char*>(trailer));
- r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
- if (r->status.ok()) {
- r->status = InsertBlockInCache(block_contents, type, handle);
+ io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
+ if (io_s.ok()) {
+ assert(s.ok());
+ s = InsertBlockInCache(block_contents, type, handle);
+ if (!s.ok()) {
+ r->SetStatus(s);
+ }
+ } else {
+ r->SetIOStatus(io_s);
}
- if (r->status.ok()) {
- r->offset += block_contents.size() + kBlockTrailerSize;
+ if (s.ok() && io_s.ok()) {
+ r->set_offset(r->get_offset() + block_contents.size() +
+ kBlockTrailerSize);
if (r->table_options.block_align && is_data_block) {
size_t pad_bytes =
(r->alignment - ((block_contents.size() + kBlockTrailerSize) &
(r->alignment - 1))) &
(r->alignment - 1);
- r->status = r->file->Pad(pad_bytes);
- if (r->status.ok()) {
- r->offset += pad_bytes;
+ io_s = r->file->Pad(pad_bytes);
+ if (io_s.ok()) {
+ r->set_offset(r->get_offset() + pad_bytes);
+ } else {
+ r->SetIOStatus(io_s);
+ }
+ }
+ if (r->IsParallelCompressionEnabled()) {
+ if (is_data_block) {
+ r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
+ r->get_offset());
+ } else {
+ r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
}
}
}
+ } else {
+ r->SetIOStatus(io_s);
+ }
+ if (!io_s.ok() && s.ok()) {
+ r->SetStatus(io_s);
}
}
-Status BlockBasedTableBuilder::status() const { return rep_->status; }
+void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
+ Rep* r = rep_;
+ ParallelCompressionRep::BlockRepSlot* slot = nullptr;
+ ParallelCompressionRep::BlockRep* block_rep = nullptr;
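+  // Consume slots in the order blocks were emitted; Take() blocks until a
+  // compression worker fills the slot, preserving on-disk block order.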
+ while (r->pc_rep->write_queue.pop(slot)) {
+ assert(slot != nullptr);
+ slot->Take(block_rep);
+ assert(block_rep != nullptr);
+ if (!block_rep->status.ok()) {
+ r->SetStatus(block_rep->status);
+ // Reap block so that blocked Flush() can finish
+ // if there is one, and Flush() will notice !ok() next time.
+ block_rep->status = Status::OK();
+ r->pc_rep->ReapBlock(block_rep);
+ continue;
+ }
+
+ for (size_t i = 0; i < block_rep->keys->Size(); i++) {
+ auto& key = (*block_rep->keys)[i];
+ if (r->filter_builder != nullptr) {
+ size_t ts_sz =
+ r->internal_comparator.user_comparator()->timestamp_size();
+ r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+ }
+ r->index_builder->OnKeyAdded(key);
+ }
+
+ r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
+ WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
+ &r->pending_handle, true /* is_data_block*/);
+ if (!ok()) {
+ break;
+ }
+
+ if (r->filter_builder != nullptr) {
+ r->filter_builder->StartBlock(r->get_offset());
+ }
+ r->props.data_size = r->get_offset();
+ ++r->props.num_data_blocks;
+
+ if (block_rep->first_key_in_next_block == nullptr) {
+ r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
+ r->pending_handle);
+ } else {
+ Slice first_key_in_next_block =
+ Slice(*block_rep->first_key_in_next_block);
+ r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
+ &first_key_in_next_block,
+ r->pending_handle);
+ }
+
+ r->pc_rep->ReapBlock(block_rep);
+ }
+}
+
+void BlockBasedTableBuilder::StartParallelCompression() {
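+  // Spawn one compression worker per configured thread, plus a single
+  // write thread that appends compressed blocks in emission order.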
+ rep_->pc_rep.reset(
+ new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
+ rep_->pc_rep->compress_thread_pool.reserve(
+ rep_->compression_opts.parallel_threads);
+ for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
+ rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
+ BGWorkCompression(*(rep_->compression_ctxs[i]),
+ rep_->verify_ctxs[i].get());
+ });
+ }
+ rep_->pc_rep->write_thread.reset(
+ new port::Thread([this] { BGWorkWriteRawBlock(); }));
+}
+
+void BlockBasedTableBuilder::StopParallelCompression() {
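+  // Drain and join the compression workers first; they feed the write
+  // thread, which is drained and joined afterwards.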
+ rep_->pc_rep->compress_queue.finish();
+ for (auto& thread : rep_->pc_rep->compress_thread_pool) {
+ thread.join();
+ }
+ rep_->pc_rep->write_queue.finish();
+ rep_->pc_rep->write_thread->join();
+}
+
+Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
+
+IOStatus BlockBasedTableBuilder::io_status() const {
+ return rep_->GetIOStatus();
+}
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value);
static_cast<size_t>(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
- block_cache_compressed->Insert(
- key, block_contents_to_cache,
- block_contents_to_cache->ApproximateMemoryUsage(),
- &DeleteCachedBlockContents);
+  // TODO: How should we deal with the compressed cache being full?
+ block_cache_compressed
+ ->Insert(key, block_contents_to_cache,
+ block_contents_to_cache->ApproximateMemoryUsage(),
+ &DeleteCachedBlockContents)
+ .PermitUncheckedError();
// Invalidate OS cache.
- r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
+ r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
+ .PermitUncheckedError();
}
return Status::OK();
}
// HashIndexBuilder which is not multi-partition.
assert(index_blocks.meta_blocks.empty());
} else if (ok() && !index_builder_status.ok()) {
- rep_->status = index_builder_status;
+ rep_->SetStatus(index_builder_status);
}
if (ok()) {
for (const auto& item : index_blocks.meta_blocks) {
while (ok() && s.IsIncomplete()) {
s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
if (!s.ok() && !s.IsIncomplete()) {
- rep_->status = s;
+ rep_->SetStatus(s);
return;
}
if (rep_->table_options.enable_index_compression) {
rep_->props.creation_time = rep_->creation_time;
rep_->props.oldest_key_time = rep_->oldest_key_time;
rep_->props.file_creation_time = rep_->file_creation_time;
+ rep_->props.db_id = rep_->db_id;
+ rep_->props.db_session_id = rep_->db_session_id;
+ rep_->props.db_host_id = rep_->db_host_id;
// Add basic properties
property_block_builder.AddTableProperty(rep_->props);
footer.set_checksum(r->table_options.checksum);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
- assert(r->status.ok());
- r->status = r->file->Append(footer_encoding);
- if (r->status.ok()) {
- r->offset += footer_encoding.size();
+ assert(ok());
+ IOStatus ios = r->file->Append(footer_encoding);
+ if (ios.ok()) {
+ r->set_offset(r->get_offset() + footer_encoding.size());
+ } else {
+ r->SetIOStatus(ios);
+ r->SetStatus(ios);
}
}
r->compression_type == kZSTDNotFinalCompression));
for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
- const auto& data_block = r->data_block_and_keys_buffers[i].first;
+ auto& data_block = r->data_block_and_keys_buffers[i].first;
auto& keys = r->data_block_and_keys_buffers[i].second;
assert(!data_block.empty());
assert(!keys.empty());
- for (const auto& key : keys) {
- if (r->filter_builder != nullptr) {
- size_t ts_sz =
- r->internal_comparator.user_comparator()->timestamp_size();
- r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+ if (r->IsParallelCompressionEnabled()) {
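+      // Determine the first key of the next block: for all but the last
+      // buffered block it comes from the next buffer; for the last one it
+      // is the first key after the buffered blocks (possibly null at EOF).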
+ Slice first_key_in_next_block;
+ const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+ if (i + 1 < r->data_block_and_keys_buffers.size()) {
+ first_key_in_next_block =
+ r->data_block_and_keys_buffers[i + 1].second.front();
+ } else {
+ first_key_in_next_block_ptr = r->first_key_in_next_block;
+ }
+
+ ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+ r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+ assert(block_rep != nullptr);
+ r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+ r->get_offset());
+ r->pc_rep->EmitBlock(block_rep);
+ } else {
+ for (const auto& key : keys) {
+ if (r->filter_builder != nullptr) {
+ size_t ts_sz =
+ r->internal_comparator.user_comparator()->timestamp_size();
+ r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
+ }
+ r->index_builder->OnKeyAdded(key);
+ }
+ WriteBlock(Slice(data_block), &r->pending_handle,
+ true /* is_data_block */);
+ if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
+ Slice first_key_in_next_block =
+ r->data_block_and_keys_buffers[i + 1].second.front();
+ Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
+ r->index_builder->AddIndexEntry(
+ &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
}
- r->index_builder->OnKeyAdded(key);
- }
- WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
- if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
- Slice first_key_in_next_block =
- r->data_block_and_keys_buffers[i + 1].second.front();
- Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
- r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
- r->pending_handle);
}
}
r->data_block_and_keys_buffers.clear();
Rep* r = rep_;
assert(r->state != Rep::State::kClosed);
bool empty_data_block = r->data_block.empty();
+ r->first_key_in_next_block = nullptr;
Flush();
if (r->state == Rep::State::kBuffered) {
EnterUnbuffered();
}
- // To make sure properties block is able to keep the accurate size of index
- // block, we will finish writing all index entries first.
- if (ok() && !empty_data_block) {
- r->index_builder->AddIndexEntry(
- &r->last_key, nullptr /* no next data block */, r->pending_handle);
+ if (r->IsParallelCompressionEnabled()) {
+ StopParallelCompression();
+#ifndef NDEBUG
+ for (const auto& br : r->pc_rep->block_rep_buf) {
+ assert(br.status.ok());
+ }
+#endif // !NDEBUG
+ } else {
+    // To make sure the properties block records the accurate size of the
+    // index block, we finish writing all index entries first.
+ if (ok() && !empty_data_block) {
+ r->index_builder->AddIndexEntry(
+ &r->last_key, nullptr /* no next data block */, r->pending_handle);
+ }
}
// Write meta blocks, metaindex block and footer in the following order.
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);
}
- if (r->file != nullptr) {
- file_checksum_ = r->file->GetFileChecksum();
- }
r->state = Rep::State::kClosed;
- return r->status;
+ r->SetStatus(r->CopyIOStatus());
+ Status ret_status = r->CopyStatus();
+ assert(!ret_status.ok() || io_status().ok());
+ return ret_status;
}
void BlockBasedTableBuilder::Abandon() {
assert(rep_->state != Rep::State::kClosed);
+ if (rep_->IsParallelCompressionEnabled()) {
+ StopParallelCompression();
+ }
rep_->state = Rep::State::kClosed;
+ rep_->CopyStatus().PermitUncheckedError();
+ rep_->CopyIOStatus().PermitUncheckedError();
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_->props.num_entries;
}
+bool BlockBasedTableBuilder::IsEmpty() const {
+ return rep_->props.num_entries == 0 && rep_->props.num_range_deletions == 0;
+}
+
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
+uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
+ if (rep_->IsParallelCompressionEnabled()) {
+ // Use compression ratio so far and inflight raw bytes to estimate
+ // final SST size.
+ return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
+ } else {
+ return FileSize();
+ }
+}
+
bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) {
for (const auto& prop : collector->GetReadableProperties()) {
ret.readable_properties.insert(prop);
}
- collector->Finish(&ret.user_collected_properties);
+ collector->Finish(&ret.user_collected_properties).PermitUncheckedError();
}
return ret;
}
+std::string BlockBasedTableBuilder::GetFileChecksum() const {
+ if (rep_->file != nullptr) {
+ return rep_->file->GetFileChecksum();
+ } else {
+ return kUnknownFileChecksum;
+ }
+}
+
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
if (rep_->file != nullptr) {
return rep_->file->GetFileChecksumFuncName();
} else {
- return kUnknownFileChecksumFuncName.c_str();
+ return kUnknownFileChecksumFuncName;
}
}