]>
Commit | Line | Data |
---|---|---|
f67539c2 | 1 | |
11fdf7f2 TL |
2 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
3 | // This source code is licensed under both the GPLv2 (found in the | |
4 | // COPYING file in the root directory) and Apache 2.0 License | |
5 | // (found in the LICENSE.Apache file in the root directory). | |
6 | #ifndef ROCKSDB_LITE | |
7 | #include "utilities/blob_db/blob_file.h" | |
8 | ||
11fdf7f2 | 9 | #include <stdio.h> |
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | |
12 | #include <algorithm> | |
11fdf7f2 TL |
13 | #include <memory> |
14 | ||
15 | #include "db/column_family.h" | |
f67539c2 | 16 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 17 | #include "db/dbformat.h" |
f67539c2 TL |
18 | #include "env/composite_env_wrapper.h" |
19 | #include "file/filename.h" | |
20 | #include "file/readahead_raf.h" | |
21 | #include "logging/logging.h" | |
11fdf7f2 TL |
22 | #include "utilities/blob_db/blob_db_impl.h" |
23 | ||
f67539c2 | 24 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
25 | |
26 | namespace blob_db { | |
27 | ||
11fdf7f2 TL |
28 | BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, |
29 | Logger* info_log) | |
f67539c2 TL |
30 | : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} |
31 | ||
32 | BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, | |
33 | Logger* info_log, uint32_t column_family_id, | |
34 | CompressionType compression, bool has_ttl, | |
35 | const ExpirationRange& expiration_range) | |
11fdf7f2 TL |
36 | : parent_(p), |
37 | path_to_dir_(bdir), | |
38 | file_number_(fn), | |
39 | info_log_(info_log), | |
f67539c2 TL |
40 | column_family_id_(column_family_id), |
41 | compression_(compression), | |
42 | has_ttl_(has_ttl), | |
43 | expiration_range_(expiration_range), | |
44 | header_(column_family_id, compression, has_ttl, expiration_range), | |
45 | header_valid_(true) {} | |
11fdf7f2 TL |
46 | |
47 | BlobFile::~BlobFile() { | |
48 | if (obsolete_) { | |
49 | std::string pn(PathName()); | |
50 | Status s = Env::Default()->DeleteFile(PathName()); | |
51 | if (!s.ok()) { | |
52 | // ROCKS_LOG_INFO(db_options_.info_log, | |
53 | // "File could not be deleted %s", pn.c_str()); | |
54 | } | |
55 | } | |
56 | } | |
57 | ||
f67539c2 | 58 | uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } |
11fdf7f2 TL |
59 | |
60 | std::string BlobFile::PathName() const { | |
61 | return BlobFileName(path_to_dir_, file_number_); | |
62 | } | |
63 | ||
11fdf7f2 TL |
64 | std::string BlobFile::DumpState() const { |
65 | char str[1000]; | |
66 | snprintf( | |
67 | str, sizeof(str), | |
68 | "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 | |
69 | " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 | |
70 | "), writer: %d reader: %d", | |
71 | path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), | |
72 | closed_.load(), obsolete_.load(), expiration_range_.first, | |
73 | expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); | |
74 | return str; | |
75 | } | |
76 | ||
77 | void BlobFile::MarkObsolete(SequenceNumber sequence) { | |
78 | assert(Immutable()); | |
79 | obsolete_sequence_ = sequence; | |
80 | obsolete_.store(true); | |
81 | } | |
82 | ||
f67539c2 | 83 | Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { |
11fdf7f2 TL |
84 | BlobLogFooter footer; |
85 | footer.blob_count = blob_count_; | |
86 | if (HasTTL()) { | |
87 | footer.expiration_range = expiration_range_; | |
88 | } | |
89 | ||
90 | // this will close the file and reset the Writable File Pointer. | |
20effc67 TL |
91 | Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr, |
92 | /* checksum_value */ nullptr); | |
11fdf7f2 TL |
93 | if (s.ok()) { |
94 | closed_ = true; | |
f67539c2 | 95 | immutable_sequence_ = sequence; |
11fdf7f2 TL |
96 | file_size_ += BlobLogFooter::kSize; |
97 | } | |
98 | // delete the sequential writer | |
99 | log_writer_.reset(); | |
100 | return s; | |
101 | } | |
102 | ||
103 | Status BlobFile::ReadFooter(BlobLogFooter* bf) { | |
104 | if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { | |
105 | return Status::IOError("File does not have footer", PathName()); | |
106 | } | |
107 | ||
108 | uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; | |
109 | // assume that ra_file_reader_ is valid before we enter this | |
110 | assert(ra_file_reader_); | |
111 | ||
112 | Slice result; | |
20effc67 TL |
113 | std::string buf; |
114 | AlignedBuf aligned_buf; | |
115 | Status s; | |
116 | if (ra_file_reader_->use_direct_io()) { | |
117 | s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, | |
118 | &result, nullptr, &aligned_buf); | |
119 | } else { | |
120 | buf.reserve(BlobLogFooter::kSize + 10); | |
121 | s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, | |
122 | &result, &buf[0], nullptr); | |
123 | } | |
11fdf7f2 TL |
124 | if (!s.ok()) return s; |
125 | if (result.size() != BlobLogFooter::kSize) { | |
126 | // should not happen | |
127 | return Status::IOError("EOF reached before footer"); | |
128 | } | |
129 | ||
130 | s = bf->DecodeFrom(result); | |
131 | return s; | |
132 | } | |
133 | ||
134 | Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { | |
11fdf7f2 TL |
135 | blob_count_ = footer.blob_count; |
136 | expiration_range_ = footer.expiration_range; | |
137 | closed_ = true; | |
138 | return Status::OK(); | |
139 | } | |
140 | ||
141 | Status BlobFile::Fsync() { | |
142 | Status s; | |
143 | if (log_writer_.get()) { | |
144 | s = log_writer_->Sync(); | |
11fdf7f2 TL |
145 | } |
146 | return s; | |
147 | } | |
148 | ||
149 | void BlobFile::CloseRandomAccessLocked() { | |
150 | ra_file_reader_.reset(); | |
151 | last_access_ = -1; | |
152 | } | |
153 | ||
154 | Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, | |
155 | std::shared_ptr<RandomAccessFileReader>* reader, | |
156 | bool* fresh_open) { | |
157 | assert(reader != nullptr); | |
158 | assert(fresh_open != nullptr); | |
159 | *fresh_open = false; | |
160 | int64_t current_time = 0; | |
161 | env->GetCurrentTime(¤t_time); | |
162 | last_access_.store(current_time); | |
163 | Status s; | |
164 | ||
165 | { | |
166 | ReadLock lockbfile_r(&mutex_); | |
167 | if (ra_file_reader_) { | |
168 | *reader = ra_file_reader_; | |
169 | return s; | |
170 | } | |
171 | } | |
172 | ||
173 | WriteLock lockbfile_w(&mutex_); | |
174 | // Double check. | |
175 | if (ra_file_reader_) { | |
176 | *reader = ra_file_reader_; | |
177 | return s; | |
178 | } | |
179 | ||
180 | std::unique_ptr<RandomAccessFile> rfile; | |
181 | s = env->NewRandomAccessFile(PathName(), &rfile, env_options); | |
182 | if (!s.ok()) { | |
183 | ROCKS_LOG_ERROR(info_log_, | |
184 | "Failed to open blob file for random-read: %s status: '%s'" | |
185 | " exists: '%s'", | |
186 | PathName().c_str(), s.ToString().c_str(), | |
187 | env->FileExists(PathName()).ToString().c_str()); | |
188 | return s; | |
189 | } | |
190 | ||
f67539c2 TL |
191 | ra_file_reader_ = std::make_shared<RandomAccessFileReader>( |
192 | NewLegacyRandomAccessFileWrapper(rfile), PathName()); | |
11fdf7f2 TL |
193 | *reader = ra_file_reader_; |
194 | *fresh_open = true; | |
195 | return s; | |
196 | } | |
197 | ||
198 | Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { | |
199 | assert(Immutable()); | |
200 | // Get file size. | |
201 | uint64_t file_size = 0; | |
202 | Status s = env->GetFileSize(PathName(), &file_size); | |
203 | if (s.ok()) { | |
204 | file_size_ = file_size; | |
205 | } else { | |
206 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 207 | "Failed to get size of blob file %" PRIu64 |
11fdf7f2 TL |
208 | ", status: %s", |
209 | file_number_, s.ToString().c_str()); | |
210 | return s; | |
211 | } | |
212 | if (file_size < BlobLogHeader::kSize) { | |
213 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 214 | "Incomplete blob file blob file %" PRIu64 |
11fdf7f2 TL |
215 | ", size: %" PRIu64, |
216 | file_number_, file_size); | |
217 | return Status::Corruption("Incomplete blob file header."); | |
218 | } | |
219 | ||
220 | // Create file reader. | |
221 | std::unique_ptr<RandomAccessFile> file; | |
222 | s = env->NewRandomAccessFile(PathName(), &file, env_options); | |
223 | if (!s.ok()) { | |
224 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 225 | "Failed to open blob file %" PRIu64 ", status: %s", |
11fdf7f2 TL |
226 | file_number_, s.ToString().c_str()); |
227 | return s; | |
228 | } | |
229 | std::unique_ptr<RandomAccessFileReader> file_reader( | |
f67539c2 TL |
230 | new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), |
231 | PathName())); | |
11fdf7f2 TL |
232 | |
233 | // Read file header. | |
20effc67 TL |
234 | std::string header_buf; |
235 | AlignedBuf aligned_buf; | |
11fdf7f2 | 236 | Slice header_slice; |
20effc67 TL |
237 | if (file_reader->use_direct_io()) { |
238 | s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, | |
239 | nullptr, &aligned_buf); | |
240 | } else { | |
241 | header_buf.reserve(BlobLogHeader::kSize); | |
242 | s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, | |
243 | &header_buf[0], nullptr); | |
244 | } | |
11fdf7f2 TL |
245 | if (!s.ok()) { |
246 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 247 | "Failed to read header of blob file %" PRIu64 |
11fdf7f2 TL |
248 | ", status: %s", |
249 | file_number_, s.ToString().c_str()); | |
250 | return s; | |
251 | } | |
252 | BlobLogHeader header; | |
253 | s = header.DecodeFrom(header_slice); | |
254 | if (!s.ok()) { | |
255 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 256 | "Failed to decode header of blob file %" PRIu64 |
11fdf7f2 TL |
257 | ", status: %s", |
258 | file_number_, s.ToString().c_str()); | |
259 | return s; | |
260 | } | |
261 | column_family_id_ = header.column_family_id; | |
262 | compression_ = header.compression; | |
263 | has_ttl_ = header.has_ttl; | |
264 | if (has_ttl_) { | |
265 | expiration_range_ = header.expiration_range; | |
266 | } | |
267 | header_valid_ = true; | |
268 | ||
269 | // Read file footer. | |
270 | if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { | |
271 | // OK not to have footer. | |
272 | assert(!footer_valid_); | |
273 | return Status::OK(); | |
274 | } | |
20effc67 | 275 | std::string footer_buf; |
11fdf7f2 | 276 | Slice footer_slice; |
20effc67 TL |
277 | if (file_reader->use_direct_io()) { |
278 | s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, | |
279 | BlobLogFooter::kSize, &footer_slice, nullptr, | |
280 | &aligned_buf); | |
281 | } else { | |
282 | footer_buf.reserve(BlobLogFooter::kSize); | |
283 | s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, | |
284 | BlobLogFooter::kSize, &footer_slice, &footer_buf[0], | |
285 | nullptr); | |
286 | } | |
11fdf7f2 TL |
287 | if (!s.ok()) { |
288 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 289 | "Failed to read footer of blob file %" PRIu64 |
11fdf7f2 TL |
290 | ", status: %s", |
291 | file_number_, s.ToString().c_str()); | |
292 | return s; | |
293 | } | |
294 | BlobLogFooter footer; | |
295 | s = footer.DecodeFrom(footer_slice); | |
296 | if (!s.ok()) { | |
297 | // OK not to have footer. | |
298 | assert(!footer_valid_); | |
299 | return Status::OK(); | |
300 | } | |
301 | blob_count_ = footer.blob_count; | |
302 | if (has_ttl_) { | |
303 | assert(header.expiration_range.first <= footer.expiration_range.first); | |
304 | assert(header.expiration_range.second >= footer.expiration_range.second); | |
305 | expiration_range_ = footer.expiration_range; | |
306 | } | |
307 | footer_valid_ = true; | |
308 | return Status::OK(); | |
309 | } | |
310 | ||
311 | } // namespace blob_db | |
f67539c2 | 312 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 | 313 | #endif // ROCKSDB_LITE |