]>
Commit | Line | Data |
---|---|---|
f67539c2 | 1 | |
11fdf7f2 TL |
2 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
3 | // This source code is licensed under both the GPLv2 (found in the | |
4 | // COPYING file in the root directory) and Apache 2.0 License | |
5 | // (found in the LICENSE.Apache file in the root directory). | |
6 | #ifndef ROCKSDB_LITE | |
7 | #include "utilities/blob_db/blob_file.h" | |
8 | ||
11fdf7f2 | 9 | #include <stdio.h> |
f67539c2 | 10 | #include <cinttypes> |
11fdf7f2 TL |
11 | |
12 | #include <algorithm> | |
11fdf7f2 TL |
13 | #include <memory> |
14 | ||
15 | #include "db/column_family.h" | |
f67539c2 | 16 | #include "db/db_impl/db_impl.h" |
11fdf7f2 | 17 | #include "db/dbformat.h" |
f67539c2 TL |
18 | #include "env/composite_env_wrapper.h" |
19 | #include "file/filename.h" | |
20 | #include "file/readahead_raf.h" | |
21 | #include "logging/logging.h" | |
11fdf7f2 TL |
22 | #include "utilities/blob_db/blob_db_impl.h" |
23 | ||
f67539c2 | 24 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
25 | |
26 | namespace blob_db { | |
27 | ||
11fdf7f2 TL |
28 | BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, |
29 | Logger* info_log) | |
f67539c2 TL |
30 | : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} |
31 | ||
32 | BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, | |
33 | Logger* info_log, uint32_t column_family_id, | |
34 | CompressionType compression, bool has_ttl, | |
35 | const ExpirationRange& expiration_range) | |
11fdf7f2 TL |
36 | : parent_(p), |
37 | path_to_dir_(bdir), | |
38 | file_number_(fn), | |
39 | info_log_(info_log), | |
f67539c2 TL |
40 | column_family_id_(column_family_id), |
41 | compression_(compression), | |
42 | has_ttl_(has_ttl), | |
43 | expiration_range_(expiration_range), | |
44 | header_(column_family_id, compression, has_ttl, expiration_range), | |
45 | header_valid_(true) {} | |
11fdf7f2 TL |
46 | |
47 | BlobFile::~BlobFile() { | |
48 | if (obsolete_) { | |
49 | std::string pn(PathName()); | |
50 | Status s = Env::Default()->DeleteFile(PathName()); | |
51 | if (!s.ok()) { | |
52 | // ROCKS_LOG_INFO(db_options_.info_log, | |
53 | // "File could not be deleted %s", pn.c_str()); | |
54 | } | |
55 | } | |
56 | } | |
57 | ||
f67539c2 | 58 | uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } |
11fdf7f2 TL |
59 | |
60 | std::string BlobFile::PathName() const { | |
61 | return BlobFileName(path_to_dir_, file_number_); | |
62 | } | |
63 | ||
64 | std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader( | |
65 | Env* env, const DBOptions& db_options, | |
66 | const EnvOptions& env_options) const { | |
67 | constexpr size_t kReadaheadSize = 2 * 1024 * 1024; | |
68 | std::unique_ptr<RandomAccessFile> sfile; | |
69 | std::string path_name(PathName()); | |
70 | Status s = env->NewRandomAccessFile(path_name, &sfile, env_options); | |
71 | if (!s.ok()) { | |
72 | // report something here. | |
73 | return nullptr; | |
74 | } | |
75 | sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize); | |
76 | ||
77 | std::unique_ptr<RandomAccessFileReader> sfile_reader; | |
f67539c2 TL |
78 | sfile_reader.reset(new RandomAccessFileReader( |
79 | NewLegacyRandomAccessFileWrapper(sfile), path_name)); | |
11fdf7f2 TL |
80 | |
81 | std::shared_ptr<Reader> log_reader = std::make_shared<Reader>( | |
82 | std::move(sfile_reader), db_options.env, db_options.statistics.get()); | |
83 | ||
84 | return log_reader; | |
85 | } | |
86 | ||
87 | std::string BlobFile::DumpState() const { | |
88 | char str[1000]; | |
89 | snprintf( | |
90 | str, sizeof(str), | |
91 | "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 | |
92 | " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 | |
93 | "), writer: %d reader: %d", | |
94 | path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), | |
95 | closed_.load(), obsolete_.load(), expiration_range_.first, | |
96 | expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); | |
97 | return str; | |
98 | } | |
99 | ||
100 | void BlobFile::MarkObsolete(SequenceNumber sequence) { | |
101 | assert(Immutable()); | |
102 | obsolete_sequence_ = sequence; | |
103 | obsolete_.store(true); | |
104 | } | |
105 | ||
106 | bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const { | |
107 | assert(last_fsync_ <= file_size_); | |
108 | return (hard) ? file_size_ > last_fsync_ | |
109 | : (file_size_ - last_fsync_) >= bytes_per_sync; | |
110 | } | |
111 | ||
f67539c2 | 112 | Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { |
11fdf7f2 TL |
113 | BlobLogFooter footer; |
114 | footer.blob_count = blob_count_; | |
115 | if (HasTTL()) { | |
116 | footer.expiration_range = expiration_range_; | |
117 | } | |
118 | ||
119 | // this will close the file and reset the Writable File Pointer. | |
120 | Status s = log_writer_->AppendFooter(footer); | |
121 | if (s.ok()) { | |
122 | closed_ = true; | |
f67539c2 | 123 | immutable_sequence_ = sequence; |
11fdf7f2 TL |
124 | file_size_ += BlobLogFooter::kSize; |
125 | } | |
126 | // delete the sequential writer | |
127 | log_writer_.reset(); | |
128 | return s; | |
129 | } | |
130 | ||
131 | Status BlobFile::ReadFooter(BlobLogFooter* bf) { | |
132 | if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { | |
133 | return Status::IOError("File does not have footer", PathName()); | |
134 | } | |
135 | ||
136 | uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; | |
137 | // assume that ra_file_reader_ is valid before we enter this | |
138 | assert(ra_file_reader_); | |
139 | ||
140 | Slice result; | |
141 | char scratch[BlobLogFooter::kSize + 10]; | |
142 | Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, | |
143 | scratch); | |
144 | if (!s.ok()) return s; | |
145 | if (result.size() != BlobLogFooter::kSize) { | |
146 | // should not happen | |
147 | return Status::IOError("EOF reached before footer"); | |
148 | } | |
149 | ||
150 | s = bf->DecodeFrom(result); | |
151 | return s; | |
152 | } | |
153 | ||
154 | Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { | |
155 | // assume that file has been fully fsync'd | |
156 | last_fsync_.store(file_size_); | |
157 | blob_count_ = footer.blob_count; | |
158 | expiration_range_ = footer.expiration_range; | |
159 | closed_ = true; | |
160 | return Status::OK(); | |
161 | } | |
162 | ||
163 | Status BlobFile::Fsync() { | |
164 | Status s; | |
165 | if (log_writer_.get()) { | |
166 | s = log_writer_->Sync(); | |
167 | last_fsync_.store(file_size_.load()); | |
168 | } | |
169 | return s; | |
170 | } | |
171 | ||
172 | void BlobFile::CloseRandomAccessLocked() { | |
173 | ra_file_reader_.reset(); | |
174 | last_access_ = -1; | |
175 | } | |
176 | ||
177 | Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, | |
178 | std::shared_ptr<RandomAccessFileReader>* reader, | |
179 | bool* fresh_open) { | |
180 | assert(reader != nullptr); | |
181 | assert(fresh_open != nullptr); | |
182 | *fresh_open = false; | |
183 | int64_t current_time = 0; | |
184 | env->GetCurrentTime(¤t_time); | |
185 | last_access_.store(current_time); | |
186 | Status s; | |
187 | ||
188 | { | |
189 | ReadLock lockbfile_r(&mutex_); | |
190 | if (ra_file_reader_) { | |
191 | *reader = ra_file_reader_; | |
192 | return s; | |
193 | } | |
194 | } | |
195 | ||
196 | WriteLock lockbfile_w(&mutex_); | |
197 | // Double check. | |
198 | if (ra_file_reader_) { | |
199 | *reader = ra_file_reader_; | |
200 | return s; | |
201 | } | |
202 | ||
203 | std::unique_ptr<RandomAccessFile> rfile; | |
204 | s = env->NewRandomAccessFile(PathName(), &rfile, env_options); | |
205 | if (!s.ok()) { | |
206 | ROCKS_LOG_ERROR(info_log_, | |
207 | "Failed to open blob file for random-read: %s status: '%s'" | |
208 | " exists: '%s'", | |
209 | PathName().c_str(), s.ToString().c_str(), | |
210 | env->FileExists(PathName()).ToString().c_str()); | |
211 | return s; | |
212 | } | |
213 | ||
f67539c2 TL |
214 | ra_file_reader_ = std::make_shared<RandomAccessFileReader>( |
215 | NewLegacyRandomAccessFileWrapper(rfile), PathName()); | |
11fdf7f2 TL |
216 | *reader = ra_file_reader_; |
217 | *fresh_open = true; | |
218 | return s; | |
219 | } | |
220 | ||
221 | Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { | |
222 | assert(Immutable()); | |
223 | // Get file size. | |
224 | uint64_t file_size = 0; | |
225 | Status s = env->GetFileSize(PathName(), &file_size); | |
226 | if (s.ok()) { | |
227 | file_size_ = file_size; | |
228 | } else { | |
229 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 230 | "Failed to get size of blob file %" PRIu64 |
11fdf7f2 TL |
231 | ", status: %s", |
232 | file_number_, s.ToString().c_str()); | |
233 | return s; | |
234 | } | |
235 | if (file_size < BlobLogHeader::kSize) { | |
236 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 237 | "Incomplete blob file blob file %" PRIu64 |
11fdf7f2 TL |
238 | ", size: %" PRIu64, |
239 | file_number_, file_size); | |
240 | return Status::Corruption("Incomplete blob file header."); | |
241 | } | |
242 | ||
243 | // Create file reader. | |
244 | std::unique_ptr<RandomAccessFile> file; | |
245 | s = env->NewRandomAccessFile(PathName(), &file, env_options); | |
246 | if (!s.ok()) { | |
247 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 248 | "Failed to open blob file %" PRIu64 ", status: %s", |
11fdf7f2 TL |
249 | file_number_, s.ToString().c_str()); |
250 | return s; | |
251 | } | |
252 | std::unique_ptr<RandomAccessFileReader> file_reader( | |
f67539c2 TL |
253 | new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), |
254 | PathName())); | |
11fdf7f2 TL |
255 | |
256 | // Read file header. | |
257 | char header_buf[BlobLogHeader::kSize]; | |
258 | Slice header_slice; | |
259 | s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf); | |
260 | if (!s.ok()) { | |
261 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 262 | "Failed to read header of blob file %" PRIu64 |
11fdf7f2 TL |
263 | ", status: %s", |
264 | file_number_, s.ToString().c_str()); | |
265 | return s; | |
266 | } | |
267 | BlobLogHeader header; | |
268 | s = header.DecodeFrom(header_slice); | |
269 | if (!s.ok()) { | |
270 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 271 | "Failed to decode header of blob file %" PRIu64 |
11fdf7f2 TL |
272 | ", status: %s", |
273 | file_number_, s.ToString().c_str()); | |
274 | return s; | |
275 | } | |
276 | column_family_id_ = header.column_family_id; | |
277 | compression_ = header.compression; | |
278 | has_ttl_ = header.has_ttl; | |
279 | if (has_ttl_) { | |
280 | expiration_range_ = header.expiration_range; | |
281 | } | |
282 | header_valid_ = true; | |
283 | ||
284 | // Read file footer. | |
285 | if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { | |
286 | // OK not to have footer. | |
287 | assert(!footer_valid_); | |
288 | return Status::OK(); | |
289 | } | |
290 | char footer_buf[BlobLogFooter::kSize]; | |
291 | Slice footer_slice; | |
292 | s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, | |
293 | &footer_slice, footer_buf); | |
294 | if (!s.ok()) { | |
295 | ROCKS_LOG_ERROR(info_log_, | |
494da23a | 296 | "Failed to read footer of blob file %" PRIu64 |
11fdf7f2 TL |
297 | ", status: %s", |
298 | file_number_, s.ToString().c_str()); | |
299 | return s; | |
300 | } | |
301 | BlobLogFooter footer; | |
302 | s = footer.DecodeFrom(footer_slice); | |
303 | if (!s.ok()) { | |
304 | // OK not to have footer. | |
305 | assert(!footer_valid_); | |
306 | return Status::OK(); | |
307 | } | |
308 | blob_count_ = footer.blob_count; | |
309 | if (has_ttl_) { | |
310 | assert(header.expiration_range.first <= footer.expiration_range.first); | |
311 | assert(header.expiration_range.second >= footer.expiration_range.second); | |
312 | expiration_range_ = footer.expiration_range; | |
313 | } | |
314 | footer_valid_ = true; | |
315 | return Status::OK(); | |
316 | } | |
317 | ||
318 | } // namespace blob_db | |
f67539c2 | 319 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 | 320 | #endif // ROCKSDB_LITE |