1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/sst_file_writer.h"

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "table/sst_file_writer_collectors.h"
#include "util/file_reader_writer.h"
#include "util/sync_point.h"
18 const std::string
ExternalSstFilePropertyNames::kVersion
=
19 "rocksdb.external_sst_file.version";
20 const std::string
ExternalSstFilePropertyNames::kGlobalSeqno
=
21 "rocksdb.external_sst_file.global_seqno";
25 const size_t kFadviseTrigger
= 1024 * 1024; // 1MB
27 struct SstFileWriter::Rep
{
28 Rep(const EnvOptions
& _env_options
, const Options
& options
,
29 Env::IOPriority _io_priority
, const Comparator
* _user_comparator
,
30 ColumnFamilyHandle
* _cfh
, bool _invalidate_page_cache
, bool _skip_filters
)
31 : env_options(_env_options
),
33 mutable_cf_options(options
),
34 io_priority(_io_priority
),
35 internal_comparator(_user_comparator
),
37 invalidate_page_cache(_invalidate_page_cache
),
39 skip_filters(_skip_filters
) {}
41 std::unique_ptr
<WritableFileWriter
> file_writer
;
42 std::unique_ptr
<TableBuilder
> builder
;
43 EnvOptions env_options
;
44 ImmutableCFOptions ioptions
;
45 MutableCFOptions mutable_cf_options
;
46 Env::IOPriority io_priority
;
47 InternalKeyComparator internal_comparator
;
48 ExternalSstFileInfo file_info
;
50 std::string column_family_name
;
51 ColumnFamilyHandle
* cfh
;
52 // If true, We will give the OS a hint that this file pages is not needed
53 // every time we write 1MB to the file.
54 bool invalidate_page_cache
;
55 // The size of the file during the last time we called Fadvise to remove
56 // cached pages from page cache.
57 uint64_t last_fadvise_size
;
59 Status
Add(const Slice
& user_key
, const Slice
& value
,
60 const ValueType value_type
) {
62 return Status::InvalidArgument("File is not opened");
65 if (file_info
.num_entries
== 0) {
66 file_info
.smallest_key
.assign(user_key
.data(), user_key
.size());
68 if (internal_comparator
.user_comparator()->Compare(
69 user_key
, file_info
.largest_key
) <= 0) {
70 // Make sure that keys are added in order
71 return Status::InvalidArgument("Keys must be added in order");
75 // TODO(tec) : For external SST files we could omit the seqno and type.
77 case ValueType::kTypeValue
:
78 ikey
.Set(user_key
, 0 /* Sequence Number */,
79 ValueType::kTypeValue
/* Put */);
81 case ValueType::kTypeMerge
:
82 ikey
.Set(user_key
, 0 /* Sequence Number */,
83 ValueType::kTypeMerge
/* Merge */);
85 case ValueType::kTypeDeletion
:
86 ikey
.Set(user_key
, 0 /* Sequence Number */,
87 ValueType::kTypeDeletion
/* Delete */);
90 return Status::InvalidArgument("Value type is not supported");
92 builder
->Add(ikey
.Encode(), value
);
95 file_info
.num_entries
++;
96 file_info
.largest_key
.assign(user_key
.data(), user_key
.size());
97 file_info
.file_size
= builder
->FileSize();
99 InvalidatePageCache(false /* closing */);
104 Status
DeleteRange(const Slice
& begin_key
, const Slice
& end_key
) {
106 return Status::InvalidArgument("File is not opened");
109 RangeTombstone
tombstone(begin_key
, end_key
, 0 /* Sequence Number */);
110 if (file_info
.num_range_del_entries
== 0) {
111 file_info
.smallest_range_del_key
.assign(tombstone
.start_key_
.data(),
112 tombstone
.start_key_
.size());
113 file_info
.largest_range_del_key
.assign(tombstone
.end_key_
.data(),
114 tombstone
.end_key_
.size());
116 if (internal_comparator
.user_comparator()->Compare(
117 tombstone
.start_key_
, file_info
.smallest_range_del_key
) < 0) {
118 file_info
.smallest_range_del_key
.assign(tombstone
.start_key_
.data(),
119 tombstone
.start_key_
.size());
121 if (internal_comparator
.user_comparator()->Compare(
122 tombstone
.end_key_
, file_info
.largest_range_del_key
) > 0) {
123 file_info
.largest_range_del_key
.assign(tombstone
.end_key_
.data(),
124 tombstone
.end_key_
.size());
128 auto ikey_and_end_key
= tombstone
.Serialize();
129 builder
->Add(ikey_and_end_key
.first
.Encode(), ikey_and_end_key
.second
);
132 file_info
.num_range_del_entries
++;
133 file_info
.file_size
= builder
->FileSize();
135 InvalidatePageCache(false /* closing */);
140 void InvalidatePageCache(bool closing
) {
141 if (invalidate_page_cache
== false) {
145 uint64_t bytes_since_last_fadvise
=
146 builder
->FileSize() - last_fadvise_size
;
147 if (bytes_since_last_fadvise
> kFadviseTrigger
|| closing
) {
148 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
149 &(bytes_since_last_fadvise
));
150 // Tell the OS that we dont need this file in page cache
151 file_writer
->InvalidateCache(0, 0);
152 last_fadvise_size
= builder
->FileSize();
158 SstFileWriter::SstFileWriter(const EnvOptions
& env_options
,
159 const Options
& options
,
160 const Comparator
* user_comparator
,
161 ColumnFamilyHandle
* column_family
,
162 bool invalidate_page_cache
,
163 Env::IOPriority io_priority
, bool skip_filters
)
164 : rep_(new Rep(env_options
, options
, io_priority
, user_comparator
,
165 column_family
, invalidate_page_cache
, skip_filters
)) {
166 rep_
->file_info
.file_size
= 0;
169 SstFileWriter::~SstFileWriter() {
171 // User did not call Finish() or Finish() failed, we need to
172 // abandon the builder.
173 rep_
->builder
->Abandon();
177 Status
SstFileWriter::Open(const std::string
& file_path
) {
180 std::unique_ptr
<WritableFile
> sst_file
;
181 s
= r
->ioptions
.env
->NewWritableFile(file_path
, &sst_file
, r
->env_options
);
186 sst_file
->SetIOPriority(r
->io_priority
);
188 CompressionType compression_type
;
189 CompressionOptions compression_opts
;
190 if (r
->ioptions
.bottommost_compression
!= kDisableCompressionOption
) {
191 compression_type
= r
->ioptions
.bottommost_compression
;
192 if (r
->ioptions
.bottommost_compression_opts
.enabled
) {
193 compression_opts
= r
->ioptions
.bottommost_compression_opts
;
195 compression_opts
= r
->ioptions
.compression_opts
;
197 } else if (!r
->ioptions
.compression_per_level
.empty()) {
198 // Use the compression of the last level if we have per level compression
199 compression_type
= *(r
->ioptions
.compression_per_level
.rbegin());
200 compression_opts
= r
->ioptions
.compression_opts
;
202 compression_type
= r
->mutable_cf_options
.compression
;
203 compression_opts
= r
->ioptions
.compression_opts
;
206 std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>
207 int_tbl_prop_collector_factories
;
209 // SstFileWriter properties collector to add SstFileWriter version.
210 int_tbl_prop_collector_factories
.emplace_back(
211 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
212 0 /* global_seqno*/));
214 // User collector factories
215 auto user_collector_factories
=
216 r
->ioptions
.table_properties_collector_factories
;
217 for (size_t i
= 0; i
< user_collector_factories
.size(); i
++) {
218 int_tbl_prop_collector_factories
.emplace_back(
219 new UserKeyTablePropertiesCollectorFactory(
220 user_collector_factories
[i
]));
222 int unknown_level
= -1;
225 if (r
->cfh
!= nullptr) {
226 // user explicitly specified that this file will be ingested into cfh,
227 // we can persist this information in the file.
228 cf_id
= r
->cfh
->GetID();
229 r
->column_family_name
= r
->cfh
->GetName();
231 r
->column_family_name
= "";
232 cf_id
= TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
;
235 TableBuilderOptions
table_builder_options(
236 r
->ioptions
, r
->mutable_cf_options
, r
->internal_comparator
,
237 &int_tbl_prop_collector_factories
, compression_type
, compression_opts
,
238 nullptr /* compression_dict */, r
->skip_filters
, r
->column_family_name
,
240 r
->file_writer
.reset(
241 new WritableFileWriter(std::move(sst_file
), file_path
, r
->env_options
));
243 // TODO(tec) : If table_factory is using compressed block cache, we will
244 // be adding the external sst file blocks into it, which is wasteful.
245 r
->builder
.reset(r
->ioptions
.table_factory
->NewTableBuilder(
246 table_builder_options
, cf_id
, r
->file_writer
.get()));
248 r
->file_info
= ExternalSstFileInfo();
249 r
->file_info
.file_path
= file_path
;
250 r
->file_info
.version
= 2;
254 Status
SstFileWriter::Add(const Slice
& user_key
, const Slice
& value
) {
255 return rep_
->Add(user_key
, value
, ValueType::kTypeValue
);
258 Status
SstFileWriter::Put(const Slice
& user_key
, const Slice
& value
) {
259 return rep_
->Add(user_key
, value
, ValueType::kTypeValue
);
262 Status
SstFileWriter::Merge(const Slice
& user_key
, const Slice
& value
) {
263 return rep_
->Add(user_key
, value
, ValueType::kTypeMerge
);
266 Status
SstFileWriter::Delete(const Slice
& user_key
) {
267 return rep_
->Add(user_key
, Slice(), ValueType::kTypeDeletion
);
270 Status
SstFileWriter::DeleteRange(const Slice
& begin_key
,
271 const Slice
& end_key
) {
272 return rep_
->DeleteRange(begin_key
, end_key
);
275 Status
SstFileWriter::Finish(ExternalSstFileInfo
* file_info
) {
278 return Status::InvalidArgument("File is not opened");
280 if (r
->file_info
.num_entries
== 0 &&
281 r
->file_info
.num_range_del_entries
== 0) {
282 return Status::InvalidArgument("Cannot create sst file with no entries");
285 Status s
= r
->builder
->Finish();
286 r
->file_info
.file_size
= r
->builder
->FileSize();
289 s
= r
->file_writer
->Sync(r
->ioptions
.use_fsync
);
290 r
->InvalidatePageCache(true /* closing */);
292 s
= r
->file_writer
->Close();
296 r
->ioptions
.env
->DeleteFile(r
->file_info
.file_path
);
299 if (file_info
!= nullptr) {
300 *file_info
= r
->file_info
;
307 uint64_t SstFileWriter::FileSize() {
308 return rep_
->file_info
.file_size
;
310 #endif // !ROCKSDB_LITE
312 } // namespace rocksdb