]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl/db_impl_readonly.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_readonly.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_readonly.h"
7 #include "db/arena_wrapped_db_iter.h"
8
9 #include "db/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14
15 namespace ROCKSDB_NAMESPACE {
16
17 #ifndef ROCKSDB_LITE
18
19 DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
20 const std::string& dbname)
21 : DBImpl(db_options, dbname) {
22 ROCKS_LOG_INFO(immutable_db_options_.info_log,
23 "Opening the db in read only mode");
24 LogFlush(immutable_db_options_.info_log);
25 }
26
27 DBImplReadOnly::~DBImplReadOnly() {}
28
29 // Implementations of the DB interface
30 Status DBImplReadOnly::Get(const ReadOptions& read_options,
31 ColumnFamilyHandle* column_family, const Slice& key,
32 PinnableSlice* pinnable_val) {
33 assert(pinnable_val != nullptr);
34 // TODO: stopwatch DB_GET needed?, perf timer needed?
35 PERF_TIMER_GUARD(get_snapshot_time);
36 Status s;
37 SequenceNumber snapshot = versions_->LastSequence();
38 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
39 auto cfd = cfh->cfd();
40 if (tracer_) {
41 InstrumentedMutexLock lock(&trace_mutex_);
42 if (tracer_) {
43 tracer_->Get(column_family, key);
44 }
45 }
46 SuperVersion* super_version = cfd->GetSuperVersion();
47 MergeContext merge_context;
48 SequenceNumber max_covering_tombstone_seq = 0;
49 LookupKey lkey(key, snapshot);
50 PERF_TIMER_STOP(get_snapshot_time);
51 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
52 &max_covering_tombstone_seq, read_options)) {
53 pinnable_val->PinSelf();
54 RecordTick(stats_, MEMTABLE_HIT);
55 } else {
56 PERF_TIMER_GUARD(get_from_output_files_time);
57 super_version->current->Get(read_options, lkey, pinnable_val, &s,
58 &merge_context, &max_covering_tombstone_seq);
59 RecordTick(stats_, MEMTABLE_MISS);
60 }
61 RecordTick(stats_, NUMBER_KEYS_READ);
62 size_t size = pinnable_val->size();
63 RecordTick(stats_, BYTES_READ, size);
64 RecordInHistogram(stats_, BYTES_PER_READ, size);
65 PERF_COUNTER_ADD(get_read_bytes, size);
66 return s;
67 }
68
69 Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
70 ColumnFamilyHandle* column_family) {
71 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
72 auto cfd = cfh->cfd();
73 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
74 SequenceNumber latest_snapshot = versions_->LastSequence();
75 SequenceNumber read_seq =
76 read_options.snapshot != nullptr
77 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
78 ->number_
79 : latest_snapshot;
80 ReadCallback* read_callback = nullptr; // No read callback provided.
81 auto db_iter = NewArenaWrappedDbIterator(
82 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
83 read_seq,
84 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
85 super_version->version_number, read_callback);
86 auto internal_iter =
87 NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
88 db_iter->GetRangeDelAggregator(), read_seq);
89 db_iter->SetIterUnderDBIter(internal_iter);
90 return db_iter;
91 }
92
93 Status DBImplReadOnly::NewIterators(
94 const ReadOptions& read_options,
95 const std::vector<ColumnFamilyHandle*>& column_families,
96 std::vector<Iterator*>* iterators) {
97 ReadCallback* read_callback = nullptr; // No read callback provided.
98 if (iterators == nullptr) {
99 return Status::InvalidArgument("iterators not allowed to be nullptr");
100 }
101 iterators->clear();
102 iterators->reserve(column_families.size());
103 SequenceNumber latest_snapshot = versions_->LastSequence();
104 SequenceNumber read_seq =
105 read_options.snapshot != nullptr
106 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
107 ->number_
108 : latest_snapshot;
109
110 for (auto cfh : column_families) {
111 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
112 auto* sv = cfd->GetSuperVersion()->Ref();
113 auto* db_iter = NewArenaWrappedDbIterator(
114 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
115 sv->mutable_cf_options.max_sequential_skip_in_iterations,
116 sv->version_number, read_callback);
117 auto* internal_iter =
118 NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
119 db_iter->GetRangeDelAggregator(), read_seq);
120 db_iter->SetIterUnderDBIter(internal_iter);
121 iterators->push_back(db_iter);
122 }
123
124 return Status::OK();
125 }
126
127 Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
128 DB** dbptr, bool /*error_if_log_file_exist*/) {
129 *dbptr = nullptr;
130
131 // Try to first open DB as fully compacted DB
132 Status s;
133 s = CompactedDBImpl::Open(options, dbname, dbptr);
134 if (s.ok()) {
135 return s;
136 }
137
138 DBOptions db_options(options);
139 ColumnFamilyOptions cf_options(options);
140 std::vector<ColumnFamilyDescriptor> column_families;
141 column_families.push_back(
142 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
143 std::vector<ColumnFamilyHandle*> handles;
144
145 s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
146 if (s.ok()) {
147 assert(handles.size() == 1);
148 // i can delete the handle since DBImpl is always holding a
149 // reference to default column family
150 delete handles[0];
151 }
152 return s;
153 }
154
155 Status DB::OpenForReadOnly(
156 const DBOptions& db_options, const std::string& dbname,
157 const std::vector<ColumnFamilyDescriptor>& column_families,
158 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
159 bool error_if_log_file_exist) {
160 *dbptr = nullptr;
161 handles->clear();
162
163 SuperVersionContext sv_context(/* create_superversion */ true);
164 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
165 impl->mutex_.Lock();
166 Status s = impl->Recover(column_families, true /* read only */,
167 error_if_log_file_exist);
168 if (s.ok()) {
169 // set column family handles
170 for (auto cf : column_families) {
171 auto cfd =
172 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
173 if (cfd == nullptr) {
174 s = Status::InvalidArgument("Column family not found: ", cf.name);
175 break;
176 }
177 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
178 }
179 }
180 if (s.ok()) {
181 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
182 sv_context.NewSuperVersion();
183 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
184 }
185 }
186 impl->mutex_.Unlock();
187 sv_context.Clean();
188 if (s.ok()) {
189 *dbptr = impl;
190 for (auto* h : *handles) {
191 impl->NewThreadStatusCfInfo(
192 reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
193 }
194 } else {
195 for (auto h : *handles) {
196 delete h;
197 }
198 handles->clear();
199 delete impl;
200 }
201 return s;
202 }
203
204 #else // !ROCKSDB_LITE
205
206 Status DB::OpenForReadOnly(const Options& /*options*/,
207 const std::string& /*dbname*/, DB** /*dbptr*/,
208 bool /*error_if_log_file_exist*/) {
209 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
210 }
211
212 Status DB::OpenForReadOnly(
213 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
214 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
215 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
216 bool /*error_if_log_file_exist*/) {
217 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
218 }
219 #endif // !ROCKSDB_LITE
220
221 } // namespace ROCKSDB_NAMESPACE