]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/transaction_log_impl.cc
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / rocksdb / db / transaction_log_impl.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/transaction_log_impl.h"
9
10 #include <cinttypes>
11
12 #include "db/write_batch_internal.h"
13 #include "file/sequence_file_reader.h"
14 #include "util/defer.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 TransactionLogIteratorImpl::TransactionLogIteratorImpl(
19 const std::string& dir, const ImmutableDBOptions* options,
20 const TransactionLogIterator::ReadOptions& read_options,
21 const EnvOptions& soptions, const SequenceNumber seq,
22 std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
23 const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
24 : dir_(dir),
25 options_(options),
26 read_options_(read_options),
27 soptions_(soptions),
28 starting_sequence_number_(seq),
29 files_(std::move(files)),
30 versions_(versions),
31 seq_per_batch_(seq_per_batch),
32 io_tracer_(io_tracer),
33 started_(false),
34 is_valid_(false),
35 current_file_index_(0),
36 current_batch_seq_(0),
37 current_last_seq_(0) {
38 assert(files_ != nullptr);
39 assert(versions_ != nullptr);
40 assert(!seq_per_batch_);
41 current_status_.PermitUncheckedError(); // Clear on start
42 reporter_.env = options_->env;
43 reporter_.info_log = options_->info_log.get();
44 SeekToStartSequence(); // Seek till starting sequence
45 }
46
47 Status TransactionLogIteratorImpl::OpenLogFile(
48 const LogFile* log_file,
49 std::unique_ptr<SequentialFileReader>* file_reader) {
50 FileSystemPtr fs(options_->fs, io_tracer_);
51 std::unique_ptr<FSSequentialFile> file;
52 std::string fname;
53 Status s;
54 EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
55 if (log_file->Type() == kArchivedLogFile) {
56 fname = ArchivedLogFileName(dir_, log_file->LogNumber());
57 s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
58 } else {
59 fname = LogFileName(dir_, log_file->LogNumber());
60 s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
61 if (!s.ok()) {
62 // If cannot open file in DB directory.
63 // Try the archive dir, as it could have moved in the meanwhile.
64 fname = ArchivedLogFileName(dir_, log_file->LogNumber());
65 s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
66 }
67 }
68 if (s.ok()) {
69 file_reader->reset(new SequentialFileReader(std::move(file), fname,
70 io_tracer_, options_->listeners,
71 options_->rate_limiter.get()));
72 }
73 return s;
74 }
75
76 BatchResult TransactionLogIteratorImpl::GetBatch() {
77 assert(is_valid_); // cannot call in a non valid state.
78 BatchResult result;
79 result.sequence = current_batch_seq_;
80 result.writeBatchPtr = std::move(current_batch_);
81 return result;
82 }
83
84 Status TransactionLogIteratorImpl::status() { return current_status_; }
85
86 bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
87
88 bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
89 // Don't read if no more complete entries to read from logs
90 if (current_last_seq_ >= versions_->LastSequence()) {
91 return false;
92 }
93 return current_log_reader_->ReadRecord(record, &scratch_);
94 }
95
96 void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
97 bool strict) {
98 Slice record;
99 started_ = false;
100 is_valid_ = false;
101 // Check invariant of TransactionLogIterator when SeekToStartSequence()
102 // succeeds.
103 const Defer defer([this]() {
104 if (is_valid_) {
105 assert(current_status_.ok());
106 if (starting_sequence_number_ > current_batch_seq_) {
107 assert(current_batch_seq_ < current_last_seq_);
108 assert(current_last_seq_ >= starting_sequence_number_);
109 }
110 }
111 });
112 if (files_->size() <= start_file_index) {
113 return;
114 } else if (!current_status_.ok()) {
115 return;
116 }
117 Status s =
118 OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
119 if (!s.ok()) {
120 current_status_ = s;
121 reporter_.Info(current_status_.ToString().c_str());
122 return;
123 }
124 while (RestrictedRead(&record)) {
125 if (record.size() < WriteBatchInternal::kHeader) {
126 reporter_.Corruption(record.size(),
127 Status::Corruption("very small log record"));
128 continue;
129 }
130 UpdateCurrentWriteBatch(record);
131 if (current_last_seq_ >= starting_sequence_number_) {
132 if (strict && current_batch_seq_ != starting_sequence_number_) {
133 current_status_ = Status::Corruption(
134 "Gap in sequence number. Could not "
135 "seek to required sequence number");
136 reporter_.Info(current_status_.ToString().c_str());
137 return;
138 } else if (strict) {
139 reporter_.Info(
140 "Could seek required sequence number. Iterator will "
141 "continue.");
142 }
143 is_valid_ = true;
144 started_ = true; // set started_ as we could seek till starting sequence
145 return;
146 } else {
147 is_valid_ = false;
148 }
149 }
150
151 // Could not find start sequence in first file. Normally this must be the
152 // only file. Otherwise log the error and let the iterator return next entry
153 // If strict is set, we want to seek exactly till the start sequence and it
154 // should have been present in the file we scanned above
155 if (strict) {
156 current_status_ = Status::Corruption(
157 "Gap in sequence number. Could not "
158 "seek to required sequence number");
159 reporter_.Info(current_status_.ToString().c_str());
160 } else if (files_->size() != 1) {
161 current_status_ = Status::Corruption(
162 "Start sequence was not found, "
163 "skipping to the next available");
164 reporter_.Info(current_status_.ToString().c_str());
165 // Let NextImpl find the next available entry. started_ remains false
166 // because we don't want to check for gaps while moving to start sequence
167 NextImpl(true);
168 }
169 }
170
171 void TransactionLogIteratorImpl::Next() {
172 if (!current_status_.ok()) {
173 return;
174 }
175 return NextImpl(false);
176 }
177
178 void TransactionLogIteratorImpl::NextImpl(bool internal) {
179 Slice record;
180 is_valid_ = false;
181 if (!internal && !started_) {
182 // Runs every time until we can seek to the start sequence
183 SeekToStartSequence();
184 }
185 while (true) {
186 assert(current_log_reader_);
187 if (current_log_reader_->IsEOF()) {
188 current_log_reader_->UnmarkEOF();
189 }
190 while (RestrictedRead(&record)) {
191 if (record.size() < WriteBatchInternal::kHeader) {
192 reporter_.Corruption(record.size(),
193 Status::Corruption("very small log record"));
194 continue;
195 } else {
196 // started_ should be true if called by application
197 assert(internal || started_);
198 // started_ should be false if called internally
199 assert(!internal || !started_);
200 UpdateCurrentWriteBatch(record);
201 if (internal && !started_) {
202 started_ = true;
203 }
204 return;
205 }
206 }
207
208 // Open the next file
209 if (current_file_index_ < files_->size() - 1) {
210 ++current_file_index_;
211 Status s = OpenLogReader(files_->at(current_file_index_).get());
212 if (!s.ok()) {
213 is_valid_ = false;
214 current_status_ = s;
215 return;
216 }
217 } else {
218 is_valid_ = false;
219 if (current_last_seq_ == versions_->LastSequence()) {
220 current_status_ = Status::OK();
221 } else {
222 const char* msg = "Create a new iterator to fetch the new tail.";
223 current_status_ = Status::TryAgain(msg);
224 }
225 return;
226 }
227 }
228 }
229
230 bool TransactionLogIteratorImpl::IsBatchExpected(
231 const WriteBatch* batch, const SequenceNumber expected_seq) {
232 assert(batch);
233 SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
234 if (batchSeq != expected_seq) {
235 char buf[200];
236 snprintf(buf, sizeof(buf),
237 "Discontinuity in log records. Got seq=%" PRIu64
238 ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
239 ".Log iterator will reseek the correct batch.",
240 batchSeq, expected_seq, versions_->LastSequence());
241 reporter_.Info(buf);
242 return false;
243 }
244 return true;
245 }
246
247 void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
248 std::unique_ptr<WriteBatch> batch(new WriteBatch());
249 Status s = WriteBatchInternal::SetContents(batch.get(), record);
250 s.PermitUncheckedError(); // TODO: What should we do with this error?
251
252 SequenceNumber expected_seq = current_last_seq_ + 1;
253 // If the iterator has started, then confirm that we get continuous batches
254 if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
255 // Seek to the batch having expected sequence number
256 if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
257 // Expected batch must lie in the previous log file
258 // Avoid underflow.
259 if (current_file_index_ != 0) {
260 current_file_index_--;
261 }
262 }
263 starting_sequence_number_ = expected_seq;
264 // currentStatus_ will be set to Ok if reseek succeeds
265 // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
266 // that allows gaps in the WAL since it will still skip over the gap.
267 current_status_ = Status::NotFound("Gap in sequence numbers");
268 // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
269 // should be disabled
270 return SeekToStartSequence(current_file_index_, !seq_per_batch_);
271 }
272
273 current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
274 assert(!seq_per_batch_);
275 current_last_seq_ =
276 current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
277 // currentBatchSeq_ can only change here
278 assert(current_last_seq_ <= versions_->LastSequence());
279
280 current_batch_ = std::move(batch);
281 is_valid_ = true;
282 current_status_ = Status::OK();
283 }
284
285 Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
286 std::unique_ptr<SequentialFileReader> file;
287 Status s = OpenLogFile(log_file, &file);
288 if (!s.ok()) {
289 return s;
290 }
291 assert(file);
292 current_log_reader_.reset(
293 new log::Reader(options_->info_log, std::move(file), &reporter_,
294 read_options_.verify_checksums_, log_file->LogNumber()));
295 return Status::OK();
296 }
297 } // namespace ROCKSDB_NAMESPACE
298 #endif // ROCKSDB_LITE