git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/blob_log_sequential_reader.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / blob / blob_log_sequential_reader.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6
7 #include "db/blob/blob_log_sequential_reader.h"
8
9 #include "file/random_access_file_reader.h"
10 #include "monitoring/statistics.h"
11 #include "util/stop_watch.h"
12
13 namespace ROCKSDB_NAMESPACE {
14
15 BlobLogSequentialReader::BlobLogSequentialReader(
16 std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
17 Statistics* statistics)
18 : file_(std::move(file_reader)),
19 env_(env),
20 statistics_(statistics),
21 next_byte_(0) {}
22
23 BlobLogSequentialReader::~BlobLogSequentialReader() = default;
24
25 Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
26 char* buf) {
27 assert(slice);
28 assert(file_);
29
30 StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
31 Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
32 slice, buf, nullptr);
33 next_byte_ += size;
34 if (!s.ok()) {
35 return s;
36 }
37 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
38 if (slice->size() != size) {
39 return Status::Corruption("EOF reached while reading record");
40 }
41 return s;
42 }
43
44 Status BlobLogSequentialReader::ReadHeader(BlobLogHeader* header) {
45 assert(header);
46 assert(next_byte_ == 0);
47
48 static_assert(BlobLogHeader::kSize <= sizeof(header_buf_),
49 "Buffer is smaller than BlobLogHeader::kSize");
50
51 Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
52 if (!s.ok()) {
53 return s;
54 }
55
56 if (buffer_.size() != BlobLogHeader::kSize) {
57 return Status::Corruption("EOF reached before file header");
58 }
59
60 return header->DecodeFrom(buffer_);
61 }
62
63 Status BlobLogSequentialReader::ReadRecord(BlobLogRecord* record,
64 ReadLevel level,
65 uint64_t* blob_offset) {
66 assert(record);
67 static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_),
68 "Buffer is smaller than BlobLogRecord::kHeaderSize");
69
70 Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
71 if (!s.ok()) {
72 return s;
73 }
74 if (buffer_.size() != BlobLogRecord::kHeaderSize) {
75 return Status::Corruption("EOF reached before record header");
76 }
77
78 s = record->DecodeHeaderFrom(buffer_);
79 if (!s.ok()) {
80 return s;
81 }
82
83 uint64_t kb_size = record->key_size + record->value_size;
84 if (blob_offset != nullptr) {
85 *blob_offset = next_byte_ + record->key_size;
86 }
87
88 switch (level) {
89 case kReadHeader:
90 next_byte_ += kb_size;
91 break;
92
93 case kReadHeaderKey:
94 record->key_buf.reset(new char[record->key_size]);
95 s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
96 next_byte_ += record->value_size;
97 break;
98
99 case kReadHeaderKeyBlob:
100 record->key_buf.reset(new char[record->key_size]);
101 s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
102 if (s.ok()) {
103 record->value_buf.reset(new char[record->value_size]);
104 s = ReadSlice(record->value_size, &record->value,
105 record->value_buf.get());
106 }
107 if (s.ok()) {
108 s = record->CheckBlobCRC();
109 }
110 break;
111 }
112 return s;
113 }
114
115 Status BlobLogSequentialReader::ReadFooter(BlobLogFooter* footer) {
116 assert(footer);
117 static_assert(BlobLogFooter::kSize <= sizeof(header_buf_),
118 "Buffer is smaller than BlobLogFooter::kSize");
119
120 Status s = ReadSlice(BlobLogFooter::kSize, &buffer_, header_buf_);
121 if (!s.ok()) {
122 return s;
123 }
124
125 if (buffer_.size() != BlobLogFooter::kSize) {
126 return Status::Corruption("EOF reached before file footer");
127 }
128
129 return footer->DecodeFrom(buffer_);
130 }
131
132 } // namespace ROCKSDB_NAMESPACE