]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl/db_impl_readonly.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_readonly.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_readonly.h"
7
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/db_impl/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "logging/logging.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "util/cast_util.h"
16
17 namespace ROCKSDB_NAMESPACE {
18
19 #ifndef ROCKSDB_LITE
20
21 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
22 const std::string& dbname)
23 : DBImpl(db_options, dbname, /*seq_per_batch*/ false,
24 /*batch_per_txn*/ true, /*read_only*/ true) {
25 ROCKS_LOG_INFO(immutable_db_options_.info_log,
26 "Opening the db in read only mode");
27 LogFlush(immutable_db_options_.info_log);
28 }
29
30 DBImplReadOnly::~DBImplReadOnly() {}
31
32 // Implementations of the DB interface
33 Status DBImplReadOnly::Get(const ReadOptions& read_options,
34 ColumnFamilyHandle* column_family, const Slice& key,
35 PinnableSlice* pinnable_val) {
36 return Get(read_options, column_family, key, pinnable_val,
37 /*timestamp*/ nullptr);
38 }
39
40 Status DBImplReadOnly::Get(const ReadOptions& read_options,
41 ColumnFamilyHandle* column_family, const Slice& key,
42 PinnableSlice* pinnable_val,
43 std::string* timestamp) {
44 assert(pinnable_val != nullptr);
45 // TODO: stopwatch DB_GET needed?, perf timer needed?
46 PERF_TIMER_GUARD(get_snapshot_time);
47
48 assert(column_family);
49 if (read_options.timestamp) {
50 const Status s = FailIfTsMismatchCf(
51 column_family, *(read_options.timestamp), /*ts_for_read=*/true);
52 if (!s.ok()) {
53 return s;
54 }
55 } else {
56 const Status s = FailIfCfHasTs(column_family);
57 if (!s.ok()) {
58 return s;
59 }
60 }
61
62 // Clear the timestamps for returning results so that we can distinguish
63 // between tombstone or key that has never been written
64 if (timestamp) {
65 timestamp->clear();
66 }
67
68 const Comparator* ucmp = column_family->GetComparator();
69 assert(ucmp);
70 std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
71
72 Status s;
73 SequenceNumber snapshot = versions_->LastSequence();
74 GetWithTimestampReadCallback read_cb(snapshot);
75 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
76 auto cfd = cfh->cfd();
77 if (tracer_) {
78 InstrumentedMutexLock lock(&trace_mutex_);
79 if (tracer_) {
80 tracer_->Get(column_family, key);
81 }
82 }
83 SuperVersion* super_version = cfd->GetSuperVersion();
84 MergeContext merge_context;
85 SequenceNumber max_covering_tombstone_seq = 0;
86 LookupKey lkey(key, snapshot, read_options.timestamp);
87 PERF_TIMER_STOP(get_snapshot_time);
88 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
89 /*columns=*/nullptr, ts, &s, &merge_context,
90 &max_covering_tombstone_seq, read_options,
91 false /* immutable_memtable */, &read_cb)) {
92 pinnable_val->PinSelf();
93 RecordTick(stats_, MEMTABLE_HIT);
94 } else {
95 PERF_TIMER_GUARD(get_from_output_files_time);
96 PinnedIteratorsManager pinned_iters_mgr;
97 super_version->current->Get(
98 read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
99 &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
100 /*value_found*/ nullptr,
101 /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb,
102 /*is_blob*/ nullptr,
103 /*do_merge*/ true);
104 RecordTick(stats_, MEMTABLE_MISS);
105 }
106 RecordTick(stats_, NUMBER_KEYS_READ);
107 size_t size = pinnable_val->size();
108 RecordTick(stats_, BYTES_READ, size);
109 RecordInHistogram(stats_, BYTES_PER_READ, size);
110 PERF_COUNTER_ADD(get_read_bytes, size);
111 return s;
112 }
113
114 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
115 ColumnFamilyHandle* column_family) {
116 assert(column_family);
117 if (read_options.timestamp) {
118 const Status s = FailIfTsMismatchCf(
119 column_family, *(read_options.timestamp), /*ts_for_read=*/true);
120 if (!s.ok()) {
121 return NewErrorIterator(s);
122 }
123 } else {
124 const Status s = FailIfCfHasTs(column_family);
125 if (!s.ok()) {
126 return NewErrorIterator(s);
127 }
128 }
129 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
130 auto cfd = cfh->cfd();
131 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
132 SequenceNumber latest_snapshot = versions_->LastSequence();
133 SequenceNumber read_seq =
134 read_options.snapshot != nullptr
135 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
136 ->number_
137 : latest_snapshot;
138 ReadCallback* read_callback = nullptr; // No read callback provided.
139 auto db_iter = NewArenaWrappedDbIterator(
140 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
141 super_version->current, read_seq,
142 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
143 super_version->version_number, read_callback);
144 auto internal_iter = NewInternalIterator(
145 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
146 read_seq, /* allow_unprepared_value */ true, db_iter);
147 db_iter->SetIterUnderDBIter(internal_iter);
148 return db_iter;
149 }
150
151 Status DBImplReadOnly::NewIterators(
152 const ReadOptions& read_options,
153 const std::vector<ColumnFamilyHandle*>& column_families,
154 std::vector<Iterator*>* iterators) {
155 if (read_options.timestamp) {
156 for (auto* cf : column_families) {
157 assert(cf);
158 const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
159 /*ts_for_read=*/true);
160 if (!s.ok()) {
161 return s;
162 }
163 }
164 } else {
165 for (auto* cf : column_families) {
166 assert(cf);
167 const Status s = FailIfCfHasTs(cf);
168 if (!s.ok()) {
169 return s;
170 }
171 }
172 }
173
174 ReadCallback* read_callback = nullptr; // No read callback provided.
175 if (iterators == nullptr) {
176 return Status::InvalidArgument("iterators not allowed to be nullptr");
177 }
178 iterators->clear();
179 iterators->reserve(column_families.size());
180 SequenceNumber latest_snapshot = versions_->LastSequence();
181 SequenceNumber read_seq =
182 read_options.snapshot != nullptr
183 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
184 ->number_
185 : latest_snapshot;
186
187 for (auto cfh : column_families) {
188 auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
189 auto* sv = cfd->GetSuperVersion()->Ref();
190 auto* db_iter = NewArenaWrappedDbIterator(
191 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
192 sv->current, read_seq,
193 sv->mutable_cf_options.max_sequential_skip_in_iterations,
194 sv->version_number, read_callback);
195 auto* internal_iter = NewInternalIterator(
196 db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), read_seq,
197 /* allow_unprepared_value */ true, db_iter);
198 db_iter->SetIterUnderDBIter(internal_iter);
199 iterators->push_back(db_iter);
200 }
201
202 return Status::OK();
203 }
204
205 namespace {
206 // Return OK if dbname exists in the file system or create it if
207 // create_if_missing
208 Status OpenForReadOnlyCheckExistence(const DBOptions& db_options,
209 const std::string& dbname) {
210 Status s;
211 if (!db_options.create_if_missing) {
212 // Attempt to read "CURRENT" file
213 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
214 std::string manifest_path;
215 uint64_t manifest_file_number;
216 s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), &manifest_path,
217 &manifest_file_number);
218 } else {
219 // Historic behavior that doesn't necessarily make sense
220 s = db_options.env->CreateDirIfMissing(dbname);
221 }
222 return s;
223 }
224 } // namespace
225
226 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
227 DB** dbptr, bool /*error_if_wal_file_exists*/) {
228 Status s = OpenForReadOnlyCheckExistence(options, dbname);
229 if (!s.ok()) {
230 return s;
231 }
232
233 *dbptr = nullptr;
234
235 // Try to first open DB as fully compacted DB
236 s = CompactedDBImpl::Open(options, dbname, dbptr);
237 if (s.ok()) {
238 return s;
239 }
240
241 DBOptions db_options(options);
242 ColumnFamilyOptions cf_options(options);
243 std::vector<ColumnFamilyDescriptor> column_families;
244 column_families.push_back(
245 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
246 std::vector<ColumnFamilyHandle*> handles;
247
248 s = DBImplReadOnly::OpenForReadOnlyWithoutCheck(
249 db_options, dbname, column_families, &handles, dbptr);
250 if (s.ok()) {
251 assert(handles.size() == 1);
252 // i can delete the handle since DBImpl is always holding a
253 // reference to default column family
254 delete handles[0];
255 }
256 return s;
257 }
258
259 Status DB::OpenForReadOnly(
260 const DBOptions& db_options, const std::string& dbname,
261 const std::vector<ColumnFamilyDescriptor>& column_families,
262 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
263 bool error_if_wal_file_exists) {
264 // If dbname does not exist in the file system, should not do anything
265 Status s = OpenForReadOnlyCheckExistence(db_options, dbname);
266 if (!s.ok()) {
267 return s;
268 }
269
270 return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
271 db_options, dbname, column_families, handles, dbptr,
272 error_if_wal_file_exists);
273 }
274
275 Status DBImplReadOnly::OpenForReadOnlyWithoutCheck(
276 const DBOptions& db_options, const std::string& dbname,
277 const std::vector<ColumnFamilyDescriptor>& column_families,
278 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
279 bool error_if_wal_file_exists) {
280 *dbptr = nullptr;
281 handles->clear();
282
283 SuperVersionContext sv_context(/* create_superversion */ true);
284 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
285 impl->mutex_.Lock();
286 Status s = impl->Recover(column_families, true /* read only */,
287 error_if_wal_file_exists);
288 if (s.ok()) {
289 // set column family handles
290 for (auto cf : column_families) {
291 auto cfd =
292 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
293 if (cfd == nullptr) {
294 s = Status::InvalidArgument("Column family not found", cf.name);
295 break;
296 }
297 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
298 }
299 }
300 if (s.ok()) {
301 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
302 sv_context.NewSuperVersion();
303 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
304 }
305 }
306 impl->mutex_.Unlock();
307 sv_context.Clean();
308 if (s.ok()) {
309 *dbptr = impl;
310 for (auto* h : *handles) {
311 impl->NewThreadStatusCfInfo(
312 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
313 }
314 } else {
315 for (auto h : *handles) {
316 delete h;
317 }
318 handles->clear();
319 delete impl;
320 }
321 return s;
322 }
323
324 #else // !ROCKSDB_LITE
325
326 Status DB::OpenForReadOnly(const Options& /*options*/,
327 const std::string& /*dbname*/, DB** /*dbptr*/,
328 bool /*error_if_wal_file_exists*/) {
329 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
330 }
331
332 Status DB::OpenForReadOnly(
333 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
334 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
335 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
336 bool /*error_if_wal_file_exists*/) {
337 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
338 }
339 #endif // !ROCKSDB_LITE
340
341 } // namespace ROCKSDB_NAMESPACE