// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/sst_file_writer.h"
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options,
- const Comparator* _user_comparator, ColumnFamilyHandle* _cfh,
- bool _invalidate_page_cache)
+ Env::IOPriority _io_priority, const Comparator* _user_comparator,
+ ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
: env_options(_env_options),
ioptions(options),
mutable_cf_options(options),
+ io_priority(_io_priority),
internal_comparator(_user_comparator),
cfh(_cfh),
invalidate_page_cache(_invalidate_page_cache),
- last_fadvise_size(0) {}
+ last_fadvise_size(0),
+ skip_filters(_skip_filters) {}
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
EnvOptions env_options;
ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options;
+ Env::IOPriority io_priority;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
InternalKey ikey;
std::string column_family_name;
ColumnFamilyHandle* cfh;
// If true, We will give the OS a hint that this file pages is not needed
- // everytime we write 1MB to the file
+ // every time we write 1MB to the file.
bool invalidate_page_cache;
- // the size of the file during the last time we called Fadvise to remove
+ // The size of the file during the last time we called Fadvise to remove
// cached pages from page cache.
uint64_t last_fadvise_size;
+ bool skip_filters;
+ Status Add(const Slice& user_key, const Slice& value,
+ const ValueType value_type) {
+ if (!builder) {
+ return Status::InvalidArgument("File is not opened");
+ }
+
+ if (file_info.num_entries == 0) {
+ file_info.smallest_key.assign(user_key.data(), user_key.size());
+ } else {
+ if (internal_comparator.user_comparator()->Compare(
+ user_key, file_info.largest_key) <= 0) {
+ // Make sure that keys are added in order
+ return Status::InvalidArgument("Keys must be added in order");
+ }
+ }
+
+ // TODO(tec) : For external SST files we could omit the seqno and type.
+ switch (value_type) {
+ case ValueType::kTypeValue:
+ ikey.Set(user_key, 0 /* Sequence Number */,
+ ValueType::kTypeValue /* Put */);
+ break;
+ case ValueType::kTypeMerge:
+ ikey.Set(user_key, 0 /* Sequence Number */,
+ ValueType::kTypeMerge /* Merge */);
+ break;
+ case ValueType::kTypeDeletion:
+ ikey.Set(user_key, 0 /* Sequence Number */,
+ ValueType::kTypeDeletion /* Delete */);
+ break;
+ default:
+ return Status::InvalidArgument("Value type is not supported");
+ }
+ builder->Add(ikey.Encode(), value);
+
+ // update file info
+ file_info.num_entries++;
+ file_info.largest_key.assign(user_key.data(), user_key.size());
+ file_info.file_size = builder->FileSize();
+
+ InvalidatePageCache(false /* closing */);
+
+ return Status::OK();
+ }
+
+ Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
+ if (!builder) {
+ return Status::InvalidArgument("File is not opened");
+ }
+
+ RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
+ if (file_info.num_range_del_entries == 0) {
+ file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
+ tombstone.start_key_.size());
+ file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
+ tombstone.end_key_.size());
+ } else {
+ if (internal_comparator.user_comparator()->Compare(
+ tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
+ file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
+ tombstone.start_key_.size());
+ }
+ if (internal_comparator.user_comparator()->Compare(
+ tombstone.end_key_, file_info.largest_range_del_key) > 0) {
+ file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
+ tombstone.end_key_.size());
+ }
+ }
+
+ auto ikey_and_end_key = tombstone.Serialize();
+ builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
+
+ // update file info
+ file_info.num_range_del_entries++;
+ file_info.file_size = builder->FileSize();
+
+ InvalidatePageCache(false /* closing */);
+
+ return Status::OK();
+ }
+
+ void InvalidatePageCache(bool closing) {
+ if (invalidate_page_cache == false) {
+ // Fadvise disabled
+ return;
+ }
+ uint64_t bytes_since_last_fadvise =
+ builder->FileSize() - last_fadvise_size;
+ if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
+ TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
+ &(bytes_since_last_fadvise));
+ // Tell the OS that we dont need this file in page cache
+ file_writer->InvalidateCache(0, 0);
+ last_fadvise_size = builder->FileSize();
+ }
+ }
+
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const Options& options,
const Comparator* user_comparator,
ColumnFamilyHandle* column_family,
- bool invalidate_page_cache)
- : rep_(new Rep(env_options, options, user_comparator, column_family,
- invalidate_page_cache)) {
+ bool invalidate_page_cache,
+ Env::IOPriority io_priority, bool skip_filters)
+ : rep_(new Rep(env_options, options, io_priority, user_comparator,
+ column_family, invalidate_page_cache, skip_filters)) {
rep_->file_info.file_size = 0;
}
// abandon the builder.
rep_->builder->Abandon();
}
-
- delete rep_;
}
Status SstFileWriter::Open(const std::string& file_path) {
- Rep* r = rep_;
+ Rep* r = rep_.get();
Status s;
std::unique_ptr<WritableFile> sst_file;
s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
return s;
}
+ sst_file->SetIOPriority(r->io_priority);
+
CompressionType compression_type;
+ CompressionOptions compression_opts;
if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
compression_type = r->ioptions.bottommost_compression;
+ if (r->ioptions.bottommost_compression_opts.enabled) {
+ compression_opts = r->ioptions.bottommost_compression_opts;
+ } else {
+ compression_opts = r->ioptions.compression_opts;
+ }
} else if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
+ compression_opts = r->ioptions.compression_opts;
} else {
compression_type = r->mutable_cf_options.compression;
+ compression_opts = r->ioptions.compression_opts;
}
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
}
TableBuilderOptions table_builder_options(
- r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
- compression_type, r->ioptions.compression_opts,
- nullptr /* compression_dict */, false /* skip_filters */,
- r->column_family_name, unknown_level);
+ r->ioptions, r->mutable_cf_options, r->internal_comparator,
+ &int_tbl_prop_collector_factories, compression_type, compression_opts,
+ nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
+ unknown_level);
r->file_writer.reset(
- new WritableFileWriter(std::move(sst_file), r->env_options));
+ new WritableFileWriter(std::move(sst_file), file_path, r->env_options));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options, cf_id, r->file_writer.get()));
+ r->file_info = ExternalSstFileInfo();
r->file_info.file_path = file_path;
- r->file_info.file_size = 0;
- r->file_info.num_entries = 0;
- r->file_info.sequence_number = 0;
r->file_info.version = 2;
return s;
}
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
- Rep* r = rep_;
- if (!r->builder) {
- return Status::InvalidArgument("File is not opened");
- }
-
- if (r->file_info.num_entries == 0) {
- r->file_info.smallest_key.assign(user_key.data(), user_key.size());
- } else {
- if (r->internal_comparator.user_comparator()->Compare(
- user_key, r->file_info.largest_key) <= 0) {
- // Make sure that keys are added in order
- return Status::InvalidArgument("Keys must be added in order");
- }
- }
+ return rep_->Add(user_key, value, ValueType::kTypeValue);
+}
- // TODO(tec) : For external SST files we could omit the seqno and type.
- r->ikey.Set(user_key, 0 /* Sequence Number */,
- ValueType::kTypeValue /* Put */);
- r->builder->Add(r->ikey.Encode(), value);
+Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
+ return rep_->Add(user_key, value, ValueType::kTypeValue);
+}
- // update file info
- r->file_info.num_entries++;
- r->file_info.largest_key.assign(user_key.data(), user_key.size());
- r->file_info.file_size = r->builder->FileSize();
+Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
+ return rep_->Add(user_key, value, ValueType::kTypeMerge);
+}
- InvalidatePageCache(false /* closing */);
+Status SstFileWriter::Delete(const Slice& user_key) {
+ return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
+}
- return Status::OK();
+Status SstFileWriter::DeleteRange(const Slice& begin_key,
+ const Slice& end_key) {
+ return rep_->DeleteRange(begin_key, end_key);
}
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
- Rep* r = rep_;
+ Rep* r = rep_.get();
if (!r->builder) {
return Status::InvalidArgument("File is not opened");
}
- if (r->file_info.num_entries == 0) {
+ if (r->file_info.num_entries == 0 &&
+ r->file_info.num_range_del_entries == 0) {
return Status::InvalidArgument("Cannot create sst file with no entries");
}
if (s.ok()) {
s = r->file_writer->Sync(r->ioptions.use_fsync);
- InvalidatePageCache(true /* closing */);
+ r->InvalidatePageCache(true /* closing */);
if (s.ok()) {
s = r->file_writer->Close();
}
return s;
}
-void SstFileWriter::InvalidatePageCache(bool closing) {
- Rep* r = rep_;
- if (r->invalidate_page_cache == false) {
- // Fadvise disabled
- return;
- }
-
- uint64_t bytes_since_last_fadvise =
- r->builder->FileSize() - r->last_fadvise_size;
- if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
- TEST_SYNC_POINT_CALLBACK("SstFileWriter::InvalidatePageCache",
- &(bytes_since_last_fadvise));
- // Tell the OS that we dont need this file in page cache
- r->file_writer->InvalidateCache(0, 0);
- r->last_fadvise_size = r->builder->FileSize();
- }
-}
-
uint64_t SstFileWriter::FileSize() {
return rep_->file_info.file_size;
}