]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/blob_file_builder.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / blob / blob_file_builder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_file_builder.h"
7
8 #include <cassert>
9
10 #include "db/blob/blob_contents.h"
11 #include "db/blob/blob_file_addition.h"
12 #include "db/blob/blob_file_completion_callback.h"
13 #include "db/blob/blob_index.h"
14 #include "db/blob/blob_log_format.h"
15 #include "db/blob/blob_log_writer.h"
16 #include "db/event_helpers.h"
17 #include "db/version_set.h"
18 #include "file/filename.h"
19 #include "file/read_write_util.h"
20 #include "file/writable_file_writer.h"
21 #include "logging/logging.h"
22 #include "options/cf_options.h"
23 #include "options/options_helper.h"
24 #include "rocksdb/slice.h"
25 #include "rocksdb/status.h"
26 #include "test_util/sync_point.h"
27 #include "trace_replay/io_tracer.h"
28 #include "util/compression.h"
29
30 namespace ROCKSDB_NAMESPACE {
31
32 BlobFileBuilder::BlobFileBuilder(
33 VersionSet* versions, FileSystem* fs,
34 const ImmutableOptions* immutable_options,
35 const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
36 std::string db_id, std::string db_session_id, int job_id,
37 uint32_t column_family_id, const std::string& column_family_name,
38 Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
39 const std::shared_ptr<IOTracer>& io_tracer,
40 BlobFileCompletionCallback* blob_callback,
41 BlobFileCreationReason creation_reason,
42 std::vector<std::string>* blob_file_paths,
43 std::vector<BlobFileAddition>* blob_file_additions)
44 : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
45 immutable_options, mutable_cf_options, file_options,
46 db_id, db_session_id, job_id, column_family_id,
47 column_family_name, io_priority, write_hint, io_tracer,
48 blob_callback, creation_reason, blob_file_paths,
49 blob_file_additions) {}
50
51 BlobFileBuilder::BlobFileBuilder(
52 std::function<uint64_t()> file_number_generator, FileSystem* fs,
53 const ImmutableOptions* immutable_options,
54 const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
55 std::string db_id, std::string db_session_id, int job_id,
56 uint32_t column_family_id, const std::string& column_family_name,
57 Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint,
58 const std::shared_ptr<IOTracer>& io_tracer,
59 BlobFileCompletionCallback* blob_callback,
60 BlobFileCreationReason creation_reason,
61 std::vector<std::string>* blob_file_paths,
62 std::vector<BlobFileAddition>* blob_file_additions)
63 : file_number_generator_(std::move(file_number_generator)),
64 fs_(fs),
65 immutable_options_(immutable_options),
66 min_blob_size_(mutable_cf_options->min_blob_size),
67 blob_file_size_(mutable_cf_options->blob_file_size),
68 blob_compression_type_(mutable_cf_options->blob_compression_type),
69 prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
70 file_options_(file_options),
71 db_id_(std::move(db_id)),
72 db_session_id_(std::move(db_session_id)),
73 job_id_(job_id),
74 column_family_id_(column_family_id),
75 column_family_name_(column_family_name),
76 io_priority_(io_priority),
77 write_hint_(write_hint),
78 io_tracer_(io_tracer),
79 blob_callback_(blob_callback),
80 creation_reason_(creation_reason),
81 blob_file_paths_(blob_file_paths),
82 blob_file_additions_(blob_file_additions),
83 blob_count_(0),
84 blob_bytes_(0) {
85 assert(file_number_generator_);
86 assert(fs_);
87 assert(immutable_options_);
88 assert(file_options_);
89 assert(blob_file_paths_);
90 assert(blob_file_paths_->empty());
91 assert(blob_file_additions_);
92 assert(blob_file_additions_->empty());
93 }
94
95 BlobFileBuilder::~BlobFileBuilder() = default;
96
97 Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
98 std::string* blob_index) {
99 assert(blob_index);
100 assert(blob_index->empty());
101
102 if (value.size() < min_blob_size_) {
103 return Status::OK();
104 }
105
106 {
107 const Status s = OpenBlobFileIfNeeded();
108 if (!s.ok()) {
109 return s;
110 }
111 }
112
113 Slice blob = value;
114 std::string compressed_blob;
115
116 {
117 const Status s = CompressBlobIfNeeded(&blob, &compressed_blob);
118 if (!s.ok()) {
119 return s;
120 }
121 }
122
123 uint64_t blob_file_number = 0;
124 uint64_t blob_offset = 0;
125
126 {
127 const Status s =
128 WriteBlobToFile(key, blob, &blob_file_number, &blob_offset);
129 if (!s.ok()) {
130 return s;
131 }
132 }
133
134 {
135 const Status s = CloseBlobFileIfNeeded();
136 if (!s.ok()) {
137 return s;
138 }
139 }
140
141 {
142 const Status s =
143 PutBlobIntoCacheIfNeeded(value, blob_file_number, blob_offset);
144 if (!s.ok()) {
145 ROCKS_LOG_WARN(immutable_options_->info_log,
146 "Failed to pre-populate the blob into blob cache: %s",
147 s.ToString().c_str());
148 }
149 }
150
151 BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
152 blob_compression_type_);
153
154 return Status::OK();
155 }
156
157 Status BlobFileBuilder::Finish() {
158 if (!IsBlobFileOpen()) {
159 return Status::OK();
160 }
161
162 return CloseBlobFile();
163 }
164
165 bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; }
166
167 Status BlobFileBuilder::OpenBlobFileIfNeeded() {
168 if (IsBlobFileOpen()) {
169 return Status::OK();
170 }
171
172 assert(!blob_count_);
173 assert(!blob_bytes_);
174
175 assert(file_number_generator_);
176 const uint64_t blob_file_number = file_number_generator_();
177
178 assert(immutable_options_);
179 assert(!immutable_options_->cf_paths.empty());
180 std::string blob_file_path =
181 BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
182
183 if (blob_callback_) {
184 blob_callback_->OnBlobFileCreationStarted(
185 blob_file_path, column_family_name_, job_id_, creation_reason_);
186 }
187
188 std::unique_ptr<FSWritableFile> file;
189
190 {
191 assert(file_options_);
192 Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);
193
194 TEST_SYNC_POINT_CALLBACK(
195 "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
196
197 if (!s.ok()) {
198 return s;
199 }
200 }
201
202 // Note: files get added to blob_file_paths_ right after the open, so they
203 // can be cleaned up upon failure. Contrast this with blob_file_additions_,
204 // which only contains successfully written files.
205 assert(blob_file_paths_);
206 blob_file_paths_->emplace_back(std::move(blob_file_path));
207
208 assert(file);
209 file->SetIOPriority(io_priority_);
210 file->SetWriteLifeTimeHint(write_hint_);
211 FileTypeSet tmp_set = immutable_options_->checksum_handoff_file_types;
212 Statistics* const statistics = immutable_options_->stats;
213 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
214 std::move(file), blob_file_paths_->back(), *file_options_,
215 immutable_options_->clock, io_tracer_, statistics,
216 immutable_options_->listeners,
217 immutable_options_->file_checksum_gen_factory.get(),
218 tmp_set.Contains(FileType::kBlobFile), false));
219
220 constexpr bool do_flush = false;
221
222 std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
223 std::move(file_writer), immutable_options_->clock, statistics,
224 blob_file_number, immutable_options_->use_fsync, do_flush));
225
226 constexpr bool has_ttl = false;
227 constexpr ExpirationRange expiration_range;
228
229 BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
230 expiration_range);
231
232 {
233 Status s = blob_log_writer->WriteHeader(header);
234
235 TEST_SYNC_POINT_CALLBACK(
236 "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
237
238 if (!s.ok()) {
239 return s;
240 }
241 }
242
243 writer_ = std::move(blob_log_writer);
244
245 assert(IsBlobFileOpen());
246
247 return Status::OK();
248 }
249
250 Status BlobFileBuilder::CompressBlobIfNeeded(
251 Slice* blob, std::string* compressed_blob) const {
252 assert(blob);
253 assert(compressed_blob);
254 assert(compressed_blob->empty());
255 assert(immutable_options_);
256
257 if (blob_compression_type_ == kNoCompression) {
258 return Status::OK();
259 }
260
261 CompressionOptions opts;
262 CompressionContext context(blob_compression_type_);
263 constexpr uint64_t sample_for_compression = 0;
264
265 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
266 blob_compression_type_, sample_for_compression);
267
268 constexpr uint32_t compression_format_version = 2;
269
270 bool success = false;
271
272 {
273 StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
274 BLOB_DB_COMPRESSION_MICROS);
275 success =
276 CompressData(*blob, info, compression_format_version, compressed_blob);
277 }
278
279 if (!success) {
280 return Status::Corruption("Error compressing blob");
281 }
282
283 *blob = Slice(*compressed_blob);
284
285 return Status::OK();
286 }
287
288 Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
289 uint64_t* blob_file_number,
290 uint64_t* blob_offset) {
291 assert(IsBlobFileOpen());
292 assert(blob_file_number);
293 assert(blob_offset);
294
295 uint64_t key_offset = 0;
296
297 Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
298
299 TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
300
301 if (!s.ok()) {
302 return s;
303 }
304
305 *blob_file_number = writer_->get_log_number();
306
307 ++blob_count_;
308 blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();
309
310 return Status::OK();
311 }
312
313 Status BlobFileBuilder::CloseBlobFile() {
314 assert(IsBlobFileOpen());
315
316 BlobLogFooter footer;
317 footer.blob_count = blob_count_;
318
319 std::string checksum_method;
320 std::string checksum_value;
321
322 Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);
323
324 TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
325
326 if (!s.ok()) {
327 return s;
328 }
329
330 const uint64_t blob_file_number = writer_->get_log_number();
331
332 if (blob_callback_) {
333 s = blob_callback_->OnBlobFileCompleted(
334 blob_file_paths_->back(), column_family_name_, job_id_,
335 blob_file_number, creation_reason_, s, checksum_value, checksum_method,
336 blob_count_, blob_bytes_);
337 }
338
339 assert(blob_file_additions_);
340 blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
341 std::move(checksum_method),
342 std::move(checksum_value));
343
344 assert(immutable_options_);
345 ROCKS_LOG_INFO(immutable_options_->logger,
346 "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
347 " total blobs, %" PRIu64 " total bytes",
348 column_family_name_.c_str(), job_id_, blob_file_number,
349 blob_count_, blob_bytes_);
350
351 writer_.reset();
352 blob_count_ = 0;
353 blob_bytes_ = 0;
354
355 return s;
356 }
357
358 Status BlobFileBuilder::CloseBlobFileIfNeeded() {
359 assert(IsBlobFileOpen());
360
361 const WritableFileWriter* const file_writer = writer_->file();
362 assert(file_writer);
363
364 if (file_writer->GetFileSize() < blob_file_size_) {
365 return Status::OK();
366 }
367
368 return CloseBlobFile();
369 }
370
371 void BlobFileBuilder::Abandon(const Status& s) {
372 if (!IsBlobFileOpen()) {
373 return;
374 }
375 if (blob_callback_) {
376 // BlobFileBuilder::Abandon() is called because of error while writing to
377 // Blob files. So we can ignore the below error.
378 blob_callback_
379 ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
380 job_id_, writer_->get_log_number(),
381 creation_reason_, s, "", "", blob_count_,
382 blob_bytes_)
383 .PermitUncheckedError();
384 }
385
386 writer_.reset();
387 blob_count_ = 0;
388 blob_bytes_ = 0;
389 }
390
391 Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
392 uint64_t blob_file_number,
393 uint64_t blob_offset) const {
394 Status s = Status::OK();
395
396 auto blob_cache = immutable_options_->blob_cache;
397 auto statistics = immutable_options_->statistics.get();
398 bool warm_cache =
399 prepopulate_blob_cache_ == PrepopulateBlobCache::kFlushOnly &&
400 creation_reason_ == BlobFileCreationReason::kFlush;
401
402 if (blob_cache && warm_cache) {
403 const OffsetableCacheKey base_cache_key(db_id_, db_session_id_,
404 blob_file_number);
405 const CacheKey cache_key = base_cache_key.WithOffset(blob_offset);
406 const Slice key = cache_key.AsSlice();
407
408 const Cache::Priority priority = Cache::Priority::BOTTOM;
409
410 // Objects to be put into the cache have to be heap-allocated and
411 // self-contained, i.e. own their contents. The Cache has to be able to
412 // take unique ownership of them.
413 CacheAllocationPtr allocation =
414 AllocateBlock(blob.size(), blob_cache->memory_allocator());
415 memcpy(allocation.get(), blob.data(), blob.size());
416 std::unique_ptr<BlobContents> buf =
417 BlobContents::Create(std::move(allocation), blob.size());
418
419 Cache::CacheItemHelper* const cache_item_helper =
420 BlobContents::GetCacheItemHelper();
421 assert(cache_item_helper);
422
423 if (immutable_options_->lowest_used_cache_tier ==
424 CacheTier::kNonVolatileBlockTier) {
425 s = blob_cache->Insert(key, buf.get(), cache_item_helper,
426 buf->ApproximateMemoryUsage(),
427 nullptr /* cache_handle */, priority);
428 } else {
429 s = blob_cache->Insert(key, buf.get(), buf->ApproximateMemoryUsage(),
430 cache_item_helper->del_cb,
431 nullptr /* cache_handle */, priority);
432 }
433
434 if (s.ok()) {
435 RecordTick(statistics, BLOB_DB_CACHE_ADD);
436 RecordTick(statistics, BLOB_DB_CACHE_BYTES_WRITE, buf->size());
437 buf.release();
438 } else {
439 RecordTick(statistics, BLOB_DB_CACHE_ADD_FAILURES);
440 }
441 }
442
443 return s;
444 }
445
446 } // namespace ROCKSDB_NAMESPACE