class SequentialFileReader;
class Logger;
-using std::unique_ptr;
namespace log {
// If "checksum" is true, verify checksums if available.
Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
- unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
+ std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
bool checksum, uint64_t log_num);
- ~Reader();
+ virtual ~Reader();
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
- bool ReadRecord(Slice* record, std::string* scratch,
- WALRecoveryMode wal_recovery_mode =
- WALRecoveryMode::kTolerateCorruptedTailRecords);
+ virtual bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord.
//
return eof_;
}
+ // returns true if the reader has encountered read error.
+ bool hasReadError() const { return read_error_; }
+
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
- void UnmarkEOF();
+ virtual void UnmarkEOF();
SequentialFileReader* file() { return file_.get(); }
- private:
+ Reporter* GetReporter() const { return reporter_; }
+
+ protected:
std::shared_ptr<Logger> info_log_;
- const unique_ptr<SequentialFileReader> file_;
+ const std::unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
+
+ // Internal state variables used for reading records
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
// Read some more
bool ReadMore(size_t* drop_size, int *error);
+ void UnmarkEOFInternal();
+
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
+ private:
// No copying allowed
Reader(const Reader&);
void operator=(const Reader&);
};
+class FragmentBufferedReader : public Reader {
+ public:
+ FragmentBufferedReader(std::shared_ptr<Logger> info_log,
+ // @lint-ignore TXT2 T25377293 Grandfathered in
+ std::unique_ptr<SequentialFileReader>&& _file,
+ Reporter* reporter, bool checksum, uint64_t log_num)
+ : Reader(info_log, std::move(_file), reporter, checksum, log_num),
+ fragments_(),
+ in_fragmented_record_(false) {}
+ ~FragmentBufferedReader() override {}
+ bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords) override;
+ void UnmarkEOF() override;
+
+ private:
+ std::string fragments_;
+ bool in_fragmented_record_;
+
+ bool TryReadFragment(Slice* result, size_t* drop_size,
+ unsigned int* fragment_type_or_err);
+
+ bool TryReadMore(size_t* drop_size, int* error);
+
+ // No copy allowed
+ FragmentBufferedReader(const FragmentBufferedReader&);
+ void operator=(const FragmentBufferedReader&);
+};
+
} // namespace log
} // namespace rocksdb