1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
11 #include "db/transaction_log_impl.h"
13 #include "db/write_batch_internal.h"
14 #include "util/file_reader_writer.h"
// Constructs a transaction-log iterator and immediately tries to position it
// at the requested starting sequence number.
//
// Visible parameters:
//   dir           - directory name (the matching initializer is not visible
//                   in this listing; presumably stored in dir_).
//   options       - immutable DB options; options_->env and
//                   options_->info_log are read below.
//   read_options  - copied into read_options_.
//   soptions      - env options (initializer not visible here).
//   seq           - stored as starting_sequence_number_.
//   files         - ownership is moved into files_; asserted non-null.
//   versions      - asserted non-null via versions_ (initializer not visible
//                   here).
//   seq_per_batch - stored in seq_per_batch_.
//
// NOTE(review): the embedded line numbers skip (24-25, 27, 30-31, 34-35, 39,
// 43), so part of the member-initializer list and the closing brace are not
// visible in this listing.
18 TransactionLogIteratorImpl::TransactionLogIteratorImpl(
19 const std::string
& dir
, const ImmutableDBOptions
* options
,
20 const TransactionLogIterator::ReadOptions
& read_options
,
21 const EnvOptions
& soptions
, const SequenceNumber seq
,
22 std::unique_ptr
<VectorLogPtr
> files
, VersionSet
const* const versions
,
23 const bool seq_per_batch
)
26 read_options_(read_options
),
28 starting_sequence_number_(seq
),
29 files_(std::move(files
)),
32 current_file_index_(0),
33 current_batch_seq_(0),
36 seq_per_batch_(seq_per_batch
) {
// Both the file list and the version set are dereferenced later on.
37 assert(files_
!= nullptr);
38 assert(versions_
!= nullptr);
// Wire the log reporter to the environment and info logger from the options.
40 reporter_
.env
= options_
->env
;
41 reporter_
.info_log
= options_
->info_log
.get();
42 SeekToStartSequence(); // Seek till starting sequence
// Opens the WAL file described by `log_file` as a sequential file and, via
// `file_reader`, hands back a SequentialFileReader wrapping it.
//
// The file name is built from dir_ and log_file->LogNumber(): with
// ArchivedLogFileName() when the file's type is kArchivedLogFile, otherwise
// with LogFileName(), with a second attempt under the archived name (see the
// in-line comment below).
//
// NOTE(review): the declarations of `fname` and `s`, the else / retry
// conditions, the guard around the final reset() and the return statement are
// on lines not visible in this listing (embedded numbers 50-51, 56, 59, 64-66,
// 68-70 are missing) - confirm against the full file.
45 Status
TransactionLogIteratorImpl::OpenLogFile(
46 const LogFile
* log_file
,
47 std::unique_ptr
<SequentialFileReader
>* file_reader
) {
48 Env
* env
= options_
->env
;
49 std::unique_ptr
<SequentialFile
> file
;
// Let the Env adapt the configured env options for reading log files.
52 EnvOptions optimized_env_options
= env
->OptimizeForLogRead(soptions_
);
53 if (log_file
->Type() == kArchivedLogFile
) {
// Archived file: open it under its archived name.
54 fname
= ArchivedLogFileName(dir_
, log_file
->LogNumber());
55 s
= env
->NewSequentialFile(fname
, &file
, optimized_env_options
);
// Otherwise (presumably the else branch) try the regular log file name first.
57 fname
= LogFileName(dir_
, log_file
->LogNumber());
58 s
= env
->NewSequentialFile(fname
, &file
, optimized_env_options
);
60 // If cannot open file in DB directory.
61 // Try the archive dir, as it could have moved in the meanwhile.
62 fname
= ArchivedLogFileName(dir_
, log_file
->LogNumber());
63 s
= env
->NewSequentialFile(fname
, &file
, optimized_env_options
);
// Transfer the opened file into a new reader owned by the caller's pointer.
67 file_reader
->reset(new SequentialFileReader(std::move(file
), fname
));
// Fills a BatchResult with the current batch: its sequence number is
// current_batch_seq_ and ownership of current_batch_ is moved into
// result.writeBatchPtr, leaving current_batch_ moved-from.
// Must only be called while the iterator is valid (asserted below).
//
// NOTE(review): the declaration of `result` and the return statement are on
// lines not visible in this listing (embedded numbers 74, 77-78 are missing).
72 BatchResult
TransactionLogIteratorImpl::GetBatch() {
73 assert(is_valid_
); // cannot call in a non valid state.
75 result
.sequence
= current_batch_seq_
;
76 result
.writeBatchPtr
= std::move(current_batch_
);
80 Status
TransactionLogIteratorImpl::status() { return current_status_
; }
82 bool TransactionLogIteratorImpl::Valid() { return started_
&& is_valid_
; }
// Reads the next record from current_log_reader_ into `record`, using
// `scratch` as the buffer passed to ReadRecord(), and returns ReadRecord()'s
// result. The read is gated on current_last_seq_ being below
// versions_->LastSequence().
//
// NOTE(review): the declaration of the `record` parameter and the body of the
// guard below are on lines not visible in this listing (embedded numbers 85,
// 89-90 are missing); presumably the guard returns false - confirm.
84 bool TransactionLogIteratorImpl::RestrictedRead(
86 std::string
* scratch
) {
87 // Don't read if no more complete entries to read from logs
88 if (current_last_seq_
>= versions_
->LastSequence()) {
91 return current_log_reader_
->ReadRecord(record
, scratch
);
// Positions the iterator at starting_sequence_number_ by opening the log file
// at index `start_file_index` in files_ and reading records until a batch
// whose last sequence number reaches starting_sequence_number_ is found.
//
// `strict` (declared on a line not visible here) makes a mismatch between
// current_batch_seq_ and starting_sequence_number_ an error: current_status_
// is set to a Corruption status and reported.
//
// NOTE(review): many lines of this definition are not visible in this listing
// (embedded numbers 95-99, 101-103, 105-106, 108-109, 114-115, 123-124,
// 126-128, 130-135, 140, 152-154 are missing), including local declarations,
// early returns and closing braces. The comments below describe only the
// statements shown.
94 void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index
,
// Guard: the requested index is beyond the available files.
100 if (files_
->size() <= start_file_index
) {
// Open a reader on the requested file.
104 OpenLogReader(files_
->at(static_cast<size_t>(start_file_index
)).get());
// Report the current status through the reporter (presumably on open failure).
107 reporter_
.Info(current_status_
.ToString().c_str());
// Scan records until the starting sequence number is reached.
110 while (RestrictedRead(&record
, &scratch
)) {
// Records shorter than a write-batch header are reported as corruption.
111 if (record
.size() < WriteBatchInternal::kHeader
) {
112 reporter_
.Corruption(
113 record
.size(), Status::Corruption("very small log record"));
// Parse the record; this updates current_batch_seq_ / current_last_seq_.
116 UpdateCurrentWriteBatch(record
);
117 if (current_last_seq_
>= starting_sequence_number_
) {
// In strict mode the batch must start exactly at the requested sequence.
118 if (strict
&& current_batch_seq_
!= starting_sequence_number_
) {
119 current_status_
= Status::Corruption(
120 "Gap in sequence number. Could not "
121 "seek to required sequence number");
122 reporter_
.Info(current_status_
.ToString().c_str());
125 reporter_
.Info("Could seek required sequence number. Iterator will "
129 started_
= true; // set started_ as we could seek till starting sequence
136 // Could not find start sequence in first file. Normally this must be the
137 // only file. Otherwise log the error and let the iterator return next entry
138 // If strict is set, we want to seek exactly till the start sequence and it
139 // should have been present in the file we scanned above
141 current_status_
= Status::Corruption(
142 "Gap in sequence number. Could not "
143 "seek to required sequence number");
144 reporter_
.Info(current_status_
.ToString().c_str());
145 } else if (files_
->size() != 1) {
146 current_status_
= Status::Corruption(
147 "Start sequence was not found, "
148 "skipping to the next available");
149 reporter_
.Info(current_status_
.ToString().c_str());
150 // Let NextImpl find the next available entry. started_ remains false
151 // because we don't want to check for gaps while moving to start sequence
// Advances the iterator by delegating to NextImpl() with internal == false.
// NOTE(review): the closing brace is on a line not visible in this listing.
156 void TransactionLogIteratorImpl::Next() {
157 return NextImpl(false);
// Advances to the next write batch. `internal` distinguishes the two kinds of
// caller named by the in-line comments below: false for a call made by the
// application, true for a call made internally.
//
// Visible behaviour:
//   * A non-internal call on an iterator that has not started re-runs
//     SeekToStartSequence() instead of advancing.
//   * If the current reader is at EOF its EOF mark is cleared before reading.
//   * Records are read with RestrictedRead(); records shorter than
//     WriteBatchInternal::kHeader are reported as corruption.
//   * When more files remain, current_file_index_ is incremented and the next
//     file is opened.
//   * current_status_ is set to OK when current_last_seq_ equals
//     versions_->LastSequence(), and a Corruption("NO MORE DATA LEFT") status
//     is assigned on another path.
//
// NOTE(review): many lines of this definition are not visible in this listing
// (embedded numbers 161-163, 167-168, 172, 177-178, 185-190, 195-201, 204,
// 206-210 are missing), including local declarations, loop structure,
// returns and closing braces - confirm control flow against the full file.
160 void TransactionLogIteratorImpl::NextImpl(bool internal
) {
164 if (!internal
&& !started_
) {
165 // Runs every time until we can seek to the start sequence
166 return SeekToStartSequence();
169 assert(current_log_reader_
);
// Clear a previously recorded EOF before reading from this reader again.
170 if (current_log_reader_
->IsEOF()) {
171 current_log_reader_
->UnmarkEOF();
173 while (RestrictedRead(&record
, &scratch
)) {
174 if (record
.size() < WriteBatchInternal::kHeader
) {
175 reporter_
.Corruption(
176 record
.size(), Status::Corruption("very small log record"));
179 // started_ should be true if called by application
180 assert(internal
|| started_
);
181 // started_ should be false if called internally
182 assert(!internal
|| !started_
);
183 UpdateCurrentWriteBatch(record
);
184 if (internal
&& !started_
) {
191 // Open the next file
192 if (current_file_index_
< files_
->size() - 1) {
193 ++current_file_index_
;
194 Status s
= OpenLogReader(files_
->at(current_file_index_
).get());
// No further file: OK if everything up to the last sequence has been read.
202 if (current_last_seq_
== versions_
->LastSequence()) {
203 current_status_
= Status::OK();
205 current_status_
= Status::Corruption("NO MORE DATA LEFT");
// Compares the sequence number stored in `batch` (read with
// WriteBatchInternal::Sequence) against `expected_seq`. On a mismatch a
// diagnostic message is formatted into `buf`, containing the sequence found,
// the expected sequence and versions_->LastSequence().
//
// NOTE(review): the declaration of `buf`, whatever consumes the formatted
// message, and the return statements are on lines not visible in this listing
// (embedded numbers 214, 217, 223-227 are missing); presumably the function
// returns false on mismatch and true otherwise - confirm.
212 bool TransactionLogIteratorImpl::IsBatchExpected(
213 const WriteBatch
* batch
, const SequenceNumber expected_seq
) {
215 SequenceNumber batchSeq
= WriteBatchInternal::Sequence(batch
);
216 if (batchSeq
!= expected_seq
) {
218 snprintf(buf
, sizeof(buf
),
219 "Discontinuity in log records. Got seq=%" PRIu64
220 ", Expected seq=%" PRIu64
", Last flushed seq=%" PRIu64
221 ".Log iterator will reseek the correct batch.",
222 batchSeq
, expected_seq
, versions_
->LastSequence());
// Parses `record` into a new WriteBatch and makes it the current batch,
// updating current_batch_seq_, current_last_seq_, current_batch_ and
// current_status_.
//
// If the iterator has already started and the batch does not carry the
// expected sequence number (current_last_seq_ + 1), the function instead
// reseeks: starting_sequence_number_ is set to the expected sequence,
// current_status_ to NotFound, and SeekToStartSequence() is re-run from
// current_file_index_ (decremented first when the expected sequence precedes
// the current file's StartSequence() and the index is non-zero).
//
// NOTE(review): many lines of this definition are not visible in this listing
// (e.g. embedded numbers 232, 239, 242-243, 252-253, 258-262, 264-266,
// 268-271, 274-275, 277-278, 280-281, 284-285, 288-289, 295-296, 298, 301,
// 303, 305 are missing), including most BatchCounter handler bodies and
// closing braces. Comments describe only the statements shown.
229 void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice
& record
) {
230 std::unique_ptr
<WriteBatch
> batch(new WriteBatch());
231 WriteBatchInternal::SetContents(batch
.get(), record
);
233 SequenceNumber expected_seq
= current_last_seq_
+ 1;
234 // If the iterator has started, then confirm that we get continuous batches
235 if (started_
&& !IsBatchExpected(batch
.get(), expected_seq
)) {
236 // Seek to the batch having expected sequence number
237 if (expected_seq
< files_
->at(current_file_index_
)->StartSequence()) {
238 // Expected batch must lie in the previous log file
240 if (current_file_index_
!= 0) {
241 current_file_index_
--;
244 starting_sequence_number_
= expected_seq
;
245 // current_status_ will be set to OK if the reseek succeeds
246 // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode
247 // that allows gaps in the WAL since it will still skip over the gap.
248 current_status_
= Status::NotFound("Gap in sequence numbers");
249 // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
250 // should be disabled
251 return SeekToStartSequence(current_file_index_
, !seq_per_batch_
);
// Local WriteBatch::Handler that carries a sequence number (sequence_) while
// the batch is iterated. Only the bodies of MarkBeginPrepare and MarkRollback
// are visible here (both return OK); the other handler bodies are on lines
// not shown - presumably they advance sequence_; confirm.
254 struct BatchCounter
: public WriteBatch::Handler
{
255 SequenceNumber sequence_
;
256 BatchCounter(SequenceNumber sequence
) : sequence_(sequence
) {}
257 Status
MarkNoop(bool empty_batch
) override
{
263 Status
MarkEndPrepare(const Slice
&) override
{
267 Status
MarkCommit(const Slice
&) override
{
272 Status
PutCF(uint32_t /*cf*/, const Slice
& /*key*/,
273 const Slice
& /*val*/) override
{
276 Status
DeleteCF(uint32_t /*cf*/, const Slice
& /*key*/) override
{
279 Status
SingleDeleteCF(uint32_t /*cf*/, const Slice
& /*key*/) override
{
282 Status
MergeCF(uint32_t /*cf*/, const Slice
& /*key*/,
283 const Slice
& /*val*/) override
{
286 Status
MarkBeginPrepare(bool) override
{ return Status::OK(); }
287 Status
MarkRollback(const Slice
&) override
{ return Status::OK(); }
// The batch's own sequence number becomes the current batch sequence.
290 current_batch_seq_
= WriteBatchInternal::Sequence(batch
.get());
291 if (seq_per_batch_
) {
// seq_per_batch_ mode: derive the last sequence by iterating the batch with
// a BatchCounter seeded with the batch's sequence number.
292 BatchCounter
counter(current_batch_seq_
);
293 batch
->Iterate(&counter
);
294 current_last_seq_
= counter
.sequence_
;
// Otherwise the value below is computed from the batch's entry count; the
// assignment target is on a line not visible here (presumably
// current_last_seq_ - confirm).
297 current_batch_seq_
+ WriteBatchInternal::Count(batch
.get()) - 1;
299 // current_batch_seq_ can only change here
300 assert(current_last_seq_
<= versions_
->LastSequence());
// Publish the parsed batch and clear any previous error status.
302 current_batch_
= std::move(batch
);
304 current_status_
= Status::OK();
// Opens `log_file` through OpenLogFile() and replaces current_log_reader_
// with a new log::Reader over it. The reader is given options_->info_log,
// the opened file, &reporter_, read_options_.verify_checksums_ and the
// file's log number.
//
// NOTE(review): the handling of a failed OpenLogFile() and the return
// statement are on lines not visible in this listing (embedded numbers
// 310-313, 317-318 are missing) - confirm against the full file.
307 Status
TransactionLogIteratorImpl::OpenLogReader(const LogFile
* log_file
) {
308 std::unique_ptr
<SequentialFileReader
> file
;
309 Status s
= OpenLogFile(log_file
, &file
);
314 current_log_reader_
.reset(
315 new log::Reader(options_
->info_log
, std::move(file
), &reporter_
,
316 read_options_
.verify_checksums_
, log_file
->LogNumber()));
319 } // namespace rocksdb
320 #endif // ROCKSDB_LITE