// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "db/transaction_log_impl.h"
#include <inttypes.h>
#include "db/write_batch_internal.h"
#include "util/file_reader_writer.h"
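// Usage sketch (not part of this file; `db` and `start_seq` are placeholders):
// callers normally obtain this iterator through DB::GetUpdatesSince() rather
// than constructing it directly, e.g.
//
//   std::unique_ptr<TransactionLogIterator> iter;
//   Status s = db->GetUpdatesSince(start_seq, &iter);
//   for (; s.ok() && iter->Valid(); iter->Next()) {
//     BatchResult res = iter->GetBatch();
//     // replay res.writeBatchPtr starting at res.sequence ...
//   }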
namespace rocksdb {

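// The constructor takes ownership of the list of WAL files to iterate over
// and immediately seeks to the first batch whose last sequence number is at
// or past `seq`.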
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const ImmutableDBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
    const bool seq_per_batch)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      starting_sequence_number_(seq),
      files_(std::move(files)),
      started_(false),
      is_valid_(false),
      current_file_index_(0),
      current_batch_seq_(0),
      current_last_seq_(0),
      versions_(versions),
      seq_per_batch_(seq_per_batch) {
  assert(files_ != nullptr);
  assert(versions_ != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
  SeekToStartSequence();  // Seek to the starting sequence
}

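// Opens `log_file` for sequential reading. Live WAL files are looked up in
// the DB directory first and then in the archive directory, since the file
// may have been archived after the file list was captured.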
Status TransactionLogIteratorImpl::OpenLogFile(
    const LogFile* log_file,
    std::unique_ptr<SequentialFileReader>* file_reader) {
  Env* env = options_->env;
  std::unique_ptr<SequentialFile> file;
  std::string fname;
  Status s;
  EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_);
  if (log_file->Type() == kArchivedLogFile) {
    fname = ArchivedLogFileName(dir_, log_file->LogNumber());
    s = env->NewSequentialFile(fname, &file, optimized_env_options);
  } else {
    fname = LogFileName(dir_, log_file->LogNumber());
    s = env->NewSequentialFile(fname, &file, optimized_env_options);
    if (!s.ok()) {
      // If we cannot open the file in the DB directory, try the archive
      // directory, as the file could have been moved there in the meantime.
      fname = ArchivedLogFileName(dir_, log_file->LogNumber());
      s = env->NewSequentialFile(fname, &file, optimized_env_options);
    }
  }
  if (s.ok()) {
    file_reader->reset(new SequentialFileReader(std::move(file), fname));
  }
  return s;
}

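// Returns the current batch together with its starting sequence number.
// Ownership of the WriteBatch is moved into the result, and the call is only
// legal while the iterator is in a valid state.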
BatchResult TransactionLogIteratorImpl::GetBatch() {
  assert(is_valid_);  // cannot call in a non-valid state.
  BatchResult result;
  result.sequence = current_batch_seq_;
  result.writeBatchPtr = std::move(current_batch_);
  return result;
}

Status TransactionLogIteratorImpl::status() { return current_status_; }

bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }

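// Reads the next record, stopping once the iterator has already returned
// everything up to the last sequence number published by the VersionSet, so
// that incomplete tail entries in the log are never surfaced.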
bool TransactionLogIteratorImpl::RestrictedRead(
    Slice* record,
    std::string* scratch) {
  // Don't read if there are no more complete entries to read from the logs
  if (current_last_seq_ >= versions_->LastSequence()) {
    return false;
  }
  return current_log_reader_->ReadRecord(record, scratch);
}

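// Positions the iterator on the first batch at or past
// starting_sequence_number_, starting the scan at `start_file_index`. When
// `strict` is set the batch must begin exactly at the requested sequence;
// any gap is reported as corruption.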
void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
                                                     bool strict) {
  std::string scratch;
  Slice record;
  started_ = false;
  is_valid_ = false;
  if (files_->size() <= start_file_index) {
    return;
  }
  Status s =
      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
  if (!s.ok()) {
    current_status_ = s;
    reporter_.Info(current_status_.ToString().c_str());
    return;
  }
  while (RestrictedRead(&record, &scratch)) {
    if (record.size() < WriteBatchInternal::kHeader) {
      reporter_.Corruption(
          record.size(), Status::Corruption("very small log record"));
      continue;
    }
    UpdateCurrentWriteBatch(record);
    if (current_last_seq_ >= starting_sequence_number_) {
      if (strict && current_batch_seq_ != starting_sequence_number_) {
        current_status_ = Status::Corruption(
            "Gap in sequence number. Could not "
            "seek to required sequence number");
        reporter_.Info(current_status_.ToString().c_str());
        return;
      } else if (strict) {
        reporter_.Info("Could seek to required sequence number. Iterator "
                       "will continue.");
      }
      is_valid_ = true;
      started_ = true;  // set started_ as we could seek to the start sequence
      return;
    } else {
      is_valid_ = false;
    }
  }

  // Could not find the start sequence in the first file. Normally this must
  // be the only file; otherwise log the error and let the iterator return the
  // next entry. If strict is set, we wanted to seek exactly to the start
  // sequence, and it should have been present in the file scanned above.
  if (strict) {
    current_status_ = Status::Corruption(
        "Gap in sequence number. Could not "
        "seek to required sequence number");
    reporter_.Info(current_status_.ToString().c_str());
  } else if (files_->size() != 1) {
    current_status_ = Status::Corruption(
        "Start sequence was not found, "
        "skipping to the next available");
    reporter_.Info(current_status_.ToString().c_str());
    // Let NextImpl find the next available entry. started_ remains false
    // because we don't want to check for gaps while moving to the start
    // sequence.
    NextImpl(true);
  }
}

void TransactionLogIteratorImpl::Next() {
  return NextImpl(false);
}

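// Advances to the next batch. When `internal` is true the call comes from
// SeekToStartSequence() and is allowed to run before started_ is set; calls
// from the application always pass false.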
void TransactionLogIteratorImpl::NextImpl(bool internal) {
  std::string scratch;
  Slice record;
  is_valid_ = false;
  if (!internal && !started_) {
    // Runs every time until we can seek to the start sequence
    return SeekToStartSequence();
  }
  while (true) {
    assert(current_log_reader_);
    if (current_log_reader_->IsEOF()) {
      current_log_reader_->UnmarkEOF();
    }
    while (RestrictedRead(&record, &scratch)) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter_.Corruption(
            record.size(), Status::Corruption("very small log record"));
        continue;
      } else {
        // started_ should be true if called by application
        assert(internal || started_);
        // started_ should be false if called internally
        assert(!internal || !started_);
        UpdateCurrentWriteBatch(record);
        if (internal && !started_) {
          started_ = true;
        }
        return;
      }
    }

    // Open the next file
    if (current_file_index_ < files_->size() - 1) {
      ++current_file_index_;
      Status s = OpenLogReader(files_->at(current_file_index_).get());
      if (!s.ok()) {
        is_valid_ = false;
        current_status_ = s;
        return;
      }
    } else {
      is_valid_ = false;
      if (current_last_seq_ == versions_->LastSequence()) {
        current_status_ = Status::OK();
      } else {
        current_status_ = Status::Corruption("NO MORE DATA LEFT");
      }
      return;
    }
  }
}

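// Checks that `batch` starts at `expected_seq`; if not, logs the
// discontinuity and returns false so the caller can reseek.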
bool TransactionLogIteratorImpl::IsBatchExpected(
    const WriteBatch* batch, const SequenceNumber expected_seq) {
  assert(batch);
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
  if (batchSeq != expected_seq) {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "Discontinuity in log records. Got seq=%" PRIu64
             ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
             ". Log iterator will reseek the correct batch.",
             batchSeq, expected_seq, versions_->LastSequence());
    reporter_.Info(buf);
    return false;
  }
  return true;
}

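// Parses `record` into the current batch, updating current_batch_seq_ and
// current_last_seq_. If a gap in sequence numbers is detected after the
// iterator has started, it reseeks to the expected sequence instead.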
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
  WriteBatchInternal::SetContents(batch.get(), record);

  SequenceNumber expected_seq = current_last_seq_ + 1;
  // If the iterator has started, then confirm that we get continuous batches
  if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
    // Seek to the batch having the expected sequence number
    if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
      // Expected batch must lie in the previous log file
      // Avoid underflow.
      if (current_file_index_ != 0) {
        current_file_index_--;
      }
    }
    starting_sequence_number_ = expected_seq;
    // current_status_ will be set to OK if the reseek succeeds
    // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode
    // that allows gaps in the WAL since it will still skip over the gap.
    current_status_ = Status::NotFound("Gap in sequence numbers");
    // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
    // should be disabled
    return SeekToStartSequence(current_file_index_, !seq_per_batch_);
  }

  struct BatchCounter : public WriteBatch::Handler {
    SequenceNumber sequence_;
    BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
    Status MarkNoop(bool empty_batch) override {
      if (!empty_batch) {
        sequence_++;
      }
      return Status::OK();
    }
    Status MarkEndPrepare(const Slice&) override {
      sequence_++;
      return Status::OK();
    }
    Status MarkCommit(const Slice&) override {
      sequence_++;
      return Status::OK();
    }

    Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
                 const Slice& /*val*/) override {
      return Status::OK();
    }
    Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
                   const Slice& /*val*/) override {
      return Status::OK();
    }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override { return Status::OK(); }
  };

  current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
  if (seq_per_batch_) {
    BatchCounter counter(current_batch_seq_);
    batch->Iterate(&counter);
    current_last_seq_ = counter.sequence_;
  } else {
    current_last_seq_ =
        current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
  }
  // current_batch_seq_ can only change here
  assert(current_last_seq_ <= versions_->LastSequence());

  current_batch_ = std::move(batch);
  is_valid_ = true;
  current_status_ = Status::OK();
}

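// Opens `log_file` and wires it into a log::Reader that reports corruptions
// through reporter_, replacing the current reader.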
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
  std::unique_ptr<SequentialFileReader> file;
  Status s = OpenLogFile(log_file, &file);
  if (!s.ok()) {
    return s;
  }
  assert(file);
  current_log_reader_.reset(
      new log::Reader(options_->info_log, std::move(file), &reporter_,
                      read_options_.verify_checksums_, log_file->LogNumber()));
  return Status::OK();
}
}  // namespace rocksdb
#endif  // ROCKSDB_LITE