]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/blob_file_builder.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / blob / blob_file_builder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_file_builder.h"
7
8 #include <cassert>
9
10 #include "db/blob/blob_file_addition.h"
11 #include "db/blob/blob_index.h"
12 #include "db/blob/blob_log_format.h"
13 #include "db/blob/blob_log_writer.h"
14 #include "db/version_set.h"
15 #include "file/filename.h"
16 #include "file/read_write_util.h"
17 #include "file/writable_file_writer.h"
18 #include "logging/logging.h"
19 #include "options/cf_options.h"
20 #include "rocksdb/slice.h"
21 #include "rocksdb/status.h"
22 #include "test_util/sync_point.h"
23 #include "util/compression.h"
24
25 namespace ROCKSDB_NAMESPACE {
26
27 BlobFileBuilder::BlobFileBuilder(
28 VersionSet* versions, Env* env, FileSystem* fs,
29 const ImmutableCFOptions* immutable_cf_options,
30 const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
31 int job_id, uint32_t column_family_id,
32 const std::string& column_family_name, Env::IOPriority io_priority,
33 Env::WriteLifeTimeHint write_hint,
34 std::vector<std::string>* blob_file_paths,
35 std::vector<BlobFileAddition>* blob_file_additions)
36 : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
37 fs, immutable_cf_options, mutable_cf_options,
38 file_options, job_id, column_family_id,
39 column_family_name, io_priority, write_hint,
40 blob_file_paths, blob_file_additions) {}
41
42 BlobFileBuilder::BlobFileBuilder(
43 std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
44 const ImmutableCFOptions* immutable_cf_options,
45 const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
46 int job_id, uint32_t column_family_id,
47 const std::string& column_family_name, Env::IOPriority io_priority,
48 Env::WriteLifeTimeHint write_hint,
49 std::vector<std::string>* blob_file_paths,
50 std::vector<BlobFileAddition>* blob_file_additions)
51 : file_number_generator_(std::move(file_number_generator)),
52 env_(env),
53 fs_(fs),
54 immutable_cf_options_(immutable_cf_options),
55 min_blob_size_(mutable_cf_options->min_blob_size),
56 blob_file_size_(mutable_cf_options->blob_file_size),
57 blob_compression_type_(mutable_cf_options->blob_compression_type),
58 file_options_(file_options),
59 job_id_(job_id),
60 column_family_id_(column_family_id),
61 column_family_name_(column_family_name),
62 io_priority_(io_priority),
63 write_hint_(write_hint),
64 blob_file_paths_(blob_file_paths),
65 blob_file_additions_(blob_file_additions),
66 blob_count_(0),
67 blob_bytes_(0) {
68 assert(file_number_generator_);
69 assert(env_);
70 assert(fs_);
71 assert(immutable_cf_options_);
72 assert(file_options_);
73 assert(blob_file_paths_);
74 assert(blob_file_paths_->empty());
75 assert(blob_file_additions_);
76 assert(blob_file_additions_->empty());
77 }
78
79 BlobFileBuilder::~BlobFileBuilder() = default;
80
81 Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
82 std::string* blob_index) {
83 assert(blob_index);
84 assert(blob_index->empty());
85
86 if (value.size() < min_blob_size_) {
87 return Status::OK();
88 }
89
90 {
91 const Status s = OpenBlobFileIfNeeded();
92 if (!s.ok()) {
93 return s;
94 }
95 }
96
97 Slice blob = value;
98 std::string compressed_blob;
99
100 {
101 const Status s = CompressBlobIfNeeded(&blob, &compressed_blob);
102 if (!s.ok()) {
103 return s;
104 }
105 }
106
107 uint64_t blob_file_number = 0;
108 uint64_t blob_offset = 0;
109
110 {
111 const Status s =
112 WriteBlobToFile(key, blob, &blob_file_number, &blob_offset);
113 if (!s.ok()) {
114 return s;
115 }
116 }
117
118 {
119 const Status s = CloseBlobFileIfNeeded();
120 if (!s.ok()) {
121 return s;
122 }
123 }
124
125 BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
126 blob_compression_type_);
127
128 return Status::OK();
129 }
130
131 Status BlobFileBuilder::Finish() {
132 if (!IsBlobFileOpen()) {
133 return Status::OK();
134 }
135
136 return CloseBlobFile();
137 }
138
139 bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; }
140
141 Status BlobFileBuilder::OpenBlobFileIfNeeded() {
142 if (IsBlobFileOpen()) {
143 return Status::OK();
144 }
145
146 assert(!blob_count_);
147 assert(!blob_bytes_);
148
149 assert(file_number_generator_);
150 const uint64_t blob_file_number = file_number_generator_();
151
152 assert(immutable_cf_options_);
153 assert(!immutable_cf_options_->cf_paths.empty());
154 std::string blob_file_path = BlobFileName(
155 immutable_cf_options_->cf_paths.front().path, blob_file_number);
156
157 std::unique_ptr<FSWritableFile> file;
158
159 {
160 TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
161
162 assert(file_options_);
163 const Status s =
164 NewWritableFile(fs_, blob_file_path, &file, *file_options_);
165 if (!s.ok()) {
166 return s;
167 }
168 }
169
170 // Note: files get added to blob_file_paths_ right after the open, so they
171 // can be cleaned up upon failure. Contrast this with blob_file_additions_,
172 // which only contains successfully written files.
173 assert(blob_file_paths_);
174 blob_file_paths_->emplace_back(std::move(blob_file_path));
175
176 assert(file);
177 file->SetIOPriority(io_priority_);
178 file->SetWriteLifeTimeHint(write_hint_);
179
180 Statistics* const statistics = immutable_cf_options_->statistics;
181
182 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
183 std::move(file), blob_file_paths_->back(), *file_options_, env_,
184 nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners,
185 immutable_cf_options_->file_checksum_gen_factory));
186
187 std::unique_ptr<BlobLogWriter> blob_log_writer(
188 new BlobLogWriter(std::move(file_writer), env_, statistics,
189 blob_file_number, immutable_cf_options_->use_fsync));
190
191 constexpr bool has_ttl = false;
192 constexpr ExpirationRange expiration_range;
193
194 BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
195 expiration_range);
196
197 {
198 TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
199
200 const Status s = blob_log_writer->WriteHeader(header);
201 if (!s.ok()) {
202 return s;
203 }
204 }
205
206 writer_ = std::move(blob_log_writer);
207
208 assert(IsBlobFileOpen());
209
210 return Status::OK();
211 }
212
213 Status BlobFileBuilder::CompressBlobIfNeeded(
214 Slice* blob, std::string* compressed_blob) const {
215 assert(blob);
216 assert(compressed_blob);
217 assert(compressed_blob->empty());
218
219 if (blob_compression_type_ == kNoCompression) {
220 return Status::OK();
221 }
222
223 CompressionOptions opts;
224 CompressionContext context(blob_compression_type_);
225 constexpr uint64_t sample_for_compression = 0;
226
227 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
228 blob_compression_type_, sample_for_compression);
229
230 constexpr uint32_t compression_format_version = 2;
231
232 if (!CompressData(*blob, info, compression_format_version, compressed_blob)) {
233 return Status::Corruption("Error compressing blob");
234 }
235
236 *blob = Slice(*compressed_blob);
237
238 return Status::OK();
239 }
240
241 Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
242 uint64_t* blob_file_number,
243 uint64_t* blob_offset) {
244 assert(IsBlobFileOpen());
245 assert(blob_file_number);
246 assert(blob_offset);
247
248 uint64_t key_offset = 0;
249
250 TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
251
252 const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
253 if (!s.ok()) {
254 return s;
255 }
256
257 *blob_file_number = writer_->get_log_number();
258
259 ++blob_count_;
260 blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();
261
262 return Status::OK();
263 }
264
265 Status BlobFileBuilder::CloseBlobFile() {
266 assert(IsBlobFileOpen());
267
268 BlobLogFooter footer;
269 footer.blob_count = blob_count_;
270
271 std::string checksum_method;
272 std::string checksum_value;
273
274 TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
275
276 const Status s =
277 writer_->AppendFooter(footer, &checksum_method, &checksum_value);
278 if (!s.ok()) {
279 return s;
280 }
281
282 const uint64_t blob_file_number = writer_->get_log_number();
283
284 assert(blob_file_additions_);
285 blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
286 std::move(checksum_method),
287 std::move(checksum_value));
288
289 assert(immutable_cf_options_);
290 ROCKS_LOG_INFO(immutable_cf_options_->info_log,
291 "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
292 " total blobs, %" PRIu64 " total bytes",
293 column_family_name_.c_str(), job_id_, blob_file_number,
294 blob_count_, blob_bytes_);
295
296 writer_.reset();
297 blob_count_ = 0;
298 blob_bytes_ = 0;
299
300 return Status::OK();
301 }
302
303 Status BlobFileBuilder::CloseBlobFileIfNeeded() {
304 assert(IsBlobFileOpen());
305
306 const WritableFileWriter* const file_writer = writer_->file();
307 assert(file_writer);
308
309 if (file_writer->GetFileSize() < blob_file_size_) {
310 return Status::OK();
311 }
312
313 return CloseBlobFile();
314 }
315
316 } // namespace ROCKSDB_NAMESPACE