]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_impl_readonly.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_impl_readonly.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#include "db/db_impl_readonly.h"
7
8#include "db/compacted_db_impl.h"
9#include "db/db_impl.h"
10#include "db/db_iter.h"
11#include "db/merge_context.h"
12#include "db/range_del_aggregator.h"
13#include "monitoring/perf_context_imp.h"
14
15namespace rocksdb {
16
17#ifndef ROCKSDB_LITE
18
19DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
20 const std::string& dbname)
21 : DBImpl(db_options, dbname) {
22 ROCKS_LOG_INFO(immutable_db_options_.info_log,
23 "Opening the db in read only mode");
24 LogFlush(immutable_db_options_.info_log);
25}
26
11fdf7f2 27DBImplReadOnly::~DBImplReadOnly() {}
7c673cae
FG
28
29// Implementations of the DB interface
30Status DBImplReadOnly::Get(const ReadOptions& read_options,
31 ColumnFamilyHandle* column_family, const Slice& key,
32 PinnableSlice* pinnable_val) {
33 assert(pinnable_val != nullptr);
11fdf7f2
TL
34 // TODO: stopwatch DB_GET needed?, perf timer needed?
35 PERF_TIMER_GUARD(get_snapshot_time);
7c673cae
FG
36 Status s;
37 SequenceNumber snapshot = versions_->LastSequence();
38 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
39 auto cfd = cfh->cfd();
11fdf7f2
TL
40 if (tracer_) {
41 InstrumentedMutexLock lock(&trace_mutex_);
42 if (tracer_) {
43 tracer_->Get(column_family, key);
44 }
45 }
7c673cae
FG
46 SuperVersion* super_version = cfd->GetSuperVersion();
47 MergeContext merge_context;
48 RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
49 LookupKey lkey(key, snapshot);
11fdf7f2 50 PERF_TIMER_STOP(get_snapshot_time);
7c673cae
FG
51 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
52 &range_del_agg, read_options)) {
53 pinnable_val->PinSelf();
11fdf7f2 54 RecordTick(stats_, MEMTABLE_HIT);
7c673cae
FG
55 } else {
56 PERF_TIMER_GUARD(get_from_output_files_time);
57 super_version->current->Get(read_options, lkey, pinnable_val, &s,
58 &merge_context, &range_del_agg);
11fdf7f2 59 RecordTick(stats_, MEMTABLE_MISS);
7c673cae 60 }
11fdf7f2
TL
61 RecordTick(stats_, NUMBER_KEYS_READ);
62 size_t size = pinnable_val->size();
63 RecordTick(stats_, BYTES_READ, size);
64 MeasureTime(stats_, BYTES_PER_READ, size);
65 PERF_COUNTER_ADD(get_read_bytes, size);
7c673cae
FG
66 return s;
67}
68
69Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
70 ColumnFamilyHandle* column_family) {
71 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
72 auto cfd = cfh->cfd();
73 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
74 SequenceNumber latest_snapshot = versions_->LastSequence();
11fdf7f2 75 ReadCallback* read_callback = nullptr; // No read callback provided.
7c673cae 76 auto db_iter = NewArenaWrappedDbIterator(
11fdf7f2 77 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
7c673cae
FG
78 (read_options.snapshot != nullptr
79 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
80 ->number_
81 : latest_snapshot),
82 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
11fdf7f2 83 super_version->version_number, read_callback);
7c673cae
FG
84 auto internal_iter =
85 NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
86 db_iter->GetRangeDelAggregator());
87 db_iter->SetIterUnderDBIter(internal_iter);
88 return db_iter;
89}
90
91Status DBImplReadOnly::NewIterators(
92 const ReadOptions& read_options,
93 const std::vector<ColumnFamilyHandle*>& column_families,
94 std::vector<Iterator*>* iterators) {
11fdf7f2 95 ReadCallback* read_callback = nullptr; // No read callback provided.
7c673cae
FG
96 if (iterators == nullptr) {
97 return Status::InvalidArgument("iterators not allowed to be nullptr");
98 }
99 iterators->clear();
100 iterators->reserve(column_families.size());
101 SequenceNumber latest_snapshot = versions_->LastSequence();
102
103 for (auto cfh : column_families) {
104 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
105 auto* sv = cfd->GetSuperVersion()->Ref();
106 auto* db_iter = NewArenaWrappedDbIterator(
11fdf7f2 107 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
7c673cae
FG
108 (read_options.snapshot != nullptr
109 ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
110 ->number_
111 : latest_snapshot),
112 sv->mutable_cf_options.max_sequential_skip_in_iterations,
11fdf7f2 113 sv->version_number, read_callback);
7c673cae
FG
114 auto* internal_iter =
115 NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
116 db_iter->GetRangeDelAggregator());
117 db_iter->SetIterUnderDBIter(internal_iter);
118 iterators->push_back(db_iter);
119 }
120
121 return Status::OK();
122}
123
124Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
11fdf7f2 125 DB** dbptr, bool /*error_if_log_file_exist*/) {
7c673cae
FG
126 *dbptr = nullptr;
127
128 // Try to first open DB as fully compacted DB
129 Status s;
130 s = CompactedDBImpl::Open(options, dbname, dbptr);
131 if (s.ok()) {
132 return s;
133 }
134
135 DBOptions db_options(options);
136 ColumnFamilyOptions cf_options(options);
137 std::vector<ColumnFamilyDescriptor> column_families;
138 column_families.push_back(
139 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
140 std::vector<ColumnFamilyHandle*> handles;
141
142 s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
143 if (s.ok()) {
144 assert(handles.size() == 1);
145 // i can delete the handle since DBImpl is always holding a
146 // reference to default column family
147 delete handles[0];
148 }
149 return s;
150}
151
152Status DB::OpenForReadOnly(
153 const DBOptions& db_options, const std::string& dbname,
154 const std::vector<ColumnFamilyDescriptor>& column_families,
155 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
156 bool error_if_log_file_exist) {
157 *dbptr = nullptr;
158 handles->clear();
159
11fdf7f2 160 SuperVersionContext sv_context(/* create_superversion */ true);
7c673cae
FG
161 DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
162 impl->mutex_.Lock();
163 Status s = impl->Recover(column_families, true /* read only */,
164 error_if_log_file_exist);
165 if (s.ok()) {
166 // set column family handles
167 for (auto cf : column_families) {
168 auto cfd =
169 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
170 if (cfd == nullptr) {
171 s = Status::InvalidArgument("Column family not found: ", cf.name);
172 break;
173 }
174 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
175 }
176 }
177 if (s.ok()) {
178 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
11fdf7f2
TL
179 sv_context.NewSuperVersion();
180 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
7c673cae
FG
181 }
182 }
183 impl->mutex_.Unlock();
11fdf7f2 184 sv_context.Clean();
7c673cae
FG
185 if (s.ok()) {
186 *dbptr = impl;
187 for (auto* h : *handles) {
188 impl->NewThreadStatusCfInfo(
189 reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
190 }
191 } else {
192 for (auto h : *handles) {
193 delete h;
194 }
195 handles->clear();
196 delete impl;
197 }
198 return s;
199}
200
11fdf7f2 201#else // !ROCKSDB_LITE
7c673cae 202
11fdf7f2
TL
203Status DB::OpenForReadOnly(const Options& /*options*/,
204 const std::string& /*dbname*/, DB** /*dbptr*/,
205 bool /*error_if_log_file_exist*/) {
7c673cae
FG
206 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
207}
208
209Status DB::OpenForReadOnly(
11fdf7f2
TL
210 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
211 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
212 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/,
213 bool /*error_if_log_file_exist*/) {
7c673cae
FG
214 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
215}
216#endif // !ROCKSDB_LITE
217
11fdf7f2 218} // namespace rocksdb