1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/db_impl/db_impl_readonly.h"
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
16 namespace ROCKSDB_NAMESPACE
{
20 DBImplReadOnly::DBImplReadOnly(const DBOptions
& db_options
,
21 const std::string
& dbname
)
22 : DBImpl(db_options
, dbname
) {
23 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
24 "Opening the db in read only mode");
25 LogFlush(immutable_db_options_
.info_log
);
28 DBImplReadOnly::~DBImplReadOnly() {}
30 // Implementations of the DB interface
31 Status
DBImplReadOnly::Get(const ReadOptions
& read_options
,
32 ColumnFamilyHandle
* column_family
, const Slice
& key
,
33 PinnableSlice
* pinnable_val
) {
34 assert(pinnable_val
!= nullptr);
35 // TODO: stopwatch DB_GET needed?, perf timer needed?
36 PERF_TIMER_GUARD(get_snapshot_time
);
38 SequenceNumber snapshot
= versions_
->LastSequence();
39 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
40 auto cfd
= cfh
->cfd();
42 InstrumentedMutexLock
lock(&trace_mutex_
);
44 tracer_
->Get(column_family
, key
);
47 SuperVersion
* super_version
= cfd
->GetSuperVersion();
48 MergeContext merge_context
;
49 SequenceNumber max_covering_tombstone_seq
= 0;
50 LookupKey
lkey(key
, snapshot
);
51 PERF_TIMER_STOP(get_snapshot_time
);
52 if (super_version
->mem
->Get(lkey
, pinnable_val
->GetSelf(),
53 /*timestamp=*/nullptr, &s
, &merge_context
,
54 &max_covering_tombstone_seq
, read_options
)) {
55 pinnable_val
->PinSelf();
56 RecordTick(stats_
, MEMTABLE_HIT
);
58 PERF_TIMER_GUARD(get_from_output_files_time
);
59 super_version
->current
->Get(read_options
, lkey
, pinnable_val
,
60 /*timestamp=*/nullptr, &s
, &merge_context
,
61 &max_covering_tombstone_seq
);
62 RecordTick(stats_
, MEMTABLE_MISS
);
64 RecordTick(stats_
, NUMBER_KEYS_READ
);
65 size_t size
= pinnable_val
->size();
66 RecordTick(stats_
, BYTES_READ
, size
);
67 RecordInHistogram(stats_
, BYTES_PER_READ
, size
);
68 PERF_COUNTER_ADD(get_read_bytes
, size
);
72 Iterator
* DBImplReadOnly::NewIterator(const ReadOptions
& read_options
,
73 ColumnFamilyHandle
* column_family
) {
74 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
75 auto cfd
= cfh
->cfd();
76 SuperVersion
* super_version
= cfd
->GetSuperVersion()->Ref();
77 SequenceNumber latest_snapshot
= versions_
->LastSequence();
78 SequenceNumber read_seq
=
79 read_options
.snapshot
!= nullptr
80 ? reinterpret_cast<const SnapshotImpl
*>(read_options
.snapshot
)
83 ReadCallback
* read_callback
= nullptr; // No read callback provided.
84 auto db_iter
= NewArenaWrappedDbIterator(
85 env_
, read_options
, *cfd
->ioptions(), super_version
->mutable_cf_options
,
87 super_version
->mutable_cf_options
.max_sequential_skip_in_iterations
,
88 super_version
->version_number
, read_callback
);
89 auto internal_iter
= NewInternalIterator(
90 db_iter
->GetReadOptions(), cfd
, super_version
, db_iter
->GetArena(),
91 db_iter
->GetRangeDelAggregator(), read_seq
,
92 /* allow_unprepared_value */ true);
93 db_iter
->SetIterUnderDBIter(internal_iter
);
97 Status
DBImplReadOnly::NewIterators(
98 const ReadOptions
& read_options
,
99 const std::vector
<ColumnFamilyHandle
*>& column_families
,
100 std::vector
<Iterator
*>* iterators
) {
101 ReadCallback
* read_callback
= nullptr; // No read callback provided.
102 if (iterators
== nullptr) {
103 return Status::InvalidArgument("iterators not allowed to be nullptr");
106 iterators
->reserve(column_families
.size());
107 SequenceNumber latest_snapshot
= versions_
->LastSequence();
108 SequenceNumber read_seq
=
109 read_options
.snapshot
!= nullptr
110 ? reinterpret_cast<const SnapshotImpl
*>(read_options
.snapshot
)
114 for (auto cfh
: column_families
) {
115 auto* cfd
= static_cast_with_check
<ColumnFamilyHandleImpl
>(cfh
)->cfd();
116 auto* sv
= cfd
->GetSuperVersion()->Ref();
117 auto* db_iter
= NewArenaWrappedDbIterator(
118 env_
, read_options
, *cfd
->ioptions(), sv
->mutable_cf_options
, read_seq
,
119 sv
->mutable_cf_options
.max_sequential_skip_in_iterations
,
120 sv
->version_number
, read_callback
);
121 auto* internal_iter
= NewInternalIterator(
122 db_iter
->GetReadOptions(), cfd
, sv
, db_iter
->GetArena(),
123 db_iter
->GetRangeDelAggregator(), read_seq
,
124 /* allow_unprepared_value */ true);
125 db_iter
->SetIterUnderDBIter(internal_iter
);
126 iterators
->push_back(db_iter
);
133 // Return OK if dbname exists in the file system
134 // or create_if_missing is false
135 Status
OpenForReadOnlyCheckExistence(const DBOptions
& db_options
,
136 const std::string
& dbname
) {
138 if (!db_options
.create_if_missing
) {
139 // Attempt to read "CURRENT" file
140 const std::shared_ptr
<FileSystem
>& fs
= db_options
.env
->GetFileSystem();
141 std::string manifest_path
;
142 uint64_t manifest_file_number
;
143 s
= VersionSet::GetCurrentManifestPath(dbname
, fs
.get(), &manifest_path
,
144 &manifest_file_number
);
146 return Status::NotFound(CurrentFileName(dbname
), "does not exist");
153 Status
DB::OpenForReadOnly(const Options
& options
, const std::string
& dbname
,
154 DB
** dbptr
, bool /*error_if_wal_file_exists*/) {
155 // If dbname does not exist in the file system, should not do anything
156 Status s
= OpenForReadOnlyCheckExistence(options
, dbname
);
163 // Try to first open DB as fully compacted DB
164 s
= CompactedDBImpl::Open(options
, dbname
, dbptr
);
169 DBOptions
db_options(options
);
170 ColumnFamilyOptions
cf_options(options
);
171 std::vector
<ColumnFamilyDescriptor
> column_families
;
172 column_families
.push_back(
173 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
174 std::vector
<ColumnFamilyHandle
*> handles
;
176 s
= DBImplReadOnly::OpenForReadOnlyWithoutCheck(
177 db_options
, dbname
, column_families
, &handles
, dbptr
);
179 assert(handles
.size() == 1);
180 // i can delete the handle since DBImpl is always holding a
181 // reference to default column family
187 Status
DB::OpenForReadOnly(
188 const DBOptions
& db_options
, const std::string
& dbname
,
189 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
190 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
,
191 bool error_if_wal_file_exists
) {
192 // If dbname does not exist in the file system, should not do anything
193 Status s
= OpenForReadOnlyCheckExistence(db_options
, dbname
);
198 return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
199 db_options
, dbname
, column_families
, handles
, dbptr
,
200 error_if_wal_file_exists
);
203 Status
DBImplReadOnly::OpenForReadOnlyWithoutCheck(
204 const DBOptions
& db_options
, const std::string
& dbname
,
205 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
206 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
,
207 bool error_if_wal_file_exists
) {
211 SuperVersionContext
sv_context(/* create_superversion */ true);
212 DBImplReadOnly
* impl
= new DBImplReadOnly(db_options
, dbname
);
214 Status s
= impl
->Recover(column_families
, true /* read only */,
215 error_if_wal_file_exists
);
217 // set column family handles
218 for (auto cf
: column_families
) {
220 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
221 if (cfd
== nullptr) {
222 s
= Status::InvalidArgument("Column family not found", cf
.name
);
225 handles
->push_back(new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
229 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
230 sv_context
.NewSuperVersion();
231 cfd
->InstallSuperVersion(&sv_context
, &impl
->mutex_
);
234 impl
->mutex_
.Unlock();
238 for (auto* h
: *handles
) {
239 impl
->NewThreadStatusCfInfo(
240 static_cast_with_check
<ColumnFamilyHandleImpl
>(h
)->cfd());
243 for (auto h
: *handles
) {
252 #else // !ROCKSDB_LITE
254 Status
DB::OpenForReadOnly(const Options
& /*options*/,
255 const std::string
& /*dbname*/, DB
** /*dbptr*/,
256 bool /*error_if_wal_file_exists*/) {
257 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
260 Status
DB::OpenForReadOnly(
261 const DBOptions
& /*db_options*/, const std::string
& /*dbname*/,
262 const std::vector
<ColumnFamilyDescriptor
>& /*column_families*/,
263 std::vector
<ColumnFamilyHandle
*>* /*handles*/, DB
** /*dbptr*/,
264 bool /*error_if_wal_file_exists*/) {
265 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
267 #endif // !ROCKSDB_LITE
269 } // namespace ROCKSDB_NAMESPACE