]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/blob/blob_file_reader.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / blob / blob_file_reader.cc
CommitLineData
20effc67
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "db/blob/blob_file_reader.h"
7
8#include <cassert>
9#include <string>
10
11#include "db/blob/blob_log_format.h"
12#include "file/filename.h"
13#include "options/cf_options.h"
14#include "rocksdb/file_system.h"
15#include "rocksdb/slice.h"
16#include "rocksdb/status.h"
17#include "test_util/sync_point.h"
18#include "util/compression.h"
19#include "util/crc32c.h"
20
21namespace ROCKSDB_NAMESPACE {
22
23Status BlobFileReader::Create(
24 const ImmutableCFOptions& immutable_cf_options,
25 const FileOptions& file_options, uint32_t column_family_id,
26 HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
27 std::unique_ptr<BlobFileReader>* blob_file_reader) {
28 assert(blob_file_reader);
29 assert(!*blob_file_reader);
30
31 uint64_t file_size = 0;
32 std::unique_ptr<RandomAccessFileReader> file_reader;
33
34 {
35 const Status s =
36 OpenFile(immutable_cf_options, file_options, blob_file_read_hist,
37 blob_file_number, &file_size, &file_reader);
38 if (!s.ok()) {
39 return s;
40 }
41 }
42
43 assert(file_reader);
44
45 CompressionType compression_type = kNoCompression;
46
47 {
48 const Status s =
49 ReadHeader(file_reader.get(), column_family_id, &compression_type);
50 if (!s.ok()) {
51 return s;
52 }
53 }
54
55 {
56 const Status s = ReadFooter(file_size, file_reader.get());
57 if (!s.ok()) {
58 return s;
59 }
60 }
61
62 blob_file_reader->reset(
63 new BlobFileReader(std::move(file_reader), file_size, compression_type));
64
65 return Status::OK();
66}
67
68Status BlobFileReader::OpenFile(
69 const ImmutableCFOptions& immutable_cf_options,
70 const FileOptions& file_opts, HistogramImpl* blob_file_read_hist,
71 uint64_t blob_file_number, uint64_t* file_size,
72 std::unique_ptr<RandomAccessFileReader>* file_reader) {
73 assert(file_size);
74 assert(file_reader);
75
76 const auto& cf_paths = immutable_cf_options.cf_paths;
77 assert(!cf_paths.empty());
78
79 const std::string blob_file_path =
80 BlobFileName(cf_paths.front().path, blob_file_number);
81
82 FileSystem* const fs = immutable_cf_options.fs;
83 assert(fs);
84
85 constexpr IODebugContext* dbg = nullptr;
86
87 {
88 TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
89
90 const Status s =
91 fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
92 if (!s.ok()) {
93 return s;
94 }
95 }
96
97 if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
98 return Status::Corruption("Malformed blob file");
99 }
100
101 std::unique_ptr<FSRandomAccessFile> file;
102
103 {
104 TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
105
106 const Status s =
107 fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
108 if (!s.ok()) {
109 return s;
110 }
111 }
112
113 assert(file);
114
115 if (immutable_cf_options.advise_random_on_open) {
116 file->Hint(FSRandomAccessFile::kRandom);
117 }
118
119 file_reader->reset(new RandomAccessFileReader(
120 std::move(file), blob_file_path, immutable_cf_options.env,
121 std::shared_ptr<IOTracer>(), immutable_cf_options.statistics,
122 BLOB_DB_BLOB_FILE_READ_MICROS, blob_file_read_hist,
123 immutable_cf_options.rate_limiter, immutable_cf_options.listeners));
124
125 return Status::OK();
126}
127
128Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
129 uint32_t column_family_id,
130 CompressionType* compression_type) {
131 assert(file_reader);
132 assert(compression_type);
133
134 Slice header_slice;
135 Buffer buf;
136 AlignedBuf aligned_buf;
137
138 {
139 TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
140
141 constexpr uint64_t read_offset = 0;
142 constexpr size_t read_size = BlobLogHeader::kSize;
143
144 const Status s = ReadFromFile(file_reader, read_offset, read_size,
145 &header_slice, &buf, &aligned_buf);
146 if (!s.ok()) {
147 return s;
148 }
149
150 TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
151 &header_slice);
152 }
153
154 BlobLogHeader header;
155
156 {
157 const Status s = header.DecodeFrom(header_slice);
158 if (!s.ok()) {
159 return s;
160 }
161 }
162
163 constexpr ExpirationRange no_expiration_range;
164
165 if (header.has_ttl || header.expiration_range != no_expiration_range) {
166 return Status::Corruption("Unexpected TTL blob file");
167 }
168
169 if (header.column_family_id != column_family_id) {
170 return Status::Corruption("Column family ID mismatch");
171 }
172
173 *compression_type = header.compression;
174
175 return Status::OK();
176}
177
178Status BlobFileReader::ReadFooter(uint64_t file_size,
179 const RandomAccessFileReader* file_reader) {
180 assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
181 assert(file_reader);
182
183 Slice footer_slice;
184 Buffer buf;
185 AlignedBuf aligned_buf;
186
187 {
188 TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
189
190 const uint64_t read_offset = file_size - BlobLogFooter::kSize;
191 constexpr size_t read_size = BlobLogFooter::kSize;
192
193 const Status s = ReadFromFile(file_reader, read_offset, read_size,
194 &footer_slice, &buf, &aligned_buf);
195 if (!s.ok()) {
196 return s;
197 }
198
199 TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
200 &footer_slice);
201 }
202
203 BlobLogFooter footer;
204
205 {
206 const Status s = footer.DecodeFrom(footer_slice);
207 if (!s.ok()) {
208 return s;
209 }
210 }
211
212 constexpr ExpirationRange no_expiration_range;
213
214 if (footer.expiration_range != no_expiration_range) {
215 return Status::Corruption("Unexpected TTL blob file");
216 }
217
218 return Status::OK();
219}
220
221Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
222 uint64_t read_offset, size_t read_size,
223 Slice* slice, Buffer* buf,
224 AlignedBuf* aligned_buf) {
225 assert(slice);
226 assert(buf);
227 assert(aligned_buf);
228
229 assert(file_reader);
230
231 Status s;
232
233 if (file_reader->use_direct_io()) {
234 constexpr char* scratch = nullptr;
235
236 s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
237 aligned_buf);
238 } else {
239 buf->reset(new char[read_size]);
240 constexpr AlignedBuf* aligned_scratch = nullptr;
241
242 s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
243 buf->get(), aligned_scratch);
244 }
245
246 if (!s.ok()) {
247 return s;
248 }
249
250 if (slice->size() != read_size) {
251 return Status::Corruption("Failed to read data from blob file");
252 }
253
254 return Status::OK();
255}
256
257BlobFileReader::BlobFileReader(
258 std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
259 CompressionType compression_type)
260 : file_reader_(std::move(file_reader)),
261 file_size_(file_size),
262 compression_type_(compression_type) {
263 assert(file_reader_);
264}
265
266BlobFileReader::~BlobFileReader() = default;
267
268Status BlobFileReader::GetBlob(const ReadOptions& read_options,
269 const Slice& user_key, uint64_t offset,
270 uint64_t value_size,
271 CompressionType compression_type,
272 PinnableSlice* value) const {
273 assert(value);
274
275 const uint64_t key_size = user_key.size();
276
277 if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
278 return Status::Corruption("Invalid blob offset");
279 }
280
281 if (compression_type != compression_type_) {
282 return Status::Corruption("Compression type mismatch when reading blob");
283 }
284
285 // Note: if verify_checksum is set, we read the entire blob record to be able
286 // to perform the verification; otherwise, we just read the blob itself. Since
287 // the offset in BlobIndex actually points to the blob value, we need to make
288 // an adjustment in the former case.
289 const uint64_t adjustment =
290 read_options.verify_checksums
291 ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
292 : 0;
293 assert(offset >= adjustment);
294
295 Slice record_slice;
296 Buffer buf;
297 AlignedBuf aligned_buf;
298
299 {
300 TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
301
302 const uint64_t record_offset = offset - adjustment;
303 const uint64_t record_size = value_size + adjustment;
304
305 const Status s = ReadFromFile(file_reader_.get(), record_offset,
306 static_cast<size_t>(record_size),
307 &record_slice, &buf, &aligned_buf);
308 if (!s.ok()) {
309 return s;
310 }
311
312 TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
313 &record_slice);
314 }
315
316 if (read_options.verify_checksums) {
317 const Status s = VerifyBlob(record_slice, user_key, value_size);
318 if (!s.ok()) {
319 return s;
320 }
321 }
322
323 const Slice value_slice(record_slice.data() + adjustment, value_size);
324
325 {
326 const Status s =
327 UncompressBlobIfNeeded(value_slice, compression_type, value);
328 if (!s.ok()) {
329 return s;
330 }
331 }
332
333 return Status::OK();
334}
335
336Status BlobFileReader::VerifyBlob(const Slice& record_slice,
337 const Slice& user_key, uint64_t value_size) {
338 BlobLogRecord record;
339
340 const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
341
342 {
343 const Status s = record.DecodeHeaderFrom(header_slice);
344 if (!s.ok()) {
345 return s;
346 }
347 }
348
349 if (record.key_size != user_key.size()) {
350 return Status::Corruption("Key size mismatch when reading blob");
351 }
352
353 if (record.value_size != value_size) {
354 return Status::Corruption("Value size mismatch when reading blob");
355 }
356
357 record.key =
358 Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
359 if (record.key != user_key) {
360 return Status::Corruption("Key mismatch when reading blob");
361 }
362
363 record.value = Slice(record.key.data() + record.key_size, value_size);
364
365 {
366 TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
367 &record);
368
369 const Status s = record.CheckBlobCRC();
370 if (!s.ok()) {
371 return s;
372 }
373 }
374
375 return Status::OK();
376}
377
378Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
379 CompressionType compression_type,
380 PinnableSlice* value) {
381 assert(value);
382
383 if (compression_type == kNoCompression) {
384 SaveValue(value_slice, value);
385
386 return Status::OK();
387 }
388
389 UncompressionContext context(compression_type);
390 UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
391 compression_type);
392
393 size_t uncompressed_size = 0;
394 constexpr uint32_t compression_format_version = 2;
395 constexpr MemoryAllocator* allocator = nullptr;
396
397 CacheAllocationPtr output =
398 UncompressData(info, value_slice.data(), value_slice.size(),
399 &uncompressed_size, compression_format_version, allocator);
400
401 TEST_SYNC_POINT_CALLBACK(
402 "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
403
404 if (!output) {
405 return Status::Corruption("Unable to uncompress blob");
406 }
407
408 SaveValue(Slice(output.get(), uncompressed_size), value);
409
410 return Status::OK();
411}
412
413void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
414 assert(dst);
415
416 if (dst->IsPinned()) {
417 dst->Reset();
418 }
419
420 dst->PinSelf(src);
421}
422
423} // namespace ROCKSDB_NAMESPACE