1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/builder.h"
16 #include "db/compaction_iterator.h"
17 #include "db/dbformat.h"
18 #include "db/event_helpers.h"
19 #include "db/internal_stats.h"
20 #include "db/merge_helper.h"
21 #include "db/table_cache.h"
22 #include "db/version_edit.h"
23 #include "monitoring/iostats_context_imp.h"
24 #include "monitoring/thread_status_util.h"
25 #include "rocksdb/db.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/iterator.h"
28 #include "rocksdb/options.h"
29 #include "rocksdb/table.h"
30 #include "table/block_based_table_builder.h"
31 #include "table/internal_iterator.h"
32 #include "util/file_reader_writer.h"
33 #include "util/filename.h"
34 #include "util/stop_watch.h"
35 #include "util/sync_point.h"
// Creates a TableBuilder by delegating to ioptions.table_factory.
//
// The arguments are bundled into a TableBuilderOptions object, which is then
// passed, together with column_family_id and file, to the factory's
// NewTableBuilder(). The factory's result is returned directly.
//
// Precondition (debug-checked): column_family_id equals
// TablePropertiesCollectorFactory::Context::kUnknownColumnFamily if and only
// if column_family_name is empty.
//
// NOTE(review): `level` is forwarded to TableBuilderOptions below, but no
// matching parameter declaration is visible in this view -- confirm against
// the complete file.
41 TableBuilder
* NewTableBuilder(
42 const ImmutableCFOptions
& ioptions
,
43 const InternalKeyComparator
& internal_comparator
,
44 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
45 int_tbl_prop_collector_factories
,
46 uint32_t column_family_id
, const std::string
& column_family_name
,
47 WritableFileWriter
* file
, const CompressionType compression_type
,
48 const CompressionOptions
& compression_opts
,
50 const std::string
* compression_dict
, const bool skip_filters
) {
// An unknown column family id must be paired with an empty name, and a known
// id with a non-empty name.
51 assert((column_family_id
==
52 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
53 column_family_name
.empty());
54 return ioptions
.table_factory
->NewTableBuilder(
55 TableBuilderOptions(ioptions
, internal_comparator
,
56 int_tbl_prop_collector_factories
, compression_type
,
57 compression_opts
, compression_dict
, skip_filters
,
58 column_family_name
, level
),
59 column_family_id
, file
);
// NOTE(review): the line carrying this routine's return type and name is not
// visible in this view; it is presumably BuildTable, going by the
// "BuildTable:create_file" sync point below -- confirm against the full file.
//
// Writes the entries produced by `iter` (plus range tombstones taken from
// `range_del_iter`) into the table file identified by meta->fd.
//
// Visible behavior, in order:
//   * meta->fd.file_size is reset to 0.
//   * Range tombstones are moved into a RangeDelAggregator.
//   * If `iter` is valid or the aggregator reports tombstones to add, a
//     writable file is opened at `fname`, wrapped in a WritableFileWriter,
//     and filled through a TableBuilder fed by a CompactionIterator.
//   * On success with a non-empty table, the file size, the
//     marked_for_compaction flag and the table properties are recorded, the
//     file is synced and closed, and a new iterator over the file is obtained
//     from table_cache (and walked end to end if paranoid_file_checks is set).
//   * If the status is not OK or the resulting file size is 0, the file is
//     deleted.
//   * Completion is logged and listeners are notified.
//
// `table_properties` may be null; it is only written when non-null.
63 const std::string
& dbname
, Env
* env
, const ImmutableCFOptions
& ioptions
,
64 const MutableCFOptions
& mutable_cf_options
, const EnvOptions
& env_options
,
65 TableCache
* table_cache
, InternalIterator
* iter
,
66 std::unique_ptr
<InternalIterator
> range_del_iter
, FileMetaData
* meta
,
67 const InternalKeyComparator
& internal_comparator
,
68 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
69 int_tbl_prop_collector_factories
,
70 uint32_t column_family_id
, const std::string
& column_family_name
,
71 std::vector
<SequenceNumber
> snapshots
,
72 SequenceNumber earliest_write_conflict_snapshot
,
73 const CompressionType compression
,
74 const CompressionOptions
& compression_opts
, bool paranoid_file_checks
,
75 InternalStats
* internal_stats
, TableFileCreationReason reason
,
76 EventLogger
* event_logger
, int job_id
, const Env::IOPriority io_priority
,
77 TableProperties
* table_properties
, int level
) {
// An unknown column family id must be paired with an empty name, and a known
// id with a non-empty name.
78 assert((column_family_id
==
79 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
80 column_family_name
.empty());
81 // Byte threshold: once IOSTATS(bytes_written) reaches this value during a
// high-priority write, the flush byte count is published to the thread status.
82 const size_t kReportFlushIOStatsEvery
= 1048576;
84 meta
->fd
.file_size
= 0;
// Hand the range tombstones over to an aggregator built on the same
// comparator and snapshot list.
// NOTE(review): the declaration of `s` is not visible in this view.
86 std::unique_ptr
<RangeDelAggregator
> range_del_agg(
87 new RangeDelAggregator(internal_comparator
, snapshots
));
88 s
= range_del_agg
->AddTombstones(std::move(range_del_iter
));
90 // may be non-ok if a range tombstone key is unparsable
94 std::string fname
= TableFileName(ioptions
.db_paths
, meta
->fd
.GetNumber(),
95 meta
->fd
.GetPathId());
// NOTE(review): the preprocessor guard that the #endif below closes is not
// visible in this view.
97 EventHelpers::NotifyTableFileCreationStarted(
98 ioptions
.listeners
, dbname
, column_family_name
, fname
, job_id
, reason
);
99 #endif // !ROCKSDB_LITE
// Only produce a file when the input iterator has data or the aggregator
// reports range tombstones to add.
102 if (iter
->Valid() || range_del_agg
->ShouldAddTombstones()) {
103 TableBuilder
* builder
;
104 unique_ptr
<WritableFileWriter
> file_writer
;
106 unique_ptr
<WritableFile
> file
;
108 bool use_direct_writes
= env_options
.use_direct_writes
;
109 TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes
);
111 s
= NewWritableFile(env
, fname
, &file
, env_options
);
// NOTE(review): the condition guarding this notification is not visible in
// this view; presumably it runs only when opening the file failed -- confirm.
113 EventHelpers::LogAndNotifyTableFileCreationFinished(
114 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
115 job_id
, meta
->fd
, tp
, reason
, s
);
118 file
->SetIOPriority(io_priority
);
120 file_writer
.reset(new WritableFileWriter(std::move(file
), env_options
,
121 ioptions
.statistics
));
123 builder
= NewTableBuilder(
124 ioptions
, internal_comparator
, int_tbl_prop_collector_factories
,
125 column_family_id
, column_family_name
, file_writer
.get(), compression
,
126 compression_opts
, level
);
129 MergeHelper
merge(env
, internal_comparator
.user_comparator(),
130 ioptions
.merge_operator
, nullptr, ioptions
.info_log
,
131 true /* internal key corruption is not ok */,
132 snapshots
.empty() ? 0 : snapshots
.back());
134 CompactionIterator
c_iter(
135 iter
, internal_comparator
.user_comparator(), &merge
, kMaxSequenceNumber
,
136 &snapshots
, earliest_write_conflict_snapshot
, env
,
137 true /* internal key corruption is not ok */, range_del_agg
.get());
// Feed every entry from the compaction iterator into the builder and widen
// the file's recorded key boundaries as each key is added.
138 c_iter
.SeekToFirst();
139 for (; c_iter
.Valid(); c_iter
.Next()) {
140 const Slice
& key
= c_iter
.key();
141 const Slice
& value
= c_iter
.value();
142 builder
->Add(key
, value
);
143 meta
->UpdateBoundaries(key
, c_iter
.ikey().sequence
);
145 // TODO(noetzli): Update stats after flush, too.
146 if (io_priority
== Env::IO_HIGH
&&
147 IOSTATS(bytes_written
) >= kReportFlushIOStatsEvery
) {
148 ThreadStatusUtil::SetThreadOperationProperty(
149 ThreadStatus::FLUSH_BYTES_WRITTEN
, IOSTATS(bytes_written
));
152 // nullptr for table_{min,max} so all range tombstones will be flushed
153 range_del_agg
->AddToBuilder(builder
, nullptr /* lower_bound */,
154 nullptr /* upper_bound */, meta
);
156 // Finish and check for builder errors
157 bool empty
= builder
->NumEntries() == 0;
159 if (!s
.ok() || empty
) {
162 s
= builder
->Finish();
// Record the finished table's size, compaction flag and properties.
165 if (s
.ok() && !empty
) {
166 uint64_t file_size
= builder
->FileSize();
167 meta
->fd
.file_size
= file_size
;
168 meta
->marked_for_compaction
= builder
->NeedCompact();
169 assert(meta
->fd
.GetFileSize() > 0);
170 tp
= builder
->GetTableProperties();
171 if (table_properties
) {
172 *table_properties
= tp
;
177 // Finish and check for file errors
178 if (s
.ok() && !empty
) {
179 StopWatch
sw(env
, ioptions
.statistics
, TABLE_SYNC_MICROS
);
180 file_writer
->Sync(ioptions
.use_fsync
);
182 if (s
.ok() && !empty
) {
183 s
= file_writer
->Close();
186 if (s
.ok() && !empty
) {
187 // Verify that the table is usable
188 // We set for_compaction to false and don't OptimizeForCompactionTableRead
189 // here because this is a special case after we finish the table building
190 // No matter whether use_direct_io_for_flush_and_compaction is true,
191 // we will regard this verification as user reads since the goal is
192 // to cache it here for further user reads
193 std::unique_ptr
<InternalIterator
> it(table_cache
->NewIterator(
194 ReadOptions(), env_options
, internal_comparator
, meta
->fd
,
195 nullptr /* range_del_agg */, nullptr,
196 (internal_stats
== nullptr) ? nullptr
197 : internal_stats
->GetFileReadHist(0),
198 false /* for_compaction */, nullptr /* arena */,
199 false /* skip_filter */, level
));
// With paranoid checks enabled, walk the whole freshly written table.
201 if (s
.ok() && paranoid_file_checks
) {
202 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
209 // Check for input iterator errors
210 if (!iter
->status().ok()) {
// Remove the output file when the build failed or produced a zero-size file.
214 if (!s
.ok() || meta
->fd
.GetFileSize() == 0) {
215 env
->DeleteFile(fname
);
218 // Output to event logger and fire events.
219 EventHelpers::LogAndNotifyTableFileCreationFinished(
220 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
221 job_id
, meta
->fd
, tp
, reason
, s
);
226 } // namespace rocksdb