1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/builder.h"
16 #include "db/compaction_iterator.h"
17 #include "db/dbformat.h"
18 #include "db/event_helpers.h"
19 #include "db/internal_stats.h"
20 #include "db/merge_helper.h"
21 #include "db/table_cache.h"
22 #include "db/version_edit.h"
23 #include "monitoring/iostats_context_imp.h"
24 #include "monitoring/thread_status_util.h"
25 #include "rocksdb/db.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/iterator.h"
28 #include "rocksdb/options.h"
29 #include "rocksdb/table.h"
30 #include "table/block_based_table_builder.h"
31 #include "table/format.h"
32 #include "table/internal_iterator.h"
33 #include "util/file_reader_writer.h"
34 #include "util/filename.h"
35 #include "util/stop_watch.h"
36 #include "util/sync_point.h"
42 TableBuilder
* NewTableBuilder(
43 const ImmutableCFOptions
& ioptions
, const MutableCFOptions
& moptions
,
44 const InternalKeyComparator
& internal_comparator
,
45 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
46 int_tbl_prop_collector_factories
,
47 uint32_t column_family_id
, const std::string
& column_family_name
,
48 WritableFileWriter
* file
, const CompressionType compression_type
,
49 const CompressionOptions
& compression_opts
, int level
,
50 const std::string
* compression_dict
, const bool skip_filters
,
51 const uint64_t creation_time
, const uint64_t oldest_key_time
) {
52 assert((column_family_id
==
53 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
54 column_family_name
.empty());
55 return ioptions
.table_factory
->NewTableBuilder(
56 TableBuilderOptions(ioptions
, moptions
, internal_comparator
,
57 int_tbl_prop_collector_factories
, compression_type
,
58 compression_opts
, compression_dict
, skip_filters
,
59 column_family_name
, level
, creation_time
,
61 column_family_id
, file
);
// BuildTable: writes one table file from `iter` (point entries) and
// `range_del_iter` (range tombstones), filling in `meta` and, when non-null,
// `*table_properties`.
// NOTE(review): this copy appears damaged. The missing pieces are:
//  - the `Status BuildTable(` line that should open this parameter list;
//  - the declarations of `s` and `tp`, which are used below but never
//    declared here;
//  - the `#ifndef` matching the `#endif` further down;
//  - several guard/`return` lines and closing braces, including the
//    function's final return.
// Restore these from upstream before relying on the control flow as shown.
//
// Visible behavior:
//  - Entries from `iter` pass through a CompactionIterator. It is configured
//    with `snapshots`, `earliest_write_conflict_snapshot`,
//    `snapshot_checker`, and a MergeHelper over ioptions.merge_operator.
//  - Each surviving entry is added to a TableBuilder, followed by every
//    tombstone collected in a RangeDelAggregator.
//  - meta's key/sequence boundaries, fd.file_size and marked_for_compaction
//    are updated.
//  - The file is synced (ioptions.use_fsync), closed, and reopened through
//    `table_cache` as a usability check.
//  - The file is deleted via env->DeleteFile when the status is not ok or
//    meta's file size is 0.
//  - Creation start is reported to ioptions.listeners. Creation finish is
//    reported to `event_logger` and ioptions.listeners.
65 const std::string
& dbname
, Env
* env
, const ImmutableCFOptions
& ioptions
,
66 const MutableCFOptions
& mutable_cf_options
, const EnvOptions
& env_options
,
67 TableCache
* table_cache
, InternalIterator
* iter
,
68 std::unique_ptr
<InternalIterator
> range_del_iter
, FileMetaData
* meta
,
69 const InternalKeyComparator
& internal_comparator
,
70 const std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>>*
71 int_tbl_prop_collector_factories
,
72 uint32_t column_family_id
, const std::string
& column_family_name
,
73 std::vector
<SequenceNumber
> snapshots
,
74 SequenceNumber earliest_write_conflict_snapshot
,
75 SnapshotChecker
* snapshot_checker
, const CompressionType compression
,
76 const CompressionOptions
& compression_opts
, bool paranoid_file_checks
,
77 InternalStats
* internal_stats
, TableFileCreationReason reason
,
78 EventLogger
* event_logger
, int job_id
, const Env::IOPriority io_priority
,
79 TableProperties
* table_properties
, int level
, const uint64_t creation_time
,
80 const uint64_t oldest_key_time
, Env::WriteLifeTimeHint write_hint
) {
// The column family id is "unknown" exactly when no name is given.
81 assert((column_family_id
==
82 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) ==
83 column_family_name
.empty());
84 // Byte threshold (1 MiB): once IOSTATS(bytes_written) reaches it during a high-priority (IO_HIGH) build, FLUSH_BYTES_WRITTEN is published to the thread status.
85 const size_t kReportFlushIOStatsEvery
= 1048576;
// Start from an empty size; it is only set to the real size once the builder
// finishes successfully with data.
87 meta
->fd
.file_size
= 0;
// Collect all range tombstones up front, evaluated against `snapshots`.
89 std::unique_ptr
<RangeDelAggregator
> range_del_agg(
90 new RangeDelAggregator(internal_comparator
, snapshots
));
91 s
= range_del_agg
->AddTombstones(std::move(range_del_iter
));
93 // may be non-ok if a range tombstone key is unparsable
// NOTE(review): the check of `s` after AddTombstones is not visible in this
// copy.
97 std::string fname
= TableFileName(ioptions
.cf_paths
, meta
->fd
.GetNumber(),
98 meta
->fd
.GetPathId());
100 EventHelpers::NotifyTableFileCreationStarted(
101 ioptions
.listeners
, dbname
, column_family_name
, fname
, job_id
, reason
);
102 #endif // !ROCKSDB_LITE
// Only create a file when there is at least one point entry or one range
// tombstone to write.
105 if (iter
->Valid() || !range_del_agg
->IsEmpty()) {
106 TableBuilder
* builder
;
107 unique_ptr
<WritableFileWriter
> file_writer
;
109 unique_ptr
<WritableFile
> file
;
// Local copy handed to the "BuildTable:create_file" test sync point; in the
// visible code it is not read again afterwards.
111 bool use_direct_writes
= env_options
.use_direct_writes
;
112 TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes
);
114 s
= NewWritableFile(env
, fname
, &file
, env_options
);
// NOTE(review): the guard around this notification (presumably
// `if (!s.ok()) { ... return s; }`) is not visible in this copy; as shown,
// the "creation finished" event would fire unconditionally here.
116 EventHelpers::LogAndNotifyTableFileCreationFinished(
117 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
118 job_id
, meta
->fd
, tp
, reason
, s
);
// Apply the caller's I/O priority and write-lifetime hint to the new file.
121 file
->SetIOPriority(io_priority
);
122 file
->SetWriteLifeTimeHint(write_hint
);
// Wrap the file in a WritableFileWriter (which takes ownership of `file`)
// and create a builder that writes through it without a compression
// dictionary and with filters enabled.
124 file_writer
.reset(new WritableFileWriter(
125 std::move(file
), fname
, env_options
, ioptions
.statistics
));
126 builder
= NewTableBuilder(
127 ioptions
, mutable_cf_options
, internal_comparator
,
128 int_tbl_prop_collector_factories
, column_family_id
,
129 column_family_name
, file_writer
.get(), compression
, compression_opts
,
130 level
, nullptr /* compression_dict */, false /* skip_filters */,
131 creation_time
, oldest_key_time
);
// Merge operands are resolved with ioptions.merge_operator; the snapshot
// argument is the last element of `snapshots`, or 0 when there are none.
// NOTE(review): the remaining argument(s) and closing `);` of this call are
// not visible in this copy.
134 MergeHelper
merge(env
, internal_comparator
.user_comparator(),
135 ioptions
.merge_operator
, nullptr, ioptions
.info_log
,
136 true /* internal key corruption is not ok */,
137 snapshots
.empty() ? 0 : snapshots
.back(),
// Filter `iter` through the compaction logic (snapshots, merges, range
// deletions) so that only the entries to be persisted reach the builder.
140 CompactionIterator
c_iter(
141 iter
, internal_comparator
.user_comparator(), &merge
, kMaxSequenceNumber
,
142 &snapshots
, earliest_write_conflict_snapshot
, snapshot_checker
, env
,
143 ShouldReportDetailedTime(env
, ioptions
.statistics
),
144 true /* internal key corruption is not ok */, range_del_agg
.get());
145 c_iter
.SeekToFirst();
// Add each surviving entry and widen meta's key/sequence-number boundaries.
146 for (; c_iter
.Valid(); c_iter
.Next()) {
147 const Slice
& key
= c_iter
.key();
148 const Slice
& value
= c_iter
.value();
149 builder
->Add(key
, value
);
150 meta
->UpdateBoundaries(key
, c_iter
.ikey().sequence
);
152 // TODO(noetzli): Update stats after flush, too.
153 if (io_priority
== Env::IO_HIGH
&&
154 IOSTATS(bytes_written
) >= kReportFlushIOStatsEvery
) {
155 ThreadStatusUtil::SetThreadOperationProperty(
156 ThreadStatus::FLUSH_BYTES_WRITTEN
, IOSTATS(bytes_written
));
// Then append every collected range tombstone in serialized form, widening
// meta's boundaries to cover [start, end) of each tombstone.
160 for (auto it
= range_del_agg
->NewIterator(); it
->Valid(); it
->Next()) {
161 auto tombstone
= it
->Tombstone();
162 auto kv
= tombstone
.Serialize();
163 builder
->Add(kv
.first
.Encode(), kv
.second
);
164 meta
->UpdateBoundariesForRange(kv
.first
, tombstone
.SerializeEndKey(),
165 tombstone
.seq_
, internal_comparator
);
168 // Finish and check for builder errors
// `empty` means neither point entries nor range deletions were added.
169 tp
= builder
->GetTableProperties();
170 bool empty
= builder
->NumEntries() == 0 && tp
.num_range_deletions
== 0;
// NOTE(review): the error/empty branch body and the `else` that should
// guard Finish() are not visible in this copy.
172 if (!s
.ok() || empty
) {
175 s
= builder
->Finish();
178 if (s
.ok() && !empty
) {
179 uint64_t file_size
= builder
->FileSize();
180 meta
->fd
.file_size
= file_size
;
181 meta
->marked_for_compaction
= builder
->NeedCompact();
182 assert(meta
->fd
.GetFileSize() > 0);
183 tp
= builder
->GetTableProperties(); // refresh now that builder is finished
184 if (table_properties
) {
185 *table_properties
= tp
;
190 // Finish and check for file errors
// Sync is timed under TABLE_SYNC_MICROS; ioptions.use_fsync selects the
// sync flavor.
191 if (s
.ok() && !empty
) {
192 StopWatch
sw(env
, ioptions
.statistics
, TABLE_SYNC_MICROS
);
193 s
= file_writer
->Sync(ioptions
.use_fsync
);
195 if (s
.ok() && !empty
) {
196 s
= file_writer
->Close();
199 if (s
.ok() && !empty
) {
200 // Verify that the table is usable.
201 // We set for_compaction to false and don't OptimizeForCompactionTableRead
202 // here because this is a special case after we finish the table building.
203 // No matter whether use_direct_io_for_flush_and_compaction is true,
204 // we will regard this verification as user reads since the goal is
205 // to cache it here for further user reads.
206 std::unique_ptr
<InternalIterator
> it(table_cache
->NewIterator(
207 ReadOptions(), env_options
, internal_comparator
, *meta
,
208 nullptr /* range_del_agg */,
209 mutable_cf_options
.prefix_extractor
.get(), nullptr,
210 (internal_stats
== nullptr) ? nullptr
211 : internal_stats
->GetFileReadHist(0),
212 false /* for_compaction */, nullptr /* arena */,
213 false /* skip_filter */, level
));
// With paranoid_file_checks, walk the reopened table from its first entry.
// NOTE(review): the loop body is not visible in this copy.
215 if (s
.ok() && paranoid_file_checks
) {
216 for (it
->SeekToFirst(); it
->Valid(); it
->Next()) {
223 // Check for input iterator errors
// NOTE(review): the body of this check is not visible in this copy.
224 if (!iter
->status().ok()) {
// On failure, or if nothing was written (file size still 0), remove the
// possibly partial file.
228 if (!s
.ok() || meta
->fd
.GetFileSize() == 0) {
229 env
->DeleteFile(fname
);
232 // Output to event logger and fire events.
233 EventHelpers::LogAndNotifyTableFileCreationFinished(
234 event_logger
, ioptions
.listeners
, dbname
, column_family_name
, fname
,
235 job_id
, meta
->fd
, tp
, reason
, s
);
// NOTE(review): the function's final `return s;` and closing brace are not
// visible in this copy.
240 } // namespace rocksdb