// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_builder.h"

#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "options/cf_options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
25 namespace ROCKSDB_NAMESPACE
{
27 BlobFileBuilder::BlobFileBuilder(
28 VersionSet
* versions
, Env
* env
, FileSystem
* fs
,
29 const ImmutableCFOptions
* immutable_cf_options
,
30 const MutableCFOptions
* mutable_cf_options
, const FileOptions
* file_options
,
31 int job_id
, uint32_t column_family_id
,
32 const std::string
& column_family_name
, Env::IOPriority io_priority
,
33 Env::WriteLifeTimeHint write_hint
,
34 std::vector
<std::string
>* blob_file_paths
,
35 std::vector
<BlobFileAddition
>* blob_file_additions
)
36 : BlobFileBuilder([versions
]() { return versions
->NewFileNumber(); }, env
,
37 fs
, immutable_cf_options
, mutable_cf_options
,
38 file_options
, job_id
, column_family_id
,
39 column_family_name
, io_priority
, write_hint
,
40 blob_file_paths
, blob_file_additions
) {}
42 BlobFileBuilder::BlobFileBuilder(
43 std::function
<uint64_t()> file_number_generator
, Env
* env
, FileSystem
* fs
,
44 const ImmutableCFOptions
* immutable_cf_options
,
45 const MutableCFOptions
* mutable_cf_options
, const FileOptions
* file_options
,
46 int job_id
, uint32_t column_family_id
,
47 const std::string
& column_family_name
, Env::IOPriority io_priority
,
48 Env::WriteLifeTimeHint write_hint
,
49 std::vector
<std::string
>* blob_file_paths
,
50 std::vector
<BlobFileAddition
>* blob_file_additions
)
51 : file_number_generator_(std::move(file_number_generator
)),
54 immutable_cf_options_(immutable_cf_options
),
55 min_blob_size_(mutable_cf_options
->min_blob_size
),
56 blob_file_size_(mutable_cf_options
->blob_file_size
),
57 blob_compression_type_(mutable_cf_options
->blob_compression_type
),
58 file_options_(file_options
),
60 column_family_id_(column_family_id
),
61 column_family_name_(column_family_name
),
62 io_priority_(io_priority
),
63 write_hint_(write_hint
),
64 blob_file_paths_(blob_file_paths
),
65 blob_file_additions_(blob_file_additions
),
68 assert(file_number_generator_
);
71 assert(immutable_cf_options_
);
72 assert(file_options_
);
73 assert(blob_file_paths_
);
74 assert(blob_file_paths_
->empty());
75 assert(blob_file_additions_
);
76 assert(blob_file_additions_
->empty());
79 BlobFileBuilder::~BlobFileBuilder() = default;
81 Status
BlobFileBuilder::Add(const Slice
& key
, const Slice
& value
,
82 std::string
* blob_index
) {
84 assert(blob_index
->empty());
86 if (value
.size() < min_blob_size_
) {
91 const Status s
= OpenBlobFileIfNeeded();
98 std::string compressed_blob
;
101 const Status s
= CompressBlobIfNeeded(&blob
, &compressed_blob
);
107 uint64_t blob_file_number
= 0;
108 uint64_t blob_offset
= 0;
112 WriteBlobToFile(key
, blob
, &blob_file_number
, &blob_offset
);
119 const Status s
= CloseBlobFileIfNeeded();
125 BlobIndex::EncodeBlob(blob_index
, blob_file_number
, blob_offset
, blob
.size(),
126 blob_compression_type_
);
131 Status
BlobFileBuilder::Finish() {
132 if (!IsBlobFileOpen()) {
136 return CloseBlobFile();
139 bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_
; }
141 Status
BlobFileBuilder::OpenBlobFileIfNeeded() {
142 if (IsBlobFileOpen()) {
146 assert(!blob_count_
);
147 assert(!blob_bytes_
);
149 assert(file_number_generator_
);
150 const uint64_t blob_file_number
= file_number_generator_();
152 assert(immutable_cf_options_
);
153 assert(!immutable_cf_options_
->cf_paths
.empty());
154 std::string blob_file_path
= BlobFileName(
155 immutable_cf_options_
->cf_paths
.front().path
, blob_file_number
);
157 std::unique_ptr
<FSWritableFile
> file
;
160 TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
162 assert(file_options_
);
164 NewWritableFile(fs_
, blob_file_path
, &file
, *file_options_
);
170 // Note: files get added to blob_file_paths_ right after the open, so they
171 // can be cleaned up upon failure. Contrast this with blob_file_additions_,
172 // which only contains successfully written files.
173 assert(blob_file_paths_
);
174 blob_file_paths_
->emplace_back(std::move(blob_file_path
));
177 file
->SetIOPriority(io_priority_
);
178 file
->SetWriteLifeTimeHint(write_hint_
);
180 Statistics
* const statistics
= immutable_cf_options_
->statistics
;
182 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
183 std::move(file
), blob_file_paths_
->back(), *file_options_
, env_
,
184 nullptr /*IOTracer*/, statistics
, immutable_cf_options_
->listeners
,
185 immutable_cf_options_
->file_checksum_gen_factory
));
187 std::unique_ptr
<BlobLogWriter
> blob_log_writer(
188 new BlobLogWriter(std::move(file_writer
), env_
, statistics
,
189 blob_file_number
, immutable_cf_options_
->use_fsync
));
191 constexpr bool has_ttl
= false;
192 constexpr ExpirationRange expiration_range
;
194 BlobLogHeader
header(column_family_id_
, blob_compression_type_
, has_ttl
,
198 TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
200 const Status s
= blob_log_writer
->WriteHeader(header
);
206 writer_
= std::move(blob_log_writer
);
208 assert(IsBlobFileOpen());
213 Status
BlobFileBuilder::CompressBlobIfNeeded(
214 Slice
* blob
, std::string
* compressed_blob
) const {
216 assert(compressed_blob
);
217 assert(compressed_blob
->empty());
219 if (blob_compression_type_
== kNoCompression
) {
223 CompressionOptions opts
;
224 CompressionContext
context(blob_compression_type_
);
225 constexpr uint64_t sample_for_compression
= 0;
227 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(),
228 blob_compression_type_
, sample_for_compression
);
230 constexpr uint32_t compression_format_version
= 2;
232 if (!CompressData(*blob
, info
, compression_format_version
, compressed_blob
)) {
233 return Status::Corruption("Error compressing blob");
236 *blob
= Slice(*compressed_blob
);
241 Status
BlobFileBuilder::WriteBlobToFile(const Slice
& key
, const Slice
& blob
,
242 uint64_t* blob_file_number
,
243 uint64_t* blob_offset
) {
244 assert(IsBlobFileOpen());
245 assert(blob_file_number
);
248 uint64_t key_offset
= 0;
250 TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");
252 const Status s
= writer_
->AddRecord(key
, blob
, &key_offset
, blob_offset
);
257 *blob_file_number
= writer_
->get_log_number();
260 blob_bytes_
+= BlobLogRecord::kHeaderSize
+ key
.size() + blob
.size();
265 Status
BlobFileBuilder::CloseBlobFile() {
266 assert(IsBlobFileOpen());
268 BlobLogFooter footer
;
269 footer
.blob_count
= blob_count_
;
271 std::string checksum_method
;
272 std::string checksum_value
;
274 TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");
277 writer_
->AppendFooter(footer
, &checksum_method
, &checksum_value
);
282 const uint64_t blob_file_number
= writer_
->get_log_number();
284 assert(blob_file_additions_
);
285 blob_file_additions_
->emplace_back(blob_file_number
, blob_count_
, blob_bytes_
,
286 std::move(checksum_method
),
287 std::move(checksum_value
));
289 assert(immutable_cf_options_
);
290 ROCKS_LOG_INFO(immutable_cf_options_
->info_log
,
291 "[%s] [JOB %d] Generated blob file #%" PRIu64
": %" PRIu64
292 " total blobs, %" PRIu64
" total bytes",
293 column_family_name_
.c_str(), job_id_
, blob_file_number
,
294 blob_count_
, blob_bytes_
);
303 Status
BlobFileBuilder::CloseBlobFileIfNeeded() {
304 assert(IsBlobFileOpen());
306 const WritableFileWriter
* const file_writer
= writer_
->file();
309 if (file_writer
->GetFileSize() < blob_file_size_
) {
313 return CloseBlobFile();
316 } // namespace ROCKSDB_NAMESPACE