// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/builder.h"
16 #include "db/compaction/compaction_iterator.h"
17 #include "db/dbformat.h"
18 #include "db/event_helpers.h"
19 #include "db/internal_stats.h"
20 #include "db/merge_helper.h"
21 #include "db/range_del_aggregator.h"
22 #include "db/table_cache.h"
23 #include "db/version_edit.h"
24 #include "file/filename.h"
25 #include "file/read_write_util.h"
26 #include "file/writable_file_writer.h"
27 #include "monitoring/iostats_context_imp.h"
28 #include "monitoring/thread_status_util.h"
29 #include "rocksdb/db.h"
30 #include "rocksdb/env.h"
31 #include "rocksdb/iterator.h"
32 #include "rocksdb/options.h"
33 #include "rocksdb/table.h"
34 #include "table/block_based/block_based_table_builder.h"
35 #include "table/format.h"
36 #include "table/internal_iterator.h"
37 #include "test_util/sync_point.h"
38 #include "util/stop_watch.h"
40 namespace ROCKSDB_NAMESPACE
{
44 TableBuilder
* NewTableBuilder(
45 const ImmutableCFOptions
& ioptions
, const MutableCFOptions
& moptions
,
46 const InternalKeyComparator
& internal_comparator
,
47 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
48 int_tbl_prop_collector_factories
,
49 uint32_t column_family_id
, const std::string
& column_family_name
,
50 WritableFileWriter
* file
, const CompressionType compression_type
,
51 uint64_t sample_for_compression
, const CompressionOptions
& compression_opts
,
52 int level
, const bool skip_filters
, const uint64_t creation_time
,
53 const uint64_t oldest_key_time
, const uint64_t target_file_size
,
54 const uint64_t file_creation_time
) {
55 assert((column_family_id
==
56 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
57 column_family_name
.empty());
58 return ioptions
.table_factory
->NewTableBuilder(
59 TableBuilderOptions(ioptions
, moptions
, internal_comparator
,
60 int_tbl_prop_collector_factories
, compression_type
,
61 sample_for_compression
, compression_opts
,
62 skip_filters
, column_family_name
, level
,
63 creation_time
, oldest_key_time
, target_file_size
,
65 column_family_id
, file
);
69 const std::string
& dbname
, Env
* env
, FileSystem
* fs
,
70 const ImmutableCFOptions
& ioptions
,
71 const MutableCFOptions
& mutable_cf_options
, const FileOptions
& file_options
,
72 TableCache
* table_cache
, InternalIterator
* iter
,
73 std::vector
<std::unique_ptr
<FragmentedRangeTombstoneIterator
>>
75 FileMetaData
* meta
, const InternalKeyComparator
& internal_comparator
,
76 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
77 int_tbl_prop_collector_factories
,
78 uint32_t column_family_id
, const std::string
& column_family_name
,
79 std::vector
<SequenceNumber
> snapshots
,
80 SequenceNumber earliest_write_conflict_snapshot
,
81 SnapshotChecker
* snapshot_checker
, const CompressionType compression
,
82 uint64_t sample_for_compression
, const CompressionOptions
& compression_opts
,
83 bool paranoid_file_checks
, InternalStats
* internal_stats
,
84 TableFileCreationReason reason
, EventLogger
* event_logger
, int job_id
,
85 const Env::IOPriority io_priority
, TableProperties
* table_properties
,
86 int level
, const uint64_t creation_time
, const uint64_t oldest_key_time
,
87 Env::WriteLifeTimeHint write_hint
, const uint64_t file_creation_time
) {
88 assert((column_family_id
==
89 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
90 column_family_name
.empty());
91 // Reports the IOStats for flush for every following bytes.
92 const size_t kReportFlushIOStatsEvery
= 1048576;
94 meta
->fd
.file_size
= 0;
96 std::unique_ptr
<CompactionRangeDelAggregator
> range_del_agg(
97 new CompactionRangeDelAggregator(&internal_comparator
, snapshots
));
98 for (auto& range_del_iter
: range_del_iters
) {
99 range_del_agg
->AddTombstones(std::move(range_del_iter
));
102 std::string fname
= TableFileName(ioptions
.cf_paths
, meta
->fd
.GetNumber(),
103 meta
->fd
.GetPathId());
105 EventHelpers::NotifyTableFileCreationStarted(
106 ioptions
.listeners
, dbname
, column_family_name
, fname
, job_id
, reason
);
107 #endif // !ROCKSDB_LITE
110 if (iter
->Valid() || !range_del_agg
->IsEmpty()) {
111 TableBuilder
* builder
;
112 std::unique_ptr
<WritableFileWriter
> file_writer
;
113 // Currently we only enable dictionary compression during compaction to the
115 CompressionOptions
compression_opts_for_flush(compression_opts
);
116 compression_opts_for_flush
.max_dict_bytes
= 0;
117 compression_opts_for_flush
.zstd_max_train_bytes
= 0;
119 std::unique_ptr
<FSWritableFile
> file
;
121 bool use_direct_writes
= file_options
.use_direct_writes
;
122 TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes
);
124 s
= NewWritableFile(fs
, fname
, &file
, file_options
);
126 EventHelpers::LogAndNotifyTableFileCreationFinished(
127 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
128 job_id
, meta
->fd
, kInvalidBlobFileNumber
, tp
, reason
, s
);
131 file
->SetIOPriority(io_priority
);
132 file
->SetWriteLifeTimeHint(write_hint
);
134 file_writer
.reset(new WritableFileWriter(
135 std::move(file
), fname
, file_options
, env
, ioptions
.statistics
,
136 ioptions
.listeners
, ioptions
.sst_file_checksum_func
));
138 builder
= NewTableBuilder(
139 ioptions
, mutable_cf_options
, internal_comparator
,
140 int_tbl_prop_collector_factories
, column_family_id
,
141 column_family_name
, file_writer
.get(), compression
,
142 sample_for_compression
, compression_opts_for_flush
, level
,
143 false /* skip_filters */, creation_time
, oldest_key_time
,
144 0 /*target_file_size*/, file_creation_time
);
147 MergeHelper
merge(env
, internal_comparator
.user_comparator(),
148 ioptions
.merge_operator
, nullptr, ioptions
.info_log
,
149 true /* internal key corruption is not ok */,
150 snapshots
.empty() ? 0 : snapshots
.back(),
153 CompactionIterator
c_iter(
154 iter
, internal_comparator
.user_comparator(), &merge
, kMaxSequenceNumber
,
155 &snapshots
, earliest_write_conflict_snapshot
, snapshot_checker
, env
,
156 ShouldReportDetailedTime(env
, ioptions
.statistics
),
157 true /* internal key corruption is not ok */, range_del_agg
.get());
158 c_iter
.SeekToFirst();
159 for (; c_iter
.Valid(); c_iter
.Next()) {
160 const Slice
& key
= c_iter
.key();
161 const Slice
& value
= c_iter
.value();
162 const ParsedInternalKey
& ikey
= c_iter
.ikey();
163 builder
->Add(key
, value
);
164 meta
->UpdateBoundaries(key
, value
, ikey
.sequence
, ikey
.type
);
166 // TODO(noetzli): Update stats after flush, too.
167 if (io_priority
== Env::IO_HIGH
&&
168 IOSTATS(bytes_written
) >= kReportFlushIOStatsEvery
) {
169 ThreadStatusUtil::SetThreadOperationProperty(
170 ThreadStatus::FLUSH_BYTES_WRITTEN
, IOSTATS(bytes_written
));
174 auto range_del_it
= range_del_agg
->NewIterator();
175 for (range_del_it
->SeekToFirst(); range_del_it
->Valid();
176 range_del_it
->Next()) {
177 auto tombstone
= range_del_it
->Tombstone();
178 auto kv
= tombstone
.Serialize();
179 builder
->Add(kv
.first
.Encode(), kv
.second
);
180 meta
->UpdateBoundariesForRange(kv
.first
, tombstone
.SerializeEndKey(),
181 tombstone
.seq_
, internal_comparator
);
184 // Finish and check for builder errors
185 tp
= builder
->GetTableProperties();
186 bool empty
= builder
->NumEntries() == 0 && tp
.num_range_deletions
== 0;
188 if (!s
.ok() || empty
) {
191 s
= builder
->Finish();
194 if (s
.ok() && !empty
) {
195 uint64_t file_size
= builder
->FileSize();
196 meta
->fd
.file_size
= file_size
;
197 meta
->marked_for_compaction
= builder
->NeedCompact();
198 assert(meta
->fd
.GetFileSize() > 0);
199 tp
= builder
->GetTableProperties(); // refresh now that builder is finished
200 if (table_properties
) {
201 *table_properties
= tp
;
203 // Add the checksum information to file metadata.
204 meta
->file_checksum
= builder
->GetFileChecksum();
205 meta
->file_checksum_func_name
= builder
->GetFileChecksumFuncName();
209 // Finish and check for file errors
210 if (s
.ok() && !empty
) {
211 StopWatch
sw(env
, ioptions
.statistics
, TABLE_SYNC_MICROS
);
212 s
= file_writer
->Sync(ioptions
.use_fsync
);
214 if (s
.ok() && !empty
) {
215 s
= file_writer
->Close();
218 if (s
.ok() && !empty
) {
219 // Verify that the table is usable
220 // We set for_compaction to false and don't OptimizeForCompactionTableRead
221 // here because this is a special case after we finish the table building
222 // No matter whether use_direct_io_for_flush_and_compaction is true,
223 // we will regrad this verification as user reads since the goal is
224 // to cache it here for further user reads
225 std::unique_ptr
<InternalIterator
> it(table_cache
->NewIterator(
226 ReadOptions(), file_options
, internal_comparator
, *meta
,
227 nullptr /* range_del_agg */,
228 mutable_cf_options
.prefix_extractor
.get(), nullptr,
229 (internal_stats
== nullptr) ? nullptr
230 : internal_stats
->GetFileReadHist(0),
231 TableReaderCaller::kFlush
, /*arena=*/nullptr,
232 /*skip_filter=*/false, level
, /*smallest_compaction_key=*/nullptr,
233 /*largest_compaction_key*/ nullptr));
235 if (s
.ok() && paranoid_file_checks
) {
236 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
243 // Check for input iterator errors
244 if (!iter
->status().ok()) {
248 if (!s
.ok() || meta
->fd
.GetFileSize() == 0) {
249 fs
->DeleteFile(fname
, IOOptions(), nullptr);
252 if (meta
->fd
.GetFileSize() == 0) {
255 // Output to event logger and fire events.
256 EventHelpers::LogAndNotifyTableFileCreationFinished(
257 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
258 job_id
, meta
->fd
, meta
->oldest_blob_file_number
, tp
, reason
, s
);
263 } // namespace ROCKSDB_NAMESPACE