]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl/db_impl_readonly.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_readonly.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_readonly.h"
7
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 #ifndef ROCKSDB_LITE
19
20 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
21 const std::string& dbname)
22 : DBImpl(db_options, dbname) {
23 ROCKS_LOG_INFO(immutable_db_options_.info_log,
24 "Opening the db in read only mode");
25 LogFlush(immutable_db_options_.info_log);
26 }
27
28 DBImplReadOnly::~DBImplReadOnly() {}
29
30 // Implementations of the DB interface
31 Status DBImplReadOnly::Get(const ReadOptions& read_options,
32 ColumnFamilyHandle* column_family, const Slice& key,
33 PinnableSlice* pinnable_val) {
34 assert(pinnable_val != nullptr);
35 // TODO: stopwatch DB_GET needed?, perf timer needed?
36 PERF_TIMER_GUARD(get_snapshot_time);
37 Status s;
38 SequenceNumber snapshot = versions_->LastSequence();
39 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
40 auto cfd = cfh->cfd();
41 if (tracer_) {
42 InstrumentedMutexLock lock(&trace_mutex_);
43 if (tracer_) {
44 tracer_->Get(column_family, key);
45 }
46 }
47 SuperVersion* super_version = cfd->GetSuperVersion();
48 MergeContext merge_context;
49 SequenceNumber max_covering_tombstone_seq = 0;
50 LookupKey lkey(key, snapshot);
51 PERF_TIMER_STOP(get_snapshot_time);
52 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
53 /*timestamp=*/nullptr, &s, &merge_context,
54 &max_covering_tombstone_seq, read_options)) {
55 pinnable_val->PinSelf();
56 RecordTick(stats_, MEMTABLE_HIT);
57 } else {
58 PERF_TIMER_GUARD(get_from_output_files_time);
59 super_version->current->Get(read_options, lkey, pinnable_val,
60 /*timestamp=*/nullptr, &s, &merge_context,
61 &max_covering_tombstone_seq);
62 RecordTick(stats_, MEMTABLE_MISS);
63 }
64 RecordTick(stats_, NUMBER_KEYS_READ);
65 size_t size = pinnable_val->size();
66 RecordTick(stats_, BYTES_READ, size);
67 RecordInHistogram(stats_, BYTES_PER_READ, size);
68 PERF_COUNTER_ADD(get_read_bytes, size);
69 return s;
70 }
71
72 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
73 ColumnFamilyHandle* column_family) {
74 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
75 auto cfd = cfh->cfd();
76 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
77 SequenceNumber latest_snapshot = versions_->LastSequence();
78 SequenceNumber read_seq =
79 read_options.snapshot != nullptr
80 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
81 ->number_
82 : latest_snapshot;
83 ReadCallback* read_callback = nullptr; // No read callback provided.
84 auto db_iter = NewArenaWrappedDbIterator(
85 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
86 read_seq,
87 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
88 super_version->version_number, read_callback);
89 auto internal_iter = NewInternalIterator(
90 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
91 db_iter->GetRangeDelAggregator(), read_seq,
92 /* allow_unprepared_value */ true);
93 db_iter->SetIterUnderDBIter(internal_iter);
94 return db_iter;
95 }
96
97 Status DBImplReadOnly::NewIterators(
98 const ReadOptions& read_options,
99 const std::vector<ColumnFamilyHandle*>& column_families,
100 std::vector<Iterator*>* iterators) {
101 ReadCallback* read_callback = nullptr; // No read callback provided.
102 if (iterators == nullptr) {
103 return Status::InvalidArgument("iterators not allowed to be nullptr");
104 }
105 iterators->clear();
106 iterators->reserve(column_families.size());
107 SequenceNumber latest_snapshot = versions_->LastSequence();
108 SequenceNumber read_seq =
109 read_options.snapshot != nullptr
110 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
111 ->number_
112 : latest_snapshot;
113
114 for (auto cfh : column_families) {
115 auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
116 auto* sv = cfd->GetSuperVersion()->Ref();
117 auto* db_iter = NewArenaWrappedDbIterator(
118 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
119 sv->mutable_cf_options.max_sequential_skip_in_iterations,
120 sv->version_number, read_callback);
121 auto* internal_iter = NewInternalIterator(
122 db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
123 db_iter->GetRangeDelAggregator(), read_seq,
124 /* allow_unprepared_value */ true);
125 db_iter->SetIterUnderDBIter(internal_iter);
126 iterators->push_back(db_iter);
127 }
128
129 return Status::OK();
130 }
131
132 namespace {
133 // Return OK if dbname exists in the file system
134 // or create_if_missing is false
135 Status OpenForReadOnlyCheckExistence(const DBOptions& db_options,
136 const std::string& dbname) {
137 Status s;
138 if (!db_options.create_if_missing) {
139 // Attempt to read "CURRENT" file
140 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
141 std::string manifest_path;
142 uint64_t manifest_file_number;
143 s = VersionSet::GetCurrentManifestPath(dbname, fs.get(), &manifest_path,
144 &manifest_file_number);
145 if (!s.ok()) {
146 return Status::NotFound(CurrentFileName(dbname), "does not exist");
147 }
148 }
149 return s;
150 }
151 } // namespace
152
153 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
154 DB** dbptr, bool /*error_if_wal_file_exists*/) {
155 // If dbname does not exist in the file system, should not do anything
156 Status s = OpenForReadOnlyCheckExistence(options, dbname);
157 if (!s.ok()) {
158 return s;
159 }
160
161 *dbptr = nullptr;
162
163 // Try to first open DB as fully compacted DB
164 s = CompactedDBImpl::Open(options, dbname, dbptr);
165 if (s.ok()) {
166 return s;
167 }
168
169 DBOptions db_options(options);
170 ColumnFamilyOptions cf_options(options);
171 std::vector<ColumnFamilyDescriptor> column_families;
172 column_families.push_back(
173 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
174 std::vector<ColumnFamilyHandle*> handles;
175
176 s = DBImplReadOnly::OpenForReadOnlyWithoutCheck(
177 db_options, dbname, column_families, &handles, dbptr);
178 if (s.ok()) {
179 assert(handles.size() == 1);
180 // i can delete the handle since DBImpl is always holding a
181 // reference to default column family
182 delete handles[0];
183 }
184 return s;
185 }
186
187 Status DB::OpenForReadOnly(
188 const DBOptions& db_options, const std::string& dbname,
189 const std::vector<ColumnFamilyDescriptor>& column_families,
190 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
191 bool error_if_wal_file_exists) {
192 // If dbname does not exist in the file system, should not do anything
193 Status s = OpenForReadOnlyCheckExistence(db_options, dbname);
194 if (!s.ok()) {
195 return s;
196 }
197
198 return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
199 db_options, dbname, column_families, handles, dbptr,
200 error_if_wal_file_exists);
201 }
202
203 Status DBImplReadOnly::OpenForReadOnlyWithoutCheck(
204 const DBOptions& db_options, const std::string& dbname,
205 const std::vector<ColumnFamilyDescriptor>& column_families,
206 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
207 bool error_if_wal_file_exists) {
208 *dbptr = nullptr;
209 handles->clear();
210
211 SuperVersionContext sv_context(/* create_superversion */ true);
212 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
213 impl->mutex_.Lock();
214 Status s = impl->Recover(column_families, true /* read only */,
215 error_if_wal_file_exists);
216 if (s.ok()) {
217 // set column family handles
218 for (auto cf : column_families) {
219 auto cfd =
220 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
221 if (cfd == nullptr) {
222 s = Status::InvalidArgument("Column family not found", cf.name);
223 break;
224 }
225 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
226 }
227 }
228 if (s.ok()) {
229 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
230 sv_context.NewSuperVersion();
231 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
232 }
233 }
234 impl->mutex_.Unlock();
235 sv_context.Clean();
236 if (s.ok()) {
237 *dbptr = impl;
238 for (auto* h : *handles) {
239 impl->NewThreadStatusCfInfo(
240 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
241 }
242 } else {
243 for (auto h : *handles) {
244 delete h;
245 }
246 handles->clear();
247 delete impl;
248 }
249 return s;
250 }
251
252 #else // !ROCKSDB_LITE
253
254 Status DB::OpenForReadOnly(const Options& /*options*/,
255 const std::string& /*dbname*/, DB** /*dbptr*/,
256 bool /*error_if_wal_file_exists*/) {
257 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
258 }
259
260 Status DB::OpenForReadOnly(
261 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
262 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
263 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
264 bool /*error_if_wal_file_exists*/) {
265 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
266 }
267 #endif // !ROCKSDB_LITE
268
269 } // namespace ROCKSDB_NAMESPACE