]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/table/sst_file_writer.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / table / sst_file_writer.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "rocksdb/sst_file_writer.h"
7
8 #include <vector>
9 #include "db/dbformat.h"
10 #include "rocksdb/table.h"
11 #include "table/block_based_table_builder.h"
12 #include "table/sst_file_writer_collectors.h"
13 #include "util/file_reader_writer.h"
14 #include "util/sync_point.h"
15
16 namespace rocksdb {
17
18 const std::string ExternalSstFilePropertyNames::kVersion =
19 "rocksdb.external_sst_file.version";
20 const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
21 "rocksdb.external_sst_file.global_seqno";
22
23 #ifndef ROCKSDB_LITE
24
// Number of bytes (1MB) that must be written since the previous page-cache
// invalidation before SstFileWriter::Rep::InvalidatePageCache() issues
// another one.
constexpr size_t kFadviseTrigger = 1 << 20;
26
27 struct SstFileWriter::Rep {
28 Rep(const EnvOptions& _env_options, const Options& options,
29 Env::IOPriority _io_priority, const Comparator* _user_comparator,
30 ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
31 : env_options(_env_options),
32 ioptions(options),
33 mutable_cf_options(options),
34 io_priority(_io_priority),
35 internal_comparator(_user_comparator),
36 cfh(_cfh),
37 invalidate_page_cache(_invalidate_page_cache),
38 last_fadvise_size(0),
39 skip_filters(_skip_filters) {}
40
41 std::unique_ptr<WritableFileWriter> file_writer;
42 std::unique_ptr<TableBuilder> builder;
43 EnvOptions env_options;
44 ImmutableCFOptions ioptions;
45 MutableCFOptions mutable_cf_options;
46 Env::IOPriority io_priority;
47 InternalKeyComparator internal_comparator;
48 ExternalSstFileInfo file_info;
49 InternalKey ikey;
50 std::string column_family_name;
51 ColumnFamilyHandle* cfh;
52 // If true, We will give the OS a hint that this file pages is not needed
53 // every time we write 1MB to the file.
54 bool invalidate_page_cache;
55 // The size of the file during the last time we called Fadvise to remove
56 // cached pages from page cache.
57 uint64_t last_fadvise_size;
58 bool skip_filters;
59 Status Add(const Slice& user_key, const Slice& value,
60 const ValueType value_type) {
61 if (!builder) {
62 return Status::InvalidArgument("File is not opened");
63 }
64
65 if (file_info.num_entries == 0) {
66 file_info.smallest_key.assign(user_key.data(), user_key.size());
67 } else {
68 if (internal_comparator.user_comparator()->Compare(
69 user_key, file_info.largest_key) <= 0) {
70 // Make sure that keys are added in order
71 return Status::InvalidArgument("Keys must be added in order");
72 }
73 }
74
75 // TODO(tec) : For external SST files we could omit the seqno and type.
76 switch (value_type) {
77 case ValueType::kTypeValue:
78 ikey.Set(user_key, 0 /* Sequence Number */,
79 ValueType::kTypeValue /* Put */);
80 break;
81 case ValueType::kTypeMerge:
82 ikey.Set(user_key, 0 /* Sequence Number */,
83 ValueType::kTypeMerge /* Merge */);
84 break;
85 case ValueType::kTypeDeletion:
86 ikey.Set(user_key, 0 /* Sequence Number */,
87 ValueType::kTypeDeletion /* Delete */);
88 break;
89 default:
90 return Status::InvalidArgument("Value type is not supported");
91 }
92 builder->Add(ikey.Encode(), value);
93
94 // update file info
95 file_info.num_entries++;
96 file_info.largest_key.assign(user_key.data(), user_key.size());
97 file_info.file_size = builder->FileSize();
98
99 InvalidatePageCache(false /* closing */);
100
101 return Status::OK();
102 }
103
104 Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
105 if (!builder) {
106 return Status::InvalidArgument("File is not opened");
107 }
108
109 RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
110 if (file_info.num_range_del_entries == 0) {
111 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
112 tombstone.start_key_.size());
113 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
114 tombstone.end_key_.size());
115 } else {
116 if (internal_comparator.user_comparator()->Compare(
117 tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
118 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
119 tombstone.start_key_.size());
120 }
121 if (internal_comparator.user_comparator()->Compare(
122 tombstone.end_key_, file_info.largest_range_del_key) > 0) {
123 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
124 tombstone.end_key_.size());
125 }
126 }
127
128 auto ikey_and_end_key = tombstone.Serialize();
129 builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
130
131 // update file info
132 file_info.num_range_del_entries++;
133 file_info.file_size = builder->FileSize();
134
135 InvalidatePageCache(false /* closing */);
136
137 return Status::OK();
138 }
139
140 void InvalidatePageCache(bool closing) {
141 if (invalidate_page_cache == false) {
142 // Fadvise disabled
143 return;
144 }
145 uint64_t bytes_since_last_fadvise =
146 builder->FileSize() - last_fadvise_size;
147 if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
148 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
149 &(bytes_since_last_fadvise));
150 // Tell the OS that we dont need this file in page cache
151 file_writer->InvalidateCache(0, 0);
152 last_fadvise_size = builder->FileSize();
153 }
154 }
155
156 };
157
158 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
159 const Options& options,
160 const Comparator* user_comparator,
161 ColumnFamilyHandle* column_family,
162 bool invalidate_page_cache,
163 Env::IOPriority io_priority, bool skip_filters)
164 : rep_(new Rep(env_options, options, io_priority, user_comparator,
165 column_family, invalidate_page_cache, skip_filters)) {
166 rep_->file_info.file_size = 0;
167 }
168
169 SstFileWriter::~SstFileWriter() {
170 if (rep_->builder) {
171 // User did not call Finish() or Finish() failed, we need to
172 // abandon the builder.
173 rep_->builder->Abandon();
174 }
175 }
176
177 Status SstFileWriter::Open(const std::string& file_path) {
178 Rep* r = rep_.get();
179 Status s;
180 std::unique_ptr<WritableFile> sst_file;
181 s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
182 if (!s.ok()) {
183 return s;
184 }
185
186 sst_file->SetIOPriority(r->io_priority);
187
188 CompressionType compression_type;
189 CompressionOptions compression_opts;
190 if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
191 compression_type = r->ioptions.bottommost_compression;
192 if (r->ioptions.bottommost_compression_opts.enabled) {
193 compression_opts = r->ioptions.bottommost_compression_opts;
194 } else {
195 compression_opts = r->ioptions.compression_opts;
196 }
197 } else if (!r->ioptions.compression_per_level.empty()) {
198 // Use the compression of the last level if we have per level compression
199 compression_type = *(r->ioptions.compression_per_level.rbegin());
200 compression_opts = r->ioptions.compression_opts;
201 } else {
202 compression_type = r->mutable_cf_options.compression;
203 compression_opts = r->ioptions.compression_opts;
204 }
205
206 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
207 int_tbl_prop_collector_factories;
208
209 // SstFileWriter properties collector to add SstFileWriter version.
210 int_tbl_prop_collector_factories.emplace_back(
211 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
212 0 /* global_seqno*/));
213
214 // User collector factories
215 auto user_collector_factories =
216 r->ioptions.table_properties_collector_factories;
217 for (size_t i = 0; i < user_collector_factories.size(); i++) {
218 int_tbl_prop_collector_factories.emplace_back(
219 new UserKeyTablePropertiesCollectorFactory(
220 user_collector_factories[i]));
221 }
222 int unknown_level = -1;
223 uint32_t cf_id;
224
225 if (r->cfh != nullptr) {
226 // user explicitly specified that this file will be ingested into cfh,
227 // we can persist this information in the file.
228 cf_id = r->cfh->GetID();
229 r->column_family_name = r->cfh->GetName();
230 } else {
231 r->column_family_name = "";
232 cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
233 }
234
235 TableBuilderOptions table_builder_options(
236 r->ioptions, r->mutable_cf_options, r->internal_comparator,
237 &int_tbl_prop_collector_factories, compression_type, compression_opts,
238 nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
239 unknown_level);
240 r->file_writer.reset(
241 new WritableFileWriter(std::move(sst_file), file_path, r->env_options));
242
243 // TODO(tec) : If table_factory is using compressed block cache, we will
244 // be adding the external sst file blocks into it, which is wasteful.
245 r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
246 table_builder_options, cf_id, r->file_writer.get()));
247
248 r->file_info = ExternalSstFileInfo();
249 r->file_info.file_path = file_path;
250 r->file_info.version = 2;
251 return s;
252 }
253
254 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
255 return rep_->Add(user_key, value, ValueType::kTypeValue);
256 }
257
258 Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
259 return rep_->Add(user_key, value, ValueType::kTypeValue);
260 }
261
262 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
263 return rep_->Add(user_key, value, ValueType::kTypeMerge);
264 }
265
266 Status SstFileWriter::Delete(const Slice& user_key) {
267 return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
268 }
269
270 Status SstFileWriter::DeleteRange(const Slice& begin_key,
271 const Slice& end_key) {
272 return rep_->DeleteRange(begin_key, end_key);
273 }
274
275 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
276 Rep* r = rep_.get();
277 if (!r->builder) {
278 return Status::InvalidArgument("File is not opened");
279 }
280 if (r->file_info.num_entries == 0 &&
281 r->file_info.num_range_del_entries == 0) {
282 return Status::InvalidArgument("Cannot create sst file with no entries");
283 }
284
285 Status s = r->builder->Finish();
286 r->file_info.file_size = r->builder->FileSize();
287
288 if (s.ok()) {
289 s = r->file_writer->Sync(r->ioptions.use_fsync);
290 r->InvalidatePageCache(true /* closing */);
291 if (s.ok()) {
292 s = r->file_writer->Close();
293 }
294 }
295 if (!s.ok()) {
296 r->ioptions.env->DeleteFile(r->file_info.file_path);
297 }
298
299 if (file_info != nullptr) {
300 *file_info = r->file_info;
301 }
302
303 r->builder.reset();
304 return s;
305 }
306
307 uint64_t SstFileWriter::FileSize() {
308 return rep_->file_info.file_size;
309 }
310 #endif // !ROCKSDB_LITE
311
312 } // namespace rocksdb