]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #include "rocksdb/sst_file_writer.h" | |
7 | ||
8 | #include <vector> | |
f67539c2 | 9 | |
7c673cae | 10 | #include "db/dbformat.h" |
f67539c2 TL |
11 | #include "env/composite_env_wrapper.h" |
12 | #include "file/writable_file_writer.h" | |
7c673cae | 13 | #include "rocksdb/table.h" |
f67539c2 | 14 | #include "table/block_based/block_based_table_builder.h" |
7c673cae | 15 | #include "table/sst_file_writer_collectors.h" |
f67539c2 | 16 | #include "test_util/sync_point.h" |
7c673cae | 17 | |
f67539c2 | 18 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
19 | |
20 | const std::string ExternalSstFilePropertyNames::kVersion = | |
21 | "rocksdb.external_sst_file.version"; | |
22 | const std::string ExternalSstFilePropertyNames::kGlobalSeqno = | |
23 | "rocksdb.external_sst_file.global_seqno"; | |
24 | ||
25 | #ifndef ROCKSDB_LITE | |
26 | ||
// Number of newly written bytes (1MB) after which the page cache is
// invalidated again when invalidate_page_cache is enabled.
const size_t kFadviseTrigger = size_t{1} << 20;
28 | ||
29 | struct SstFileWriter::Rep { | |
30 | Rep(const EnvOptions& _env_options, const Options& options, | |
11fdf7f2 TL |
31 | Env::IOPriority _io_priority, const Comparator* _user_comparator, |
32 | ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters) | |
7c673cae FG |
33 | : env_options(_env_options), |
34 | ioptions(options), | |
35 | mutable_cf_options(options), | |
11fdf7f2 | 36 | io_priority(_io_priority), |
7c673cae FG |
37 | internal_comparator(_user_comparator), |
38 | cfh(_cfh), | |
39 | invalidate_page_cache(_invalidate_page_cache), | |
11fdf7f2 TL |
40 | last_fadvise_size(0), |
41 | skip_filters(_skip_filters) {} | |
7c673cae FG |
42 | |
43 | std::unique_ptr<WritableFileWriter> file_writer; | |
44 | std::unique_ptr<TableBuilder> builder; | |
45 | EnvOptions env_options; | |
46 | ImmutableCFOptions ioptions; | |
47 | MutableCFOptions mutable_cf_options; | |
11fdf7f2 | 48 | Env::IOPriority io_priority; |
7c673cae FG |
49 | InternalKeyComparator internal_comparator; |
50 | ExternalSstFileInfo file_info; | |
51 | InternalKey ikey; | |
52 | std::string column_family_name; | |
53 | ColumnFamilyHandle* cfh; | |
54 | // If true, We will give the OS a hint that this file pages is not needed | |
11fdf7f2 | 55 | // every time we write 1MB to the file. |
7c673cae | 56 | bool invalidate_page_cache; |
11fdf7f2 | 57 | // The size of the file during the last time we called Fadvise to remove |
7c673cae FG |
58 | // cached pages from page cache. |
59 | uint64_t last_fadvise_size; | |
11fdf7f2 TL |
60 | bool skip_filters; |
61 | Status Add(const Slice& user_key, const Slice& value, | |
62 | const ValueType value_type) { | |
63 | if (!builder) { | |
64 | return Status::InvalidArgument("File is not opened"); | |
65 | } | |
66 | ||
67 | if (file_info.num_entries == 0) { | |
68 | file_info.smallest_key.assign(user_key.data(), user_key.size()); | |
69 | } else { | |
70 | if (internal_comparator.user_comparator()->Compare( | |
71 | user_key, file_info.largest_key) <= 0) { | |
72 | // Make sure that keys are added in order | |
f67539c2 TL |
73 | return Status::InvalidArgument( |
74 | "Keys must be added in strict ascending order."); | |
11fdf7f2 TL |
75 | } |
76 | } | |
77 | ||
78 | // TODO(tec) : For external SST files we could omit the seqno and type. | |
79 | switch (value_type) { | |
80 | case ValueType::kTypeValue: | |
81 | ikey.Set(user_key, 0 /* Sequence Number */, | |
82 | ValueType::kTypeValue /* Put */); | |
83 | break; | |
84 | case ValueType::kTypeMerge: | |
85 | ikey.Set(user_key, 0 /* Sequence Number */, | |
86 | ValueType::kTypeMerge /* Merge */); | |
87 | break; | |
88 | case ValueType::kTypeDeletion: | |
89 | ikey.Set(user_key, 0 /* Sequence Number */, | |
90 | ValueType::kTypeDeletion /* Delete */); | |
91 | break; | |
92 | default: | |
93 | return Status::InvalidArgument("Value type is not supported"); | |
94 | } | |
95 | builder->Add(ikey.Encode(), value); | |
96 | ||
97 | // update file info | |
98 | file_info.num_entries++; | |
99 | file_info.largest_key.assign(user_key.data(), user_key.size()); | |
100 | file_info.file_size = builder->FileSize(); | |
101 | ||
20effc67 | 102 | return InvalidatePageCache(false /* closing */); |
11fdf7f2 TL |
103 | } |
104 | ||
105 | Status DeleteRange(const Slice& begin_key, const Slice& end_key) { | |
106 | if (!builder) { | |
107 | return Status::InvalidArgument("File is not opened"); | |
108 | } | |
109 | ||
110 | RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */); | |
111 | if (file_info.num_range_del_entries == 0) { | |
112 | file_info.smallest_range_del_key.assign(tombstone.start_key_.data(), | |
113 | tombstone.start_key_.size()); | |
114 | file_info.largest_range_del_key.assign(tombstone.end_key_.data(), | |
115 | tombstone.end_key_.size()); | |
116 | } else { | |
117 | if (internal_comparator.user_comparator()->Compare( | |
118 | tombstone.start_key_, file_info.smallest_range_del_key) < 0) { | |
119 | file_info.smallest_range_del_key.assign(tombstone.start_key_.data(), | |
120 | tombstone.start_key_.size()); | |
121 | } | |
122 | if (internal_comparator.user_comparator()->Compare( | |
123 | tombstone.end_key_, file_info.largest_range_del_key) > 0) { | |
124 | file_info.largest_range_del_key.assign(tombstone.end_key_.data(), | |
125 | tombstone.end_key_.size()); | |
126 | } | |
127 | } | |
128 | ||
129 | auto ikey_and_end_key = tombstone.Serialize(); | |
130 | builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second); | |
131 | ||
132 | // update file info | |
133 | file_info.num_range_del_entries++; | |
134 | file_info.file_size = builder->FileSize(); | |
135 | ||
20effc67 | 136 | return InvalidatePageCache(false /* closing */); |
11fdf7f2 TL |
137 | } |
138 | ||
20effc67 TL |
139 | Status InvalidatePageCache(bool closing) { |
140 | Status s = Status::OK(); | |
11fdf7f2 TL |
141 | if (invalidate_page_cache == false) { |
142 | // Fadvise disabled | |
20effc67 | 143 | return s; |
11fdf7f2 TL |
144 | } |
145 | uint64_t bytes_since_last_fadvise = | |
146 | builder->FileSize() - last_fadvise_size; | |
147 | if (bytes_since_last_fadvise > kFadviseTrigger || closing) { | |
148 | TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache", | |
149 | &(bytes_since_last_fadvise)); | |
20effc67 TL |
150 | // Tell the OS that we don't need this file in page cache |
151 | s = file_writer->InvalidateCache(0, 0); | |
152 | if (s.IsNotSupported()) { | |
153 | // NotSupported is fine as it could be a file type that doesn't use page | |
154 | // cache. | |
155 | s = Status::OK(); | |
156 | } | |
11fdf7f2 TL |
157 | last_fadvise_size = builder->FileSize(); |
158 | } | |
20effc67 | 159 | return s; |
11fdf7f2 | 160 | } |
7c673cae FG |
161 | }; |
162 | ||
163 | SstFileWriter::SstFileWriter(const EnvOptions& env_options, | |
164 | const Options& options, | |
165 | const Comparator* user_comparator, | |
166 | ColumnFamilyHandle* column_family, | |
11fdf7f2 TL |
167 | bool invalidate_page_cache, |
168 | Env::IOPriority io_priority, bool skip_filters) | |
169 | : rep_(new Rep(env_options, options, io_priority, user_comparator, | |
170 | column_family, invalidate_page_cache, skip_filters)) { | |
7c673cae FG |
171 | rep_->file_info.file_size = 0; |
172 | } | |
173 | ||
174 | SstFileWriter::~SstFileWriter() { | |
175 | if (rep_->builder) { | |
176 | // User did not call Finish() or Finish() failed, we need to | |
177 | // abandon the builder. | |
178 | rep_->builder->Abandon(); | |
179 | } | |
7c673cae FG |
180 | } |
181 | ||
182 | Status SstFileWriter::Open(const std::string& file_path) { | |
11fdf7f2 | 183 | Rep* r = rep_.get(); |
7c673cae FG |
184 | Status s; |
185 | std::unique_ptr<WritableFile> sst_file; | |
186 | s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); | |
187 | if (!s.ok()) { | |
188 | return s; | |
189 | } | |
190 | ||
11fdf7f2 TL |
191 | sst_file->SetIOPriority(r->io_priority); |
192 | ||
7c673cae | 193 | CompressionType compression_type; |
11fdf7f2 | 194 | CompressionOptions compression_opts; |
20effc67 TL |
195 | if (r->mutable_cf_options.bottommost_compression != |
196 | kDisableCompressionOption) { | |
197 | compression_type = r->mutable_cf_options.bottommost_compression; | |
198 | if (r->mutable_cf_options.bottommost_compression_opts.enabled) { | |
199 | compression_opts = r->mutable_cf_options.bottommost_compression_opts; | |
11fdf7f2 | 200 | } else { |
20effc67 | 201 | compression_opts = r->mutable_cf_options.compression_opts; |
11fdf7f2 | 202 | } |
7c673cae FG |
203 | } else if (!r->ioptions.compression_per_level.empty()) { |
204 | // Use the compression of the last level if we have per level compression | |
205 | compression_type = *(r->ioptions.compression_per_level.rbegin()); | |
20effc67 | 206 | compression_opts = r->mutable_cf_options.compression_opts; |
7c673cae FG |
207 | } else { |
208 | compression_type = r->mutable_cf_options.compression; | |
20effc67 | 209 | compression_opts = r->mutable_cf_options.compression_opts; |
7c673cae | 210 | } |
494da23a TL |
211 | uint64_t sample_for_compression = |
212 | r->mutable_cf_options.sample_for_compression; | |
7c673cae FG |
213 | |
214 | std::vector<std::unique_ptr<IntTblPropCollectorFactory>> | |
215 | int_tbl_prop_collector_factories; | |
216 | ||
217 | // SstFileWriter properties collector to add SstFileWriter version. | |
218 | int_tbl_prop_collector_factories.emplace_back( | |
219 | new SstFileWriterPropertiesCollectorFactory(2 /* version */, | |
220 | 0 /* global_seqno*/)); | |
221 | ||
222 | // User collector factories | |
223 | auto user_collector_factories = | |
224 | r->ioptions.table_properties_collector_factories; | |
225 | for (size_t i = 0; i < user_collector_factories.size(); i++) { | |
226 | int_tbl_prop_collector_factories.emplace_back( | |
227 | new UserKeyTablePropertiesCollectorFactory( | |
228 | user_collector_factories[i])); | |
229 | } | |
230 | int unknown_level = -1; | |
231 | uint32_t cf_id; | |
232 | ||
233 | if (r->cfh != nullptr) { | |
234 | // user explicitly specified that this file will be ingested into cfh, | |
235 | // we can persist this information in the file. | |
236 | cf_id = r->cfh->GetID(); | |
237 | r->column_family_name = r->cfh->GetName(); | |
238 | } else { | |
239 | r->column_family_name = ""; | |
240 | cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; | |
241 | } | |
20effc67 TL |
242 | // SstFileWriter is used to create sst files that can be added to database |
243 | // later. Therefore, no real db_id and db_session_id are associated with it. | |
244 | // Here we mimic the way db_session_id behaves by resetting the db_session_id | |
245 | // every time SstFileWriter is used, and in this case db_id is set to be "SST | |
246 | // Writer". | |
247 | std::string db_session_id = r->ioptions.env->GenerateUniqueId(); | |
248 | if (!db_session_id.empty() && db_session_id.back() == '\n') { | |
249 | db_session_id.pop_back(); | |
250 | } | |
7c673cae | 251 | TableBuilderOptions table_builder_options( |
11fdf7f2 | 252 | r->ioptions, r->mutable_cf_options, r->internal_comparator, |
494da23a TL |
253 | &int_tbl_prop_collector_factories, compression_type, |
254 | sample_for_compression, compression_opts, r->skip_filters, | |
20effc67 TL |
255 | r->column_family_name, unknown_level, 0 /* creation_time */, |
256 | 0 /* oldest_key_time */, 0 /* target_file_size */, | |
257 | 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id); | |
258 | r->file_writer.reset(new WritableFileWriter( | |
259 | NewLegacyWritableFileWrapper(std::move(sst_file)), file_path, | |
260 | r->env_options, r->ioptions.env, nullptr /* io_tracer */, | |
261 | nullptr /* stats */, r->ioptions.listeners, | |
262 | r->ioptions.file_checksum_gen_factory)); | |
7c673cae FG |
263 | |
264 | // TODO(tec) : If table_factory is using compressed block cache, we will | |
265 | // be adding the external sst file blocks into it, which is wasteful. | |
266 | r->builder.reset(r->ioptions.table_factory->NewTableBuilder( | |
267 | table_builder_options, cf_id, r->file_writer.get())); | |
268 | ||
11fdf7f2 | 269 | r->file_info = ExternalSstFileInfo(); |
7c673cae | 270 | r->file_info.file_path = file_path; |
7c673cae FG |
271 | r->file_info.version = 2; |
272 | return s; | |
273 | } | |
274 | ||
275 | Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { | |
11fdf7f2 TL |
276 | return rep_->Add(user_key, value, ValueType::kTypeValue); |
277 | } | |
7c673cae | 278 | |
11fdf7f2 TL |
279 | Status SstFileWriter::Put(const Slice& user_key, const Slice& value) { |
280 | return rep_->Add(user_key, value, ValueType::kTypeValue); | |
281 | } | |
7c673cae | 282 | |
11fdf7f2 TL |
283 | Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { |
284 | return rep_->Add(user_key, value, ValueType::kTypeMerge); | |
285 | } | |
7c673cae | 286 | |
11fdf7f2 TL |
287 | Status SstFileWriter::Delete(const Slice& user_key) { |
288 | return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); | |
289 | } | |
7c673cae | 290 | |
11fdf7f2 TL |
291 | Status SstFileWriter::DeleteRange(const Slice& begin_key, |
292 | const Slice& end_key) { | |
293 | return rep_->DeleteRange(begin_key, end_key); | |
7c673cae FG |
294 | } |
295 | ||
296 | Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { | |
11fdf7f2 | 297 | Rep* r = rep_.get(); |
7c673cae FG |
298 | if (!r->builder) { |
299 | return Status::InvalidArgument("File is not opened"); | |
300 | } | |
11fdf7f2 TL |
301 | if (r->file_info.num_entries == 0 && |
302 | r->file_info.num_range_del_entries == 0) { | |
7c673cae FG |
303 | return Status::InvalidArgument("Cannot create sst file with no entries"); |
304 | } | |
305 | ||
306 | Status s = r->builder->Finish(); | |
307 | r->file_info.file_size = r->builder->FileSize(); | |
308 | ||
309 | if (s.ok()) { | |
310 | s = r->file_writer->Sync(r->ioptions.use_fsync); | |
20effc67 TL |
311 | if (s.ok()) { |
312 | s = r->InvalidatePageCache(true /* closing */); | |
313 | } | |
7c673cae FG |
314 | if (s.ok()) { |
315 | s = r->file_writer->Close(); | |
316 | } | |
317 | } | |
20effc67 TL |
318 | if (s.ok()) { |
319 | r->file_info.file_checksum = r->file_writer->GetFileChecksum(); | |
320 | r->file_info.file_checksum_func_name = | |
321 | r->file_writer->GetFileChecksumFuncName(); | |
322 | } | |
7c673cae FG |
323 | if (!s.ok()) { |
324 | r->ioptions.env->DeleteFile(r->file_info.file_path); | |
325 | } | |
326 | ||
327 | if (file_info != nullptr) { | |
328 | *file_info = r->file_info; | |
329 | } | |
330 | ||
331 | r->builder.reset(); | |
332 | return s; | |
333 | } | |
334 | ||
7c673cae FG |
335 | uint64_t SstFileWriter::FileSize() { |
336 | return rep_->file_info.file_size; | |
337 | } | |
338 | #endif // !ROCKSDB_LITE | |
339 | ||
f67539c2 | 340 | } // namespace ROCKSDB_NAMESPACE |