]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/builder.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / builder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/builder.h"
11
12 #include <algorithm>
13 #include <deque>
14 #include <vector>
15
16 #include "db/compaction_iterator.h"
17 #include "db/dbformat.h"
18 #include "db/event_helpers.h"
19 #include "db/internal_stats.h"
20 #include "db/merge_helper.h"
21 #include "db/table_cache.h"
22 #include "db/version_edit.h"
23 #include "monitoring/iostats_context_imp.h"
24 #include "monitoring/thread_status_util.h"
25 #include "rocksdb/db.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/iterator.h"
28 #include "rocksdb/options.h"
29 #include "rocksdb/table.h"
30 #include "table/block_based_table_builder.h"
31 #include "table/format.h"
32 #include "table/internal_iterator.h"
33 #include "util/file_reader_writer.h"
34 #include "util/filename.h"
35 #include "util/stop_watch.h"
36 #include "util/sync_point.h"
37
38 namespace rocksdb {
39
40 class TableFactory;
41
42 TableBuilder* NewTableBuilder(
43 const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
44 const InternalKeyComparator& internal_comparator,
45 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
46 int_tbl_prop_collector_factories,
47 uint32_t column_family_id, const std::string& column_family_name,
48 WritableFileWriter* file, const CompressionType compression_type,
49 const CompressionOptions& compression_opts, int level,
50 const std::string* compression_dict, const bool skip_filters,
51 const uint64_t creation_time, const uint64_t oldest_key_time) {
52 assert((column_family_id ==
53 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
54 column_family_name.empty());
55 return ioptions.table_factory->NewTableBuilder(
56 TableBuilderOptions(ioptions, moptions, internal_comparator,
57 int_tbl_prop_collector_factories, compression_type,
58 compression_opts, compression_dict, skip_filters,
59 column_family_name, level, creation_time,
60 oldest_key_time),
61 column_family_id, file);
62 }
63
64 Status BuildTable(
65 const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
66 const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
67 TableCache* table_cache, InternalIterator* iter,
68 std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
69 const InternalKeyComparator& internal_comparator,
70 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
71 int_tbl_prop_collector_factories,
72 uint32_t column_family_id, const std::string& column_family_name,
73 std::vector<SequenceNumber> snapshots,
74 SequenceNumber earliest_write_conflict_snapshot,
75 SnapshotChecker* snapshot_checker, const CompressionType compression,
76 const CompressionOptions& compression_opts, bool paranoid_file_checks,
77 InternalStats* internal_stats, TableFileCreationReason reason,
78 EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
79 TableProperties* table_properties, int level, const uint64_t creation_time,
80 const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {
81 assert((column_family_id ==
82 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
83 column_family_name.empty());
84 // Reports the IOStats for flush for every following bytes.
85 const size_t kReportFlushIOStatsEvery = 1048576;
86 Status s;
87 meta->fd.file_size = 0;
88 iter->SeekToFirst();
89 std::unique_ptr<RangeDelAggregator> range_del_agg(
90 new RangeDelAggregator(internal_comparator, snapshots));
91 s = range_del_agg->AddTombstones(std::move(range_del_iter));
92 if (!s.ok()) {
93 // may be non-ok if a range tombstone key is unparsable
94 return s;
95 }
96
97 std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
98 meta->fd.GetPathId());
99 #ifndef ROCKSDB_LITE
100 EventHelpers::NotifyTableFileCreationStarted(
101 ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
102 #endif // !ROCKSDB_LITE
103 TableProperties tp;
104
105 if (iter->Valid() || !range_del_agg->IsEmpty()) {
106 TableBuilder* builder;
107 unique_ptr<WritableFileWriter> file_writer;
108 {
109 unique_ptr<WritableFile> file;
110 #ifndef NDEBUG
111 bool use_direct_writes = env_options.use_direct_writes;
112 TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
113 #endif // !NDEBUG
114 s = NewWritableFile(env, fname, &file, env_options);
115 if (!s.ok()) {
116 EventHelpers::LogAndNotifyTableFileCreationFinished(
117 event_logger, ioptions.listeners, dbname, column_family_name, fname,
118 job_id, meta->fd, tp, reason, s);
119 return s;
120 }
121 file->SetIOPriority(io_priority);
122 file->SetWriteLifeTimeHint(write_hint);
123
124 file_writer.reset(new WritableFileWriter(
125 std::move(file), fname, env_options, ioptions.statistics));
126 builder = NewTableBuilder(
127 ioptions, mutable_cf_options, internal_comparator,
128 int_tbl_prop_collector_factories, column_family_id,
129 column_family_name, file_writer.get(), compression, compression_opts,
130 level, nullptr /* compression_dict */, false /* skip_filters */,
131 creation_time, oldest_key_time);
132 }
133
134 MergeHelper merge(env, internal_comparator.user_comparator(),
135 ioptions.merge_operator, nullptr, ioptions.info_log,
136 true /* internal key corruption is not ok */,
137 snapshots.empty() ? 0 : snapshots.back(),
138 snapshot_checker);
139
140 CompactionIterator c_iter(
141 iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
142 &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
143 ShouldReportDetailedTime(env, ioptions.statistics),
144 true /* internal key corruption is not ok */, range_del_agg.get());
145 c_iter.SeekToFirst();
146 for (; c_iter.Valid(); c_iter.Next()) {
147 const Slice& key = c_iter.key();
148 const Slice& value = c_iter.value();
149 builder->Add(key, value);
150 meta->UpdateBoundaries(key, c_iter.ikey().sequence);
151
152 // TODO(noetzli): Update stats after flush, too.
153 if (io_priority == Env::IO_HIGH &&
154 IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
155 ThreadStatusUtil::SetThreadOperationProperty(
156 ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
157 }
158 }
159
160 for (auto it = range_del_agg->NewIterator(); it->Valid(); it->Next()) {
161 auto tombstone = it->Tombstone();
162 auto kv = tombstone.Serialize();
163 builder->Add(kv.first.Encode(), kv.second);
164 meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
165 tombstone.seq_, internal_comparator);
166 }
167
168 // Finish and check for builder errors
169 tp = builder->GetTableProperties();
170 bool empty = builder->NumEntries() == 0 && tp.num_range_deletions == 0;
171 s = c_iter.status();
172 if (!s.ok() || empty) {
173 builder->Abandon();
174 } else {
175 s = builder->Finish();
176 }
177
178 if (s.ok() && !empty) {
179 uint64_t file_size = builder->FileSize();
180 meta->fd.file_size = file_size;
181 meta->marked_for_compaction = builder->NeedCompact();
182 assert(meta->fd.GetFileSize() > 0);
183 tp = builder->GetTableProperties(); // refresh now that builder is finished
184 if (table_properties) {
185 *table_properties = tp;
186 }
187 }
188 delete builder;
189
190 // Finish and check for file errors
191 if (s.ok() && !empty) {
192 StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
193 s = file_writer->Sync(ioptions.use_fsync);
194 }
195 if (s.ok() && !empty) {
196 s = file_writer->Close();
197 }
198
199 if (s.ok() && !empty) {
200 // Verify that the table is usable
201 // We set for_compaction to false and don't OptimizeForCompactionTableRead
202 // here because this is a special case after we finish the table building
203 // No matter whether use_direct_io_for_flush_and_compaction is true,
204 // we will regrad this verification as user reads since the goal is
205 // to cache it here for further user reads
206 std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
207 ReadOptions(), env_options, internal_comparator, *meta,
208 nullptr /* range_del_agg */,
209 mutable_cf_options.prefix_extractor.get(), nullptr,
210 (internal_stats == nullptr) ? nullptr
211 : internal_stats->GetFileReadHist(0),
212 false /* for_compaction */, nullptr /* arena */,
213 false /* skip_filter */, level));
214 s = it->status();
215 if (s.ok() && paranoid_file_checks) {
216 for (it->SeekToFirst(); it->Valid(); it->Next()) {
217 }
218 s = it->status();
219 }
220 }
221 }
222
223 // Check for input iterator errors
224 if (!iter->status().ok()) {
225 s = iter->status();
226 }
227
228 if (!s.ok() || meta->fd.GetFileSize() == 0) {
229 env->DeleteFile(fname);
230 }
231
232 // Output to event logger and fire events.
233 EventHelpers::LogAndNotifyTableFileCreationFinished(
234 event_logger, ioptions.listeners, dbname, column_family_name, fname,
235 job_id, meta->fd, tp, reason, s);
236
237 return s;
238 }
239
240 } // namespace rocksdb