1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "db/transaction_log_impl.h"
12 #include "db/write_batch_internal.h"
13 #include "file/sequence_file_reader.h"
14 #include "util/defer.h"
16 namespace ROCKSDB_NAMESPACE
{
// Constructs an iterator over the write-ahead-log files in `files`, starting
// at sequence number `seq`.
//
// Visible behavior:
//   - asserts that files_ and versions_ are non-null and that seq_per_batch_
//     is false (seq_per_batch mode is not supported by this iterator);
//   - wires reporter_ to options_->env and options_->info_log;
//   - immediately calls SeekToStartSequence(), which opens a log file via
//     OpenLogReader (presumably files_[0] by default), so construction
//     performs file I/O and its outcome is reported through status().
//
// NOTE(review): the initializer list shown here has no leading ':' and no
// entries for dir_, options_, soptions_, versions_, started_ or is_valid_,
// although those members are used elsewhere in this file. They are either
// lines missing from this view or default member initializers; confirm
// against the full file/header.
18 TransactionLogIteratorImpl::TransactionLogIteratorImpl(
19 const std::string
& dir
, const ImmutableDBOptions
* options
,
20 const TransactionLogIterator::ReadOptions
& read_options
,
21 const EnvOptions
& soptions
, const SequenceNumber seq
,
22 std::unique_ptr
<VectorLogPtr
> files
, VersionSet
const* const versions
,
23 const bool seq_per_batch
, const std::shared_ptr
<IOTracer
>& io_tracer
)
26 read_options_(read_options
),
28 starting_sequence_number_(seq
),
29 files_(std::move(files
)),
31 seq_per_batch_(seq_per_batch
),
32 io_tracer_(io_tracer
),
35 current_file_index_(0),
36 current_batch_seq_(0),
37 current_last_seq_(0) {
38 assert(files_
!= nullptr);
39 assert(versions_
!= nullptr);
// This iterator does not support seq_per_batch mode.
40 assert(!seq_per_batch_
);
// NOTE(review): PermitUncheckedError() presumably only marks the status as
// inspected rather than resetting it, so "Clear on start" may be misleading;
// confirm.
41 current_status_
.PermitUncheckedError(); // Clear on start
42 reporter_
.env
= options_
->env
;
43 reporter_
.info_log
= options_
->info_log
.get();
44 SeekToStartSequence(); // Seek till starting sequence
// Opens the WAL file described by `log_file` for sequential reading and wraps
// it in a SequentialFileReader stored in `*file_reader`.
//
// The file is opened with fs->OptimizeForLogRead(soptions_).
//   - Archived files (Type() == kArchivedLogFile) are opened from
//     ArchivedLogFileName(dir_, ...).
//   - Other files are first looked up under LogFileName(dir_, ...).
//   - As a fallback they are looked up under ArchivedLogFileName(dir_, ...),
//     because (per the inline comment) the file may have been moved to the
//     archive in the meantime.
//
// NOTE(review): several lines are not visible in this view:
//   - the declarations of `fname` and `s`;
//   - the `else` / `if (!s.ok())` guards around the fallback;
//   - any success check before creating the reader;
//   - the `return`.
// Confirm that the reader is only created when the open succeeded and that
// the open status is returned.
47 Status
TransactionLogIteratorImpl::OpenLogFile(
48 const LogFile
* log_file
,
49 std::unique_ptr
<SequentialFileReader
>* file_reader
) {
50 FileSystemPtr
fs(options_
->fs
, io_tracer_
);
51 std::unique_ptr
<FSSequentialFile
> file
;
54 EnvOptions optimized_env_options
= fs
->OptimizeForLogRead(soptions_
);
55 if (log_file
->Type() == kArchivedLogFile
) {
56 fname
= ArchivedLogFileName(dir_
, log_file
->LogNumber());
57 s
= fs
->NewSequentialFile(fname
, optimized_env_options
, &file
, nullptr);
// Non-archived file: try the live DB directory first.
59 fname
= LogFileName(dir_
, log_file
->LogNumber());
60 s
= fs
->NewSequentialFile(fname
, optimized_env_options
, &file
, nullptr);
62 // If the file cannot be opened in the DB directory, try the archive
63 // dir instead: it could have been moved there in the meantime.
64 fname
= ArchivedLogFileName(dir_
, log_file
->LogNumber());
65 s
= fs
->NewSequentialFile(fname
, optimized_env_options
, &file
, nullptr);
69 file_reader
->reset(new SequentialFileReader(std::move(file
), fname
,
70 io_tracer_
, options_
->listeners
,
71 options_
->rate_limiter
.get()));
// Returns the current write batch together with its first sequence number
// (current_batch_seq_).
//
// The batch is std::move'd out of current_batch_, so the caller takes it
// over. Presumably a second call without advancing yields an empty
// writeBatchPtr; confirm.
// Must only be called while the iterator is valid (asserted on is_valid_).
// NOTE(review): the declaration and the return of `result` are not visible
// in this view.
76 BatchResult
TransactionLogIteratorImpl::GetBatch() {
77 assert(is_valid_
); // must not be called while the iterator is invalid.
79 result
.sequence
= current_batch_seq_
;
80 result
.writeBatchPtr
= std::move(current_batch_
);
// Returns current_status_. This is the status recorded by the most recent
// seek or advance; SeekToStartSequence, NextImpl and UpdateCurrentWriteBatch
// all assign it.
84 Status
TransactionLogIteratorImpl::status() { return current_status_
; }
// True only when both conditions hold:
//   - the start sequence has been reached (started_);
//   - is_valid_ is set.
// is_valid_ is assigned in lines not visible here. It is presumably true
// while a batch is available through GetBatch(); confirm.
86 bool TransactionLogIteratorImpl::Valid() { return started_
&& is_valid_
; }
// Reads the next record from current_log_reader_ into `*record`, using
// scratch_ as backing storage, and returns ReadRecord's result.
// Reading is refused once current_last_seq_ >= versions_->LastSequence(),
// i.e. when no further complete entries are expected in the logs.
// NOTE(review): the body of the early-exit branch is not visible in this
// view; presumably it returns false.
88 bool TransactionLogIteratorImpl::RestrictedRead(Slice
* record
) {
89 // Don't read if no more complete entries to read from logs
90 if (current_last_seq_
>= versions_
->LastSequence()) {
93 return current_log_reader_
->ReadRecord(record
, &scratch_
);
// Opens files_[start_file_index] and scans its records until it reaches a
// batch whose last sequence number is >= starting_sequence_number_.
//
// start_file_index - index into files_ of the file to scan.
// strict           - the parameter line is not visible in this view. When
//                    true, the batch found must begin exactly at
//                    starting_sequence_number_; otherwise current_status_
//                    becomes Corruption("Gap in sequence number...").
//
// On a successful seek, started_ is set to true.
// If the sequence is not found in this file:
//   - under a guard not visible here (presumably `strict`), the status
//     becomes a Corruption gap error;
//   - otherwise, when more than one file exists, the status becomes
//     Corruption("Start sequence was not found...") and started_ stays
//     false, so NextImpl keeps searching.
//
// Calls visible in this file:
//   - SeekToStartSequence() from the constructor and NextImpl;
//   - SeekToStartSequence(current_file_index_, !seq_per_batch_) from
//     UpdateCurrentWriteBatch.
// The defaults presumably come from the declaration.
96 void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index
,
101 // Check the TransactionLogIterator invariant when SeekToStartSequence()
// returns; the Defer below presumably runs on every exit path.
103 const Defer
defer([this]() {
105 assert(current_status_
.ok());
106 if (starting_sequence_number_
> current_batch_seq_
) {
107 assert(current_batch_seq_
< current_last_seq_
);
108 assert(current_last_seq_
>= starting_sequence_number_
);
// NOTE(review): the bodies of the two guards below are not visible here.
// Presumably both return early: one when start_file_index is out of range,
// the other when a previous error is already recorded.
112 if (files_
->size() <= start_file_index
) {
114 } else if (!current_status_
.ok()) {
118 OpenLogReader(files_
->at(static_cast<size_t>(start_file_index
)).get());
121 reporter_
.Info(current_status_
.ToString().c_str());
124 while (RestrictedRead(&record
)) {
125 if (record
.size() < WriteBatchInternal::kHeader
) {
126 reporter_
.Corruption(record
.size(),
127 Status::Corruption("very small log record"));
130 UpdateCurrentWriteBatch(record
);
131 if (current_last_seq_
>= starting_sequence_number_
) {
132 if (strict
&& current_batch_seq_
!= starting_sequence_number_
) {
133 current_status_
= Status::Corruption(
134 "Gap in sequence number. Could not "
135 "seek to required sequence number");
136 reporter_
.Info(current_status_
.ToString().c_str());
140 "Could seek required sequence number. Iterator will "
144 started_
= true; // set started_ as we could seek till starting sequence
151 // Could not find start sequence in first file. Normally this must be the
152 // only file. Otherwise log the error and let the iterator return the next entry.
153 // If strict is set, we want to seek exactly to the start sequence, and it
154 // should have been present in the file we scanned above.
156 current_status_
= Status::Corruption(
157 "Gap in sequence number. Could not "
158 "seek to required sequence number");
159 reporter_
.Info(current_status_
.ToString().c_str());
160 } else if (files_
->size() != 1) {
161 current_status_
= Status::Corruption(
162 "Start sequence was not found, "
163 "skipping to the next available");
164 reporter_
.Info(current_status_
.ToString().c_str());
165 // Let NextImpl find the next available entry. started_ remains false
166 // because we don't want to check for gaps while moving to start sequence
// Public advance: moves to the next batch unless the iterator is already in
// an error state (current_status_ not OK). The body of that error branch is
// not visible here; presumably it is a bare return. Otherwise the call
// delegates to NextImpl(false), i.e. it is treated as an application call.
171 void TransactionLogIteratorImpl::Next() {
172 if (!current_status_
.ok()) {
175 return NextImpl(false);
// Advances the iterator.
//
// internal - true when called from within the iterator while the start
//            sequence has not been reached yet; false when called by the
//            application through Next(). The asserts below encode this.
//
// Steps:
//   1. When called by the application before the start sequence has been
//      reached, it retries SeekToStartSequence() first.
//   2. It clears the reader's EOF mark, presumably so records appended since
//      the last read become visible.
//   3. It reads records through RestrictedRead and turns each one into the
//      current batch.
//   4. When the current file is exhausted it opens the next entry of files_.
//   5. When no file remains, current_status_ becomes OK if current_last_seq_
//      equals versions_->LastSequence(). On the branch not visible here it
//      becomes TryAgain ("Create a new iterator to fetch the new tail.").
178 void TransactionLogIteratorImpl::NextImpl(bool internal
) {
181 if (!internal
&& !started_
) {
182 // Runs every time until we can seek to the start sequence
183 SeekToStartSequence();
186 assert(current_log_reader_
);
187 if (current_log_reader_
->IsEOF()) {
188 current_log_reader_
->UnmarkEOF();
190 while (RestrictedRead(&record
)) {
191 if (record
.size() < WriteBatchInternal::kHeader
) {
192 reporter_
.Corruption(record
.size(),
193 Status::Corruption("very small log record"));
196 // started_ should be true if called by application
197 assert(internal
|| started_
);
198 // started_ should be false if called internally
199 assert(!internal
|| !started_
);
200 UpdateCurrentWriteBatch(record
);
201 if (internal
&& !started_
) {
// NOTE(review): `files_->size() - 1` would wrap around if files_ were empty.
// The assert on current_log_reader_ above suggests a file was already opened,
// so files_ is presumably non-empty here; confirm.
208 // Open the next file
209 if (current_file_index_
< files_
->size() - 1) {
210 ++current_file_index_
;
211 Status s
= OpenLogReader(files_
->at(current_file_index_
).get());
// No further file to open: report whether the iterator has caught up with
// versions_->LastSequence().
219 if (current_last_seq_
== versions_
->LastSequence()) {
220 current_status_
= Status::OK();
222 const char* msg
= "Create a new iterator to fetch the new tail.";
223 current_status_
= Status::TryAgain(msg
);
// Checks whether `batch` starts at `expected_seq`.
//
// On a mismatch it formats a "Discontinuity in log records" message into
// `buf`. The message is presumably reported, and false returned, in lines
// not visible here.
//
// NOTE(review): the format literal lacks a space between "." and "Log"
// (".Log iterator will ..."). It is a runtime string and is left unchanged
// in this documentation-only edit.
230 bool TransactionLogIteratorImpl::IsBatchExpected(
231 const WriteBatch
* batch
, const SequenceNumber expected_seq
) {
233 SequenceNumber batchSeq
= WriteBatchInternal::Sequence(batch
);
234 if (batchSeq
!= expected_seq
) {
// NOTE(review): `buf` is declared in a line not visible here. sizeof(buf)
// only yields the buffer size if `buf` is an array, not a pointer.
236 snprintf(buf
, sizeof(buf
),
237 "Discontinuity in log records. Got seq=%" PRIu64
238 ", Expected seq=%" PRIu64
", Last flushed seq=%" PRIu64
239 ".Log iterator will reseek the correct batch.",
240 batchSeq
, expected_seq
, versions_
->LastSequence());
// Parses `record` into a new WriteBatch and makes it the current batch.
//
// If the iterator has started and the batch does not begin at
// current_last_seq_ + 1, it recovers from the gap as follows:
//   - sets starting_sequence_number_ to the expected sequence;
//   - sets current_status_ to NotFound("Gap in sequence numbers");
//   - steps back one file when the expected sequence precedes the current
//     file's StartSequence() and this is not the first file;
//   - reseeks through SeekToStartSequence, with strict mode off in
//     seq_per_batch_ mode.
//
// Otherwise it updates the batch sequence bookkeeping, stores the batch in
// current_batch_ and sets current_status_ to OK.
//
// NOTE(review): the Status returned by SetContents is ignored (see the TODO),
// so a malformed record is still processed as if it were a valid batch.
247 void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice
& record
) {
248 std::unique_ptr
<WriteBatch
> batch(new WriteBatch());
249 Status s
= WriteBatchInternal::SetContents(batch
.get(), record
);
250 s
.PermitUncheckedError(); // TODO: What should we do with this error?
252 SequenceNumber expected_seq
= current_last_seq_
+ 1;
253 // If the iterator has started, then confirm that we get continuous batches
254 if (started_
&& !IsBatchExpected(batch
.get(), expected_seq
)) {
255 // Seek to the batch having expected sequence number
256 if (expected_seq
< files_
->at(current_file_index_
)->StartSequence()) {
257 // Expected batch must lie in the previous log file
259 if (current_file_index_
!= 0) {
260 current_file_index_
--;
263 starting_sequence_number_
= expected_seq
;
264 // current_status_ will be set to OK if the reseek succeeds
265 // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode
266 // that allows gaps in the WAL since it will still skip over the gap.
267 current_status_
= Status::NotFound("Gap in sequence numbers");
268 // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
269 // should be disabled
270 return SeekToStartSequence(current_file_index_
, !seq_per_batch_
);
273 current_batch_seq_
= WriteBatchInternal::Sequence(batch
.get());
274 assert(!seq_per_batch_
);
// NOTE(review): the assignment target of the expression below is on a line
// not visible here; it is presumably current_last_seq_. A batch with
// Count() == 0 would then set it to current_batch_seq_ - 1.
276 current_batch_seq_
+ WriteBatchInternal::Count(batch
.get()) - 1;
277 // current_batch_seq_ can only change here
278 assert(current_last_seq_
<= versions_
->LastSequence());
280 current_batch_
= std::move(batch
);
282 current_status_
= Status::OK();
// Opens `log_file` through OpenLogFile and replaces current_log_reader_ with a
// log::Reader over it. The reader is configured as follows:
//   - corruption reports go to reporter_;
//   - checksum verification follows read_options_.verify_checksums_;
//   - the file's LogNumber() is passed as the log number.
// NOTE(review): the check of `s` and the `return` are not visible in this
// view. Confirm that a failed open returns `s` early, so that a null `file`
// is never handed to log::Reader.
285 Status
TransactionLogIteratorImpl::OpenLogReader(const LogFile
* log_file
) {
286 std::unique_ptr
<SequentialFileReader
> file
;
287 Status s
= OpenLogFile(log_file
, &file
);
292 current_log_reader_
.reset(
293 new log::Reader(options_
->info_log
, std::move(file
), &reporter_
,
294 read_options_
.verify_checksums_
, log_file
->LogNumber()));
297 } // namespace ROCKSDB_NAMESPACE
298 #endif // ROCKSDB_LITE