1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/blob/blob_file_builder.h"
10 #include "db/blob/blob_contents.h"
11 #include "db/blob/blob_file_addition.h"
12 #include "db/blob/blob_file_completion_callback.h"
13 #include "db/blob/blob_index.h"
14 #include "db/blob/blob_log_format.h"
15 #include "db/blob/blob_log_writer.h"
16 #include "db/event_helpers.h"
17 #include "db/version_set.h"
18 #include "file/filename.h"
19 #include "file/read_write_util.h"
20 #include "file/writable_file_writer.h"
21 #include "logging/logging.h"
22 #include "options/cf_options.h"
23 #include "options/options_helper.h"
24 #include "rocksdb/slice.h"
25 #include "rocksdb/status.h"
26 #include "test_util/sync_point.h"
27 #include "trace_replay/io_tracer.h"
28 #include "util/compression.h"
30 namespace ROCKSDB_NAMESPACE
{
32 BlobFileBuilder::BlobFileBuilder(
33 VersionSet
* versions
, FileSystem
* fs
,
34 const ImmutableOptions
* immutable_options
,
35 const MutableCFOptions
* mutable_cf_options
, const FileOptions
* file_options
,
36 std::string db_id
, std::string db_session_id
, int job_id
,
37 uint32_t column_family_id
, const std::string
& column_family_name
,
38 Env::IOPriority io_priority
, Env::WriteLifeTimeHint write_hint
,
39 const std::shared_ptr
<IOTracer
>& io_tracer
,
40 BlobFileCompletionCallback
* blob_callback
,
41 BlobFileCreationReason creation_reason
,
42 std::vector
<std::string
>* blob_file_paths
,
43 std::vector
<BlobFileAddition
>* blob_file_additions
)
44 : BlobFileBuilder([versions
]() { return versions
->NewFileNumber(); }, fs
,
45 immutable_options
, mutable_cf_options
, file_options
,
46 db_id
, db_session_id
, job_id
, column_family_id
,
47 column_family_name
, io_priority
, write_hint
, io_tracer
,
48 blob_callback
, creation_reason
, blob_file_paths
,
49 blob_file_additions
) {}
// Primary constructor: stores the file number generator and all
// configuration needed to create blob files later on.
//
// - file_number_generator, db_id and db_session_id are taken by value and
//   moved into the corresponding members.
// - mutable_cf_options is dereferenced unconditionally to snapshot
//   min_blob_size, blob_file_size, blob_compression_type and
//   prepopulate_blob_cache, so it must not be null.
// - blob_file_paths and blob_file_additions are caller-provided output
//   vectors; the assertions below require both to be non-null and empty.
//
// NOTE(review): a few member initializers and the braces of the constructor
// body do not appear in this view of the file; only what is visible is
// documented here.
51 BlobFileBuilder::BlobFileBuilder(
52 std::function
<uint64_t()> file_number_generator
, FileSystem
* fs
,
53 const ImmutableOptions
* immutable_options
,
54 const MutableCFOptions
* mutable_cf_options
, const FileOptions
* file_options
,
55 std::string db_id
, std::string db_session_id
, int job_id
,
56 uint32_t column_family_id
, const std::string
& column_family_name
,
57 Env::IOPriority io_priority
, Env::WriteLifeTimeHint write_hint
,
58 const std::shared_ptr
<IOTracer
>& io_tracer
,
59 BlobFileCompletionCallback
* blob_callback
,
60 BlobFileCreationReason creation_reason
,
61 std::vector
<std::string
>* blob_file_paths
,
62 std::vector
<BlobFileAddition
>* blob_file_additions
)
// Member initializers: sink parameters are moved, everything else is copied.
63 : file_number_generator_(std::move(file_number_generator
)),
65 immutable_options_(immutable_options
),
// Per-column-family settings are copied out of mutable_cf_options here rather
// than keeping the pointer.
66 min_blob_size_(mutable_cf_options
->min_blob_size
),
67 blob_file_size_(mutable_cf_options
->blob_file_size
),
68 blob_compression_type_(mutable_cf_options
->blob_compression_type
),
69 prepopulate_blob_cache_(mutable_cf_options
->prepopulate_blob_cache
),
70 file_options_(file_options
),
71 db_id_(std::move(db_id
)),
72 db_session_id_(std::move(db_session_id
)),
74 column_family_id_(column_family_id
),
75 column_family_name_(column_family_name
),
76 io_priority_(io_priority
),
77 write_hint_(write_hint
),
78 io_tracer_(io_tracer
),
79 blob_callback_(blob_callback
),
80 creation_reason_(creation_reason
),
81 blob_file_paths_(blob_file_paths
),
82 blob_file_additions_(blob_file_additions
),
// Sanity checks (debug builds only): the generator must be callable, the
// option pointers must be set, and both output vectors must start out empty.
85 assert(file_number_generator_
);
87 assert(immutable_options_
);
88 assert(file_options_
);
89 assert(blob_file_paths_
);
90 assert(blob_file_paths_
->empty());
91 assert(blob_file_additions_
);
92 assert(blob_file_additions_
->empty());
// Destructor: compiler-generated member-wise cleanup, defined here in the
// implementation file rather than in the header.
// NOTE(review): presumably kept out of line so that the types of owned
// members are complete at this point -- confirm against the header.
95 BlobFileBuilder::~BlobFileBuilder() = default;
// Adds one key/value pair to the blob file being built and, via
// `blob_index`, hands back an encoded reference to the stored blob.
//
// Visible sequence of steps:
//   1. compare value.size() against min_blob_size_;
//   2. OpenBlobFileIfNeeded();
//   3. CompressBlobIfNeeded(&blob, &compressed_blob);
//   4. WriteBlobToFile(), which yields the blob file number and offset;
//   5. CloseBlobFileIfNeeded();
//   6. PutBlobIntoCacheIfNeeded(), whose failure is only logged as a warning;
//   7. BlobIndex::EncodeBlob() into *blob_index.
//
// `blob_index` must point to an empty string on entry (asserted).
//
// NOTE(review): the bodies of the branches, the declaration of `blob`, the
// per-step error checks and the return statements do not appear in this view
// of the file. Presumably each failing step returns its Status early and
// values smaller than min_blob_size_ are skipped -- confirm against the full
// source.
97 Status
BlobFileBuilder::Add(const Slice
& key
, const Slice
& value
,
98 std::string
* blob_index
) {
100 assert(blob_index
->empty());
// Size threshold check against the configured minimum blob size.
102 if (value
.size() < min_blob_size_
) {
107 const Status s
= OpenBlobFileIfNeeded();
// Backing storage for the compressed form of the blob, filled in by
// CompressBlobIfNeeded() when compression is enabled.
114 std::string compressed_blob
;
117 const Status s
= CompressBlobIfNeeded(&blob
, &compressed_blob
);
// Outputs of WriteBlobToFile(): where the blob ended up.
123 uint64_t blob_file_number
= 0;
124 uint64_t blob_offset
= 0;
128 WriteBlobToFile(key
, blob
, &blob_file_number
, &blob_offset
);
135 const Status s
= CloseBlobFileIfNeeded();
// Note that the original `value` is what gets passed to the cache helper,
// not `blob` (which CompressBlobIfNeeded() may have re-pointed at the
// compressed data).
143 PutBlobIntoCacheIfNeeded(value
, blob_file_number
, blob_offset
);
145 ROCKS_LOG_WARN(immutable_options_
->info_log
,
146 "Failed to pre-populate the blob into blob cache: %s",
147 s
.ToString().c_str());
// The index records the size of `blob` (i.e. the size as written to the
// file) together with the compression type.
151 BlobIndex::EncodeBlob(blob_index
, blob_file_number
, blob_offset
, blob
.size(),
152 blob_compression_type_
);
// Finishes building: when a blob file is open, closes it via CloseBlobFile()
// and returns that call's Status.
// NOTE(review): the body of the "no file open" branch and the closing braces
// do not appear in this view of the file; presumably that branch returns
// Status::OK() -- confirm against the full source.
157 Status
BlobFileBuilder::Finish() {
158 if (!IsBlobFileOpen()) {
162 return CloseBlobFile();
165 bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_
; }
// Opens a new blob file unless one is already open.
//
// Visible sequence of steps:
//   1. obtain a fresh file number from file_number_generator_;
//   2. build the file path under the first entry of cf_paths;
//   3. notify blob_callback_ (if set) that file creation has started;
//   4. create the writable file and record its path in blob_file_paths_;
//   5. wrap the file in a WritableFileWriter and a BlobLogWriter;
//   6. write the blob log header;
//   7. install the log writer as writer_, which makes IsBlobFileOpen() true.
//
// NOTE(review): the early-return body, the nested scopes around the two
// `Status s` declarations, the error checks after NewWritableFile() and
// WriteHeader(), the tail of the BlobLogHeader constructor call and the
// return statements do not appear in this view of the file.
167 Status
BlobFileBuilder::OpenBlobFileIfNeeded() {
168 if (IsBlobFileOpen()) {
// A new file must start with zeroed per-file counters.
172 assert(!blob_count_
);
173 assert(!blob_bytes_
);
175 assert(file_number_generator_
);
176 const uint64_t blob_file_number
= file_number_generator_();
178 assert(immutable_options_
);
179 assert(!immutable_options_
->cf_paths
.empty());
// The blob file is placed in the first configured column family path.
180 std::string blob_file_path
=
181 BlobFileName(immutable_options_
->cf_paths
.front().path
, blob_file_number
);
// The callback is optional; it is told about the file before it is created.
183 if (blob_callback_
) {
184 blob_callback_
->OnBlobFileCreationStarted(
185 blob_file_path
, column_family_name_
, job_id_
, creation_reason_
);
188 std::unique_ptr
<FSWritableFile
> file
;
191 assert(file_options_
);
192 Status s
= NewWritableFile(fs_
, blob_file_path
, &file
, *file_options_
);
// Test hook: exposes `s` so tests can observe or override the result.
194 TEST_SYNC_POINT_CALLBACK(
195 "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s
);
202 // Note: files get added to blob_file_paths_ right after the open, so they
203 // can be cleaned up upon failure. Contrast this with blob_file_additions_,
204 // which only contains successfully written files.
205 assert(blob_file_paths_
);
// blob_file_path is moved from here; later code refers to the path through
// blob_file_paths_->back().
206 blob_file_paths_
->emplace_back(std::move(blob_file_path
));
209 file
->SetIOPriority(io_priority_
);
210 file
->SetWriteLifeTimeHint(write_hint_
);
211 FileTypeSet tmp_set
= immutable_options_
->checksum_handoff_file_types
;
212 Statistics
* const statistics
= immutable_options_
->stats
;
// The WritableFileWriter takes ownership of `file`. The second-to-last
// argument is true only when blob files are listed in
// checksum_handoff_file_types.
213 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
214 std::move(file
), blob_file_paths_
->back(), *file_options_
,
215 immutable_options_
->clock
, io_tracer_
, statistics
,
216 immutable_options_
->listeners
,
217 immutable_options_
->file_checksum_gen_factory
.get(),
218 tmp_set
.Contains(FileType::kBlobFile
), false));
220 constexpr bool do_flush
= false;
// The BlobLogWriter in turn takes ownership of the file writer.
222 std::unique_ptr
<BlobLogWriter
> blob_log_writer(new BlobLogWriter(
223 std::move(file_writer
), immutable_options_
->clock
, statistics
,
224 blob_file_number
, immutable_options_
->use_fsync
, do_flush
));
// Header fields: files written by this builder never carry a TTL, and the
// expiration range is left default-constructed.
226 constexpr bool has_ttl
= false;
227 constexpr ExpirationRange expiration_range
;
229 BlobLogHeader
header(column_family_id_
, blob_compression_type_
, has_ttl
,
233 Status s
= blob_log_writer
->WriteHeader(header
);
// Test hook: exposes `s` so tests can observe or override the result.
235 TEST_SYNC_POINT_CALLBACK(
236 "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s
);
// Installing the writer is what marks the blob file as open.
243 writer_
= std::move(blob_log_writer
);
245 assert(IsBlobFileOpen());
// Compresses *blob with blob_compression_type_ when compression is enabled.
//
// On the compression path the compressed bytes are written to
// *compressed_blob and *blob is re-pointed at that string, so
// *compressed_blob has to stay alive for as long as *blob is used
// (assuming Slice does not own its data -- TODO confirm).
//
// `compressed_blob` must be non-null and empty on entry (asserted).
// Returns Status::Corruption("Error compressing blob") on the visible error
// path.
//
// NOTE(review): the body of the kNoCompression branch, the assignment of the
// CompressData() result, the condition guarding the Corruption return and
// the final return do not appear in this view of the file.
250 Status
BlobFileBuilder::CompressBlobIfNeeded(
251 Slice
* blob
, std::string
* compressed_blob
) const {
253 assert(compressed_blob
);
254 assert(compressed_blob
->empty());
255 assert(immutable_options_
);
257 if (blob_compression_type_
== kNoCompression
) {
// Compression setup: default options, an empty dictionary and no sampling.
261 CompressionOptions opts
;
262 CompressionContext
context(blob_compression_type_
);
263 constexpr uint64_t sample_for_compression
= 0;
265 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(),
266 blob_compression_type_
, sample_for_compression
);
268 constexpr uint32_t compression_format_version
= 2;
270 bool success
= false;
// The stop watch is set up to report under BLOB_DB_COMPRESSION_MICROS.
273 StopWatch
stop_watch(immutable_options_
->clock
, immutable_options_
->stats
,
274 BLOB_DB_COMPRESSION_MICROS
);
276 CompressData(*blob
, info
, compression_format_version
, compressed_blob
);
280 return Status::Corruption("Error compressing blob");
// From here on the caller's slice refers to the compressed bytes.
283 *blob
= Slice(*compressed_blob
);
// Appends one key/blob record to the open blob file.
//
// Outputs: *blob_file_number receives the log number of the current writer,
// and *blob_offset is filled in by BlobLogWriter::AddRecord(). A blob file
// must be open (asserted).
//
// NOTE(review): the error check after AddRecord(), any update of the blob
// counter and the return statement do not appear in this view of the file.
288 Status
BlobFileBuilder::WriteBlobToFile(const Slice
& key
, const Slice
& blob
,
289 uint64_t* blob_file_number
,
290 uint64_t* blob_offset
) {
291 assert(IsBlobFileOpen());
292 assert(blob_file_number
);
// AddRecord() also reports the key's offset; it is not used by the code
// visible here.
295 uint64_t key_offset
= 0;
297 Status s
= writer_
->AddRecord(key
, blob
, &key_offset
, blob_offset
);
// Test hook: exposes `s` so tests can observe or override the result.
299 TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s
);
305 *blob_file_number
= writer_
->get_log_number();
// Running byte total for the file: record header plus key plus blob.
308 blob_bytes_
+= BlobLogRecord::kHeaderSize
+ key
.size() + blob
.size();
// Finalizes the currently open blob file.
//
// Visible sequence of steps:
//   1. append a footer carrying blob_count_, which also yields the file
//      checksum method and value;
//   2. report completion to blob_callback_ (if set), passing along the
//      Status obtained so far and replacing it with the callback's result;
//   3. record the file in blob_file_additions_ (the checksum strings are
//      moved into the new entry);
//   4. log a summary line.
//
// A blob file must be open (asserted).
//
// NOTE(review): the error checks after AppendFooter() and the callback, the
// code that resets writer_ and the counters, and the return statement do not
// appear in this view of the file.
313 Status
BlobFileBuilder::CloseBlobFile() {
314 assert(IsBlobFileOpen());
316 BlobLogFooter footer
;
317 footer
.blob_count
= blob_count_
;
319 std::string checksum_method
;
320 std::string checksum_value
;
322 Status s
= writer_
->AppendFooter(footer
, &checksum_method
, &checksum_value
);
// Test hook. NOTE(review): the sync point name mentions WriteBlobToFile even
// though this is CloseBlobFile; the string is runtime-visible to tests, so
// it is left as is.
324 TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s
);
330 const uint64_t blob_file_number
= writer_
->get_log_number();
332 if (blob_callback_
) {
333 s
= blob_callback_
->OnBlobFileCompleted(
334 blob_file_paths_
->back(), column_family_name_
, job_id_
,
335 blob_file_number
, creation_reason_
, s
, checksum_value
, checksum_method
,
336 blob_count_
, blob_bytes_
);
339 assert(blob_file_additions_
);
// checksum_method / checksum_value are moved from here and must not be read
// afterwards.
340 blob_file_additions_
->emplace_back(blob_file_number
, blob_count_
, blob_bytes_
,
341 std::move(checksum_method
),
342 std::move(checksum_value
));
344 assert(immutable_options_
);
345 ROCKS_LOG_INFO(immutable_options_
->logger
,
346 "[%s] [JOB %d] Generated blob file #%" PRIu64
": %" PRIu64
347 " total blobs, %" PRIu64
" total bytes",
348 column_family_name_
.c_str(), job_id_
, blob_file_number
,
349 blob_count_
, blob_bytes_
);
// Closes the current blob file via CloseBlobFile() once the size reported by
// the underlying file writer is no longer below blob_file_size_.
// A blob file must be open (asserted).
// NOTE(review): the body of the "still below the limit" branch does not
// appear in this view of the file; presumably it returns Status::OK() and
// keeps the file open -- confirm against the full source.
358 Status
BlobFileBuilder::CloseBlobFileIfNeeded() {
359 assert(IsBlobFileOpen());
361 const WritableFileWriter
* const file_writer
= writer_
->file();
364 if (file_writer
->GetFileSize() < blob_file_size_
) {
368 return CloseBlobFile();
// Abandons the blob file currently being written.
//
// `s` is the Status that caused the abandonment; it is passed on to
// blob_callback_ (if set) through OnBlobFileCompleted(), with empty checksum
// strings. The Status returned by the callback is deliberately ignored
// (PermitUncheckedError).
//
// NOTE(review): the body of the "no file open" branch, the object the
// callback is invoked on, the last callback argument and whatever
// cleanup/reset follows do not appear in this view of the file.
371 void BlobFileBuilder::Abandon(const Status
& s
) {
372 if (!IsBlobFileOpen()) {
375 if (blob_callback_
) {
376 // BlobFileBuilder::Abandon() is called because of error while writing to
377 // Blob files. So we can ignore the below error.
379 ->OnBlobFileCompleted(blob_file_paths_
->back(), column_family_name_
,
380 job_id_
, writer_
->get_log_number(),
381 creation_reason_
, s
, "", "", blob_count_
,
383 .PermitUncheckedError();
// Optionally inserts a copy of `blob` into the blob cache right after it has
// been written.
//
// The insertion happens only when a blob cache is configured and the
// warm-up condition holds; the visible part of that condition requires
// prepopulate_blob_cache_ == kFlushOnly and creation_reason_ == kFlush.
// The cache key is derived from db_id_, db_session_id_ (plus arguments not
// visible here) and `blob_offset`. The entry is inserted with BOTTOM
// priority, and statistics tickers are recorded for the outcome.
//
// NOTE(review): the declaration that names `warm_cache`, the tail of the
// cache key construction (where `blob_file_number` is presumably used), the
// else/if structure around the two Insert() calls and the tickers, any
// hand-over of `buf` ownership, and the return statement do not appear in
// this view of the file.
391 Status
BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice
& blob
,
392 uint64_t blob_file_number
,
393 uint64_t blob_offset
) const {
394 Status s
= Status::OK();
396 auto blob_cache
= immutable_options_
->blob_cache
;
397 auto statistics
= immutable_options_
->statistics
.get();
// Warm-up condition: only blobs written by a flush, and only when the
// kFlushOnly prepopulation mode is selected.
399 prepopulate_blob_cache_
== PrepopulateBlobCache::kFlushOnly
&&
400 creation_reason_
== BlobFileCreationReason::kFlush
;
402 if (blob_cache
&& warm_cache
) {
403 const OffsetableCacheKey
base_cache_key(db_id_
, db_session_id_
,
405 const CacheKey cache_key
= base_cache_key
.WithOffset(blob_offset
);
406 const Slice key
= cache_key
.AsSlice();
408 const Cache::Priority priority
= Cache::Priority::BOTTOM
;
410 // Objects to be put into the cache have to be heap-allocated and
411 // self-contained, i.e. own their contents. The Cache has to be able to
412 // take unique ownership of them.
// The blob bytes are copied into a block obtained through the cache's own
// memory allocator.
413 CacheAllocationPtr allocation
=
414 AllocateBlock(blob
.size(), blob_cache
->memory_allocator());
415 memcpy(allocation
.get(), blob
.data(), blob
.size());
416 std::unique_ptr
<BlobContents
> buf
=
417 BlobContents::Create(std::move(allocation
), blob
.size());
419 Cache::CacheItemHelper
* const cache_item_helper
=
420 BlobContents::GetCacheItemHelper();
421 assert(cache_item_helper
);
// Two Insert() overloads: the helper-based one is used when the lowest cache
// tier in use is the non-volatile block tier; the other call passes only the
// helper's deleter callback.
423 if (immutable_options_
->lowest_used_cache_tier
==
424 CacheTier::kNonVolatileBlockTier
) {
425 s
= blob_cache
->Insert(key
, buf
.get(), cache_item_helper
,
426 buf
->ApproximateMemoryUsage(),
427 nullptr /* cache_handle */, priority
);
429 s
= blob_cache
->Insert(key
, buf
.get(), buf
->ApproximateMemoryUsage(),
430 cache_item_helper
->del_cb
,
431 nullptr /* cache_handle */, priority
);
// Tickers: an add plus the bytes written, or an add failure.
435 RecordTick(statistics
, BLOB_DB_CACHE_ADD
);
436 RecordTick(statistics
, BLOB_DB_CACHE_BYTES_WRITE
, buf
->size());
439 RecordTick(statistics
, BLOB_DB_CACHE_ADD_FAILURES
);
446 } // namespace ROCKSDB_NAMESPACE