]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/table/sst_file_writer.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / table / sst_file_writer.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#include "rocksdb/sst_file_writer.h"
7
8#include <vector>
f67539c2 9
7c673cae 10#include "db/dbformat.h"
f67539c2
TL
11#include "env/composite_env_wrapper.h"
12#include "file/writable_file_writer.h"
7c673cae 13#include "rocksdb/table.h"
f67539c2 14#include "table/block_based/block_based_table_builder.h"
7c673cae 15#include "table/sst_file_writer_collectors.h"
f67539c2 16#include "test_util/sync_point.h"
7c673cae 17
f67539c2 18namespace ROCKSDB_NAMESPACE {
7c673cae
FG
19
20const std::string ExternalSstFilePropertyNames::kVersion =
21 "rocksdb.external_sst_file.version";
22const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
23 "rocksdb.external_sst_file.global_seqno";
24
25#ifndef ROCKSDB_LITE
26
// When page-cache invalidation is enabled, a new invalidation request is issued
// once MORE than this many bytes have been written since the previous one.
constexpr size_t kFadviseTrigger = size_t{1} << 20;  // 1 MiB
28
29struct SstFileWriter::Rep {
30 Rep(const EnvOptions& _env_options, const Options& options,
11fdf7f2
TL
31 Env::IOPriority _io_priority, const Comparator* _user_comparator,
32 ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
7c673cae
FG
33 : env_options(_env_options),
34 ioptions(options),
35 mutable_cf_options(options),
11fdf7f2 36 io_priority(_io_priority),
7c673cae
FG
37 internal_comparator(_user_comparator),
38 cfh(_cfh),
39 invalidate_page_cache(_invalidate_page_cache),
11fdf7f2
TL
40 last_fadvise_size(0),
41 skip_filters(_skip_filters) {}
7c673cae
FG
42
43 std::unique_ptr<WritableFileWriter> file_writer;
44 std::unique_ptr<TableBuilder> builder;
45 EnvOptions env_options;
46 ImmutableCFOptions ioptions;
47 MutableCFOptions mutable_cf_options;
11fdf7f2 48 Env::IOPriority io_priority;
7c673cae
FG
49 InternalKeyComparator internal_comparator;
50 ExternalSstFileInfo file_info;
51 InternalKey ikey;
52 std::string column_family_name;
53 ColumnFamilyHandle* cfh;
54 // If true, We will give the OS a hint that this file pages is not needed
11fdf7f2 55 // every time we write 1MB to the file.
7c673cae 56 bool invalidate_page_cache;
11fdf7f2 57 // The size of the file during the last time we called Fadvise to remove
7c673cae
FG
58 // cached pages from page cache.
59 uint64_t last_fadvise_size;
11fdf7f2
TL
60 bool skip_filters;
61 Status Add(const Slice& user_key, const Slice& value,
62 const ValueType value_type) {
63 if (!builder) {
64 return Status::InvalidArgument("File is not opened");
65 }
66
67 if (file_info.num_entries == 0) {
68 file_info.smallest_key.assign(user_key.data(), user_key.size());
69 } else {
70 if (internal_comparator.user_comparator()->Compare(
71 user_key, file_info.largest_key) <= 0) {
72 // Make sure that keys are added in order
f67539c2
TL
73 return Status::InvalidArgument(
74 "Keys must be added in strict ascending order.");
11fdf7f2
TL
75 }
76 }
77
78 // TODO(tec) : For external SST files we could omit the seqno and type.
79 switch (value_type) {
80 case ValueType::kTypeValue:
81 ikey.Set(user_key, 0 /* Sequence Number */,
82 ValueType::kTypeValue /* Put */);
83 break;
84 case ValueType::kTypeMerge:
85 ikey.Set(user_key, 0 /* Sequence Number */,
86 ValueType::kTypeMerge /* Merge */);
87 break;
88 case ValueType::kTypeDeletion:
89 ikey.Set(user_key, 0 /* Sequence Number */,
90 ValueType::kTypeDeletion /* Delete */);
91 break;
92 default:
93 return Status::InvalidArgument("Value type is not supported");
94 }
95 builder->Add(ikey.Encode(), value);
96
97 // update file info
98 file_info.num_entries++;
99 file_info.largest_key.assign(user_key.data(), user_key.size());
100 file_info.file_size = builder->FileSize();
101
20effc67 102 return InvalidatePageCache(false /* closing */);
11fdf7f2
TL
103 }
104
105 Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
106 if (!builder) {
107 return Status::InvalidArgument("File is not opened");
108 }
109
110 RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
111 if (file_info.num_range_del_entries == 0) {
112 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
113 tombstone.start_key_.size());
114 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
115 tombstone.end_key_.size());
116 } else {
117 if (internal_comparator.user_comparator()->Compare(
118 tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
119 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
120 tombstone.start_key_.size());
121 }
122 if (internal_comparator.user_comparator()->Compare(
123 tombstone.end_key_, file_info.largest_range_del_key) > 0) {
124 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
125 tombstone.end_key_.size());
126 }
127 }
128
129 auto ikey_and_end_key = tombstone.Serialize();
130 builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
131
132 // update file info
133 file_info.num_range_del_entries++;
134 file_info.file_size = builder->FileSize();
135
20effc67 136 return InvalidatePageCache(false /* closing */);
11fdf7f2
TL
137 }
138
20effc67
TL
139 Status InvalidatePageCache(bool closing) {
140 Status s = Status::OK();
11fdf7f2
TL
141 if (invalidate_page_cache == false) {
142 // Fadvise disabled
20effc67 143 return s;
11fdf7f2
TL
144 }
145 uint64_t bytes_since_last_fadvise =
146 builder->FileSize() - last_fadvise_size;
147 if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
148 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
149 &(bytes_since_last_fadvise));
20effc67
TL
150 // Tell the OS that we don't need this file in page cache
151 s = file_writer->InvalidateCache(0, 0);
152 if (s.IsNotSupported()) {
153 // NotSupported is fine as it could be a file type that doesn't use page
154 // cache.
155 s = Status::OK();
156 }
11fdf7f2
TL
157 last_fadvise_size = builder->FileSize();
158 }
20effc67 159 return s;
11fdf7f2 160 }
7c673cae
FG
161};
162
163SstFileWriter::SstFileWriter(const EnvOptions& env_options,
164 const Options& options,
165 const Comparator* user_comparator,
166 ColumnFamilyHandle* column_family,
11fdf7f2
TL
167 bool invalidate_page_cache,
168 Env::IOPriority io_priority, bool skip_filters)
169 : rep_(new Rep(env_options, options, io_priority, user_comparator,
170 column_family, invalidate_page_cache, skip_filters)) {
7c673cae
FG
171 rep_->file_info.file_size = 0;
172}
173
174SstFileWriter::~SstFileWriter() {
175 if (rep_->builder) {
176 // User did not call Finish() or Finish() failed, we need to
177 // abandon the builder.
178 rep_->builder->Abandon();
179 }
7c673cae
FG
180}
181
182Status SstFileWriter::Open(const std::string& file_path) {
11fdf7f2 183 Rep* r = rep_.get();
7c673cae
FG
184 Status s;
185 std::unique_ptr<WritableFile> sst_file;
186 s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
187 if (!s.ok()) {
188 return s;
189 }
190
11fdf7f2
TL
191 sst_file->SetIOPriority(r->io_priority);
192
7c673cae 193 CompressionType compression_type;
11fdf7f2 194 CompressionOptions compression_opts;
20effc67
TL
195 if (r->mutable_cf_options.bottommost_compression !=
196 kDisableCompressionOption) {
197 compression_type = r->mutable_cf_options.bottommost_compression;
198 if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
199 compression_opts = r->mutable_cf_options.bottommost_compression_opts;
11fdf7f2 200 } else {
20effc67 201 compression_opts = r->mutable_cf_options.compression_opts;
11fdf7f2 202 }
7c673cae
FG
203 } else if (!r->ioptions.compression_per_level.empty()) {
204 // Use the compression of the last level if we have per level compression
205 compression_type = *(r->ioptions.compression_per_level.rbegin());
20effc67 206 compression_opts = r->mutable_cf_options.compression_opts;
7c673cae
FG
207 } else {
208 compression_type = r->mutable_cf_options.compression;
20effc67 209 compression_opts = r->mutable_cf_options.compression_opts;
7c673cae 210 }
494da23a
TL
211 uint64_t sample_for_compression =
212 r->mutable_cf_options.sample_for_compression;
7c673cae
FG
213
214 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
215 int_tbl_prop_collector_factories;
216
217 // SstFileWriter properties collector to add SstFileWriter version.
218 int_tbl_prop_collector_factories.emplace_back(
219 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
220 0 /* global_seqno*/));
221
222 // User collector factories
223 auto user_collector_factories =
224 r->ioptions.table_properties_collector_factories;
225 for (size_t i = 0; i < user_collector_factories.size(); i++) {
226 int_tbl_prop_collector_factories.emplace_back(
227 new UserKeyTablePropertiesCollectorFactory(
228 user_collector_factories[i]));
229 }
230 int unknown_level = -1;
231 uint32_t cf_id;
232
233 if (r->cfh != nullptr) {
234 // user explicitly specified that this file will be ingested into cfh,
235 // we can persist this information in the file.
236 cf_id = r->cfh->GetID();
237 r->column_family_name = r->cfh->GetName();
238 } else {
239 r->column_family_name = "";
240 cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
241 }
20effc67
TL
242 // SstFileWriter is used to create sst files that can be added to database
243 // later. Therefore, no real db_id and db_session_id are associated with it.
244 // Here we mimic the way db_session_id behaves by resetting the db_session_id
245 // every time SstFileWriter is used, and in this case db_id is set to be "SST
246 // Writer".
247 std::string db_session_id = r->ioptions.env->GenerateUniqueId();
248 if (!db_session_id.empty() && db_session_id.back() == '\n') {
249 db_session_id.pop_back();
250 }
7c673cae 251 TableBuilderOptions table_builder_options(
11fdf7f2 252 r->ioptions, r->mutable_cf_options, r->internal_comparator,
494da23a
TL
253 &int_tbl_prop_collector_factories, compression_type,
254 sample_for_compression, compression_opts, r->skip_filters,
20effc67
TL
255 r->column_family_name, unknown_level, 0 /* creation_time */,
256 0 /* oldest_key_time */, 0 /* target_file_size */,
257 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id);
258 r->file_writer.reset(new WritableFileWriter(
259 NewLegacyWritableFileWrapper(std::move(sst_file)), file_path,
260 r->env_options, r->ioptions.env, nullptr /* io_tracer */,
261 nullptr /* stats */, r->ioptions.listeners,
262 r->ioptions.file_checksum_gen_factory));
7c673cae
FG
263
264 // TODO(tec) : If table_factory is using compressed block cache, we will
265 // be adding the external sst file blocks into it, which is wasteful.
266 r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
267 table_builder_options, cf_id, r->file_writer.get()));
268
11fdf7f2 269 r->file_info = ExternalSstFileInfo();
7c673cae 270 r->file_info.file_path = file_path;
7c673cae
FG
271 r->file_info.version = 2;
272 return s;
273}
274
275Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
11fdf7f2
TL
276 return rep_->Add(user_key, value, ValueType::kTypeValue);
277}
7c673cae 278
11fdf7f2
TL
279Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
280 return rep_->Add(user_key, value, ValueType::kTypeValue);
281}
7c673cae 282
11fdf7f2
TL
283Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
284 return rep_->Add(user_key, value, ValueType::kTypeMerge);
285}
7c673cae 286
11fdf7f2
TL
287Status SstFileWriter::Delete(const Slice& user_key) {
288 return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
289}
7c673cae 290
11fdf7f2
TL
291Status SstFileWriter::DeleteRange(const Slice& begin_key,
292 const Slice& end_key) {
293 return rep_->DeleteRange(begin_key, end_key);
7c673cae
FG
294}
295
296Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
11fdf7f2 297 Rep* r = rep_.get();
7c673cae
FG
298 if (!r->builder) {
299 return Status::InvalidArgument("File is not opened");
300 }
11fdf7f2
TL
301 if (r->file_info.num_entries == 0 &&
302 r->file_info.num_range_del_entries == 0) {
7c673cae
FG
303 return Status::InvalidArgument("Cannot create sst file with no entries");
304 }
305
306 Status s = r->builder->Finish();
307 r->file_info.file_size = r->builder->FileSize();
308
309 if (s.ok()) {
310 s = r->file_writer->Sync(r->ioptions.use_fsync);
20effc67
TL
311 if (s.ok()) {
312 s = r->InvalidatePageCache(true /* closing */);
313 }
7c673cae
FG
314 if (s.ok()) {
315 s = r->file_writer->Close();
316 }
317 }
20effc67
TL
318 if (s.ok()) {
319 r->file_info.file_checksum = r->file_writer->GetFileChecksum();
320 r->file_info.file_checksum_func_name =
321 r->file_writer->GetFileChecksumFuncName();
322 }
7c673cae
FG
323 if (!s.ok()) {
324 r->ioptions.env->DeleteFile(r->file_info.file_path);
325 }
326
327 if (file_info != nullptr) {
328 *file_info = r->file_info;
329 }
330
331 r->builder.reset();
332 return s;
333}
334
7c673cae
FG
335uint64_t SstFileWriter::FileSize() {
336 return rep_->file_info.file_size;
337}
338#endif // !ROCKSDB_LITE
339
f67539c2 340} // namespace ROCKSDB_NAMESPACE