1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/db_impl/db_impl_readonly.h"
8 #include "db/arena_wrapped_db_iter.h"
9 #include "db/db_impl/compacted_db_impl.h"
10 #include "db/db_impl/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/merge_context.h"
13 #include "logging/logging.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "util/cast_util.h"
17 namespace ROCKSDB_NAMESPACE
{
// Constructs a read-only DB instance.
//
// Forwards db_options and dbname to the DBImpl base constructor with
// seq_per_batch = false, batch_per_txn = true and read_only = true, then
// writes an informational message to the info log and flushes it.
21 DBImplReadOnly::DBImplReadOnly(const DBOptions
& db_options
,
22 const std::string
& dbname
)
23 : DBImpl(db_options
, dbname
, /*seq_per_batch*/ false,
24 /*batch_per_txn*/ true, /*read_only*/ true) {
// Record in the info log that this instance was opened read-only.
25 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
26 "Opening the db in read only mode");
// Flush right away so the message above is not left buffered.
27 LogFlush(immutable_db_options_
.info_log
);
// Destructor with an empty body; no explicit cleanup is performed here.
// NOTE(review): since the body is empty, `= default` would state the same
// intent -- optional.
30 DBImplReadOnly::~DBImplReadOnly() {}
32 // Implementations of the DB interface
// Convenience overload of Get() without a timestamp out-parameter.
//
// Forwards all arguments to the overload that also takes `std::string*
// timestamp`, passing nullptr for it, and returns that call's Status.
33 Status
DBImplReadOnly::Get(const ReadOptions
& read_options
,
34 ColumnFamilyHandle
* column_family
, const Slice
& key
,
35 PinnableSlice
* pinnable_val
) {
36 return Get(read_options
, column_family
, key
, pinnable_val
,
37 /*timestamp*/ nullptr);
// Point lookup of `key` in `column_family`, read at the sequence number
// returned by versions_->LastSequence().
//
// Parameters (as used by the visible statements):
//   read_options  - forwarded to both lookups; its `timestamp` field selects
//                   which timestamp validation runs and is passed to LookupKey.
//   column_family - must be non-null (asserted).
//   key           - user key to look up.
//   pinnable_val  - must be non-null (asserted); receives the value.
//   timestamp     - optional out-parameter; only forwarded to the lookups when
//                   the column family comparator reports timestamp_size() > 0.
//
// The memtable (super_version->mem) is consulted first; the current version
// (super_version->current) is consulted in the other branch. Tickers and a
// histogram are updated with the size of the value.
//
// NOTE(review): several lines of this definition (status checks, branch
// keywords, closing braces, the declaration of the `s` passed as `&s`, and
// the final return) are not visible in this view. The comments below describe
// only the statements that are visible.
40 Status
DBImplReadOnly::Get(const ReadOptions
& read_options
,
41 ColumnFamilyHandle
* column_family
, const Slice
& key
,
42 PinnableSlice
* pinnable_val
,
43 std::string
* timestamp
) {
44 assert(pinnable_val
!= nullptr);
45 // TODO: stopwatch DB_GET needed?, perf timer needed?
46 PERF_TIMER_GUARD(get_snapshot_time
);
48 assert(column_family
);
// Timestamp validation: when the caller supplied a read timestamp it is
// checked against the column family with FailIfTsMismatchCf.
49 if (read_options
.timestamp
) {
50 const Status s
= FailIfTsMismatchCf(
51 column_family
, *(read_options
.timestamp
), /*ts_for_read=*/true);
// The other validation, FailIfCfHasTs, is presumably the no-timestamp branch
// (the `else` and the handling of `s` are not visible here -- confirm).
56 const Status s
= FailIfCfHasTs(column_family
);
62 // Clear the timestamps for returning results so that we can distinguish
63 // between tombstone or key that has never been written
68 const Comparator
* ucmp
= column_family
->GetComparator();
// Only hand the caller's timestamp buffer to the lookups when the comparator
// actually uses timestamps; otherwise pass nullptr.
70 std::string
* ts
= ucmp
->timestamp_size() > 0 ? timestamp
: nullptr;
// Read at the latest sequence number known to the version set.
73 SequenceNumber snapshot
= versions_
->LastSequence();
74 GetWithTimestampReadCallback
read_cb(snapshot
);
75 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
76 auto cfd
= cfh
->cfd();
// Report this Get to the tracer while holding trace_mutex_ (any guard on
// tracer_ being set is not visible in this view).
78 InstrumentedMutexLock
lock(&trace_mutex_
);
80 tracer_
->Get(column_family
, key
);
83 SuperVersion
* super_version
= cfd
->GetSuperVersion();
84 MergeContext merge_context
;
85 SequenceNumber max_covering_tombstone_seq
= 0;
86 LookupKey
lkey(key
, snapshot
, read_options
.timestamp
);
// Snapshot/setup phase ends here; stop its perf timer before the lookups.
87 PERF_TIMER_STOP(get_snapshot_time
);
// Memtable lookup. On a hit the value written into pinnable_val's own buffer
// (GetSelf()) is pinned and MEMTABLE_HIT is ticked.
88 if (super_version
->mem
->Get(lkey
, pinnable_val
->GetSelf(),
89 /*columns=*/nullptr, ts
, &s
, &merge_context
,
90 &max_covering_tombstone_seq
, read_options
,
91 false /* immutable_memtable */, &read_cb
)) {
92 pinnable_val
->PinSelf();
93 RecordTick(stats_
, MEMTABLE_HIT
);
// Lookup in the current version, timed separately; followed by a
// MEMTABLE_MISS tick. Presumably this is the `else` of the memtable lookup
// above (the keyword is not visible -- confirm).
95 PERF_TIMER_GUARD(get_from_output_files_time
);
96 PinnedIteratorsManager pinned_iters_mgr
;
97 super_version
->current
->Get(
98 read_options
, lkey
, pinnable_val
, /*columns=*/nullptr, ts
, &s
,
99 &merge_context
, &max_covering_tombstone_seq
, &pinned_iters_mgr
,
100 /*value_found*/ nullptr,
101 /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb
,
104 RecordTick(stats_
, MEMTABLE_MISS
);
// Read statistics: one key read, plus byte counters and a histogram sample
// based on the size of the value now held by pinnable_val.
106 RecordTick(stats_
, NUMBER_KEYS_READ
);
107 size_t size
= pinnable_val
->size();
108 RecordTick(stats_
, BYTES_READ
, size
);
109 RecordInHistogram(stats_
, BYTES_PER_READ
, size
);
110 PERF_COUNTER_ADD(get_read_bytes
, size
);
// Creates an iterator over `column_family`.
//
// Parameters:
//   read_options  - its `timestamp` field selects which timestamp validation
//                   runs; its `snapshot` field, when non-null, is used to
//                   derive the read sequence number.
//   column_family - must be non-null (asserted).
//
// A failed timestamp validation is reported by returning
// NewErrorIterator(s) rather than a Status. Otherwise an arena-wrapped DB
// iterator is built on top of an internal iterator over a referenced
// SuperVersion.
//
// NOTE(review): some lines are not visible in this view (the conditions
// guarding the two error returns, the tail of the read_seq ternary, and the
// final return). Comments describe only what is visible.
114 Iterator
* DBImplReadOnly::NewIterator(const ReadOptions
& read_options
,
115 ColumnFamilyHandle
* column_family
) {
116 assert(column_family
);
// With a read timestamp supplied: validate it against the column family.
117 if (read_options
.timestamp
) {
118 const Status s
= FailIfTsMismatchCf(
119 column_family
, *(read_options
.timestamp
), /*ts_for_read=*/true);
121 return NewErrorIterator(s
);
// The other validation, FailIfCfHasTs, is presumably the no-timestamp branch
// (the `else` is not visible -- confirm).
124 const Status s
= FailIfCfHasTs(column_family
);
126 return NewErrorIterator(s
);
129 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
130 auto cfd
= cfh
->cfd();
// Take a reference on the SuperVersion; it is handed to NewInternalIterator
// below (where the reference is released is not visible here).
131 SuperVersion
* super_version
= cfd
->GetSuperVersion()->Ref();
132 SequenceNumber latest_snapshot
= versions_
->LastSequence();
// read_seq comes from the caller's snapshot when one is set; the remainder of
// this conditional expression is not visible in this view.
// NOTE(review): reinterpret_cast is used for the Snapshot -> SnapshotImpl
// conversion; if SnapshotImpl derives from Snapshot, a static_cast (or the
// static_cast_with_check helper used above) would be the safer downcast --
// confirm.
133 SequenceNumber read_seq
=
134 read_options
.snapshot
!= nullptr
135 ? reinterpret_cast<const SnapshotImpl
*>(read_options
.snapshot
)
138 ReadCallback
* read_callback
= nullptr; // No read callback provided.
139 auto db_iter
= NewArenaWrappedDbIterator(
140 env_
, read_options
, *cfd
->ioptions(), super_version
->mutable_cf_options
,
141 super_version
->current
, read_seq
,
142 super_version
->mutable_cf_options
.max_sequential_skip_in_iterations
,
143 super_version
->version_number
, read_callback
);
// Build the internal iterator using db_iter's arena and read options, then
// attach it underneath db_iter.
144 auto internal_iter
= NewInternalIterator(
145 db_iter
->GetReadOptions(), cfd
, super_version
, db_iter
->GetArena(),
146 read_seq
, /* allow_unprepared_value */ true, db_iter
);
147 db_iter
->SetIterUnderDBIter(internal_iter
);
// Creates one iterator per entry of `column_families` and appends each to
// `*iterators`.
//
// Parameters:
//   read_options    - its `timestamp` field selects which timestamp
//                     validation runs for every column family; its `snapshot`
//                     field, when non-null, is used to derive the read
//                     sequence number shared by all iterators.
//   column_families - handles to iterate over.
//   iterators       - output vector; Status::InvalidArgument is returned when
//                     it is nullptr.
//
// NOTE(review): some lines are not visible in this view (handling of `s`
// inside the validation loops, the tail of the read_seq ternary, and the
// final return). Comments describe only what is visible.
151 Status
DBImplReadOnly::NewIterators(
152 const ReadOptions
& read_options
,
153 const std::vector
<ColumnFamilyHandle
*>& column_families
,
154 std::vector
<Iterator
*>* iterators
) {
// With a read timestamp supplied: validate it against every column family.
155 if (read_options
.timestamp
) {
156 for (auto* cf
: column_families
) {
158 const Status s
= FailIfTsMismatchCf(cf
, *(read_options
.timestamp
),
159 /*ts_for_read=*/true);
// The other validation loop, using FailIfCfHasTs, is presumably the
// no-timestamp branch (the `else` is not visible -- confirm).
165 for (auto* cf
: column_families
) {
167 const Status s
= FailIfCfHasTs(cf
);
174 ReadCallback
* read_callback
= nullptr; // No read callback provided.
// Reject a null output vector before it is dereferenced below.
175 if (iterators
== nullptr) {
176 return Status::InvalidArgument("iterators not allowed to be nullptr");
179 iterators
->reserve(column_families
.size());
180 SequenceNumber latest_snapshot
= versions_
->LastSequence();
// read_seq comes from the caller's snapshot when one is set; the remainder of
// this conditional expression is not visible in this view.
181 SequenceNumber read_seq
=
182 read_options
.snapshot
!= nullptr
183 ? reinterpret_cast<const SnapshotImpl
*>(read_options
.snapshot
)
// Per column family: reference its SuperVersion, build an arena-wrapped DB
// iterator, attach an internal iterator underneath it, and append the result.
187 for (auto cfh
: column_families
) {
188 auto* cfd
= static_cast_with_check
<ColumnFamilyHandleImpl
>(cfh
)->cfd();
189 auto* sv
= cfd
->GetSuperVersion()->Ref();
190 auto* db_iter
= NewArenaWrappedDbIterator(
191 env_
, read_options
, *cfd
->ioptions(), sv
->mutable_cf_options
,
192 sv
->current
, read_seq
,
193 sv
->mutable_cf_options
.max_sequential_skip_in_iterations
,
194 sv
->version_number
, read_callback
);
195 auto* internal_iter
= NewInternalIterator(
196 db_iter
->GetReadOptions(), cfd
, sv
, db_iter
->GetArena(), read_seq
,
197 /* allow_unprepared_value */ true, db_iter
);
198 db_iter
->SetIterUnderDBIter(internal_iter
);
199 iterators
->push_back(db_iter
);
206 // Return OK if dbname exists in the file system, or create it if
// db_options.create_if_missing is set.
//
// When create_if_missing is false, existence is probed by resolving the
// current manifest path via VersionSet::GetCurrentManifestPath. The
// CreateDirIfMissing call further down is presumably the create_if_missing
// branch (the `else` is not visible in this view -- confirm).
//
// NOTE(review): the declaration of `s` and the final return are not visible
// in this view.
208 Status
OpenForReadOnlyCheckExistence(const DBOptions
& db_options
,
209 const std::string
& dbname
) {
211 if (!db_options
.create_if_missing
) {
212 // Attempt to read "CURRENT" file
213 const std::shared_ptr
<FileSystem
>& fs
= db_options
.env
->GetFileSystem();
// Outputs of GetCurrentManifestPath; only the resulting status is assigned
// to `s` here.
214 std::string manifest_path
;
215 uint64_t manifest_file_number
;
216 s
= VersionSet::GetCurrentManifestPath(dbname
, fs
.get(), &manifest_path
,
217 &manifest_file_number
);
219 // Historic behavior that doesn't necessarily make sense
220 s
= db_options
.env
->CreateDirIfMissing(dbname
);
// Opens `dbname` read-only using a single Options object (default column
// family only).
//
// Parameters:
//   options - split below into DBOptions and ColumnFamilyOptions.
//   dbname  - path of the database.
//   dbptr   - receives the opened DB.
//   (unnamed bool) - the error_if_wal_file_exists argument is not named and
//                    therefore not used by this overload.
//
// Visible sequence: check existence, try CompactedDBImpl::Open, then call
// DBImplReadOnly::OpenForReadOnlyWithoutCheck with one descriptor for
// kDefaultColumnFamilyName.
//
// NOTE(review): the status checks between these steps and the final return
// are not visible in this view.
226 Status
DB::OpenForReadOnly(const Options
& options
, const std::string
& dbname
,
227 DB
** dbptr
, bool /*error_if_wal_file_exists*/) {
228 Status s
= OpenForReadOnlyCheckExistence(options
, dbname
);
235 // Try to first open DB as fully compacted DB
236 s
= CompactedDBImpl::Open(options
, dbname
, dbptr
);
// General read-only open: describe only the default column family.
241 DBOptions
db_options(options
);
242 ColumnFamilyOptions
cf_options(options
);
243 std::vector
<ColumnFamilyDescriptor
> column_families
;
244 column_families
.push_back(
245 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
246 std::vector
<ColumnFamilyHandle
*> handles
;
// Called with five arguments, so error_if_wal_file_exists is not passed
// explicitly here.
248 s
= DBImplReadOnly::OpenForReadOnlyWithoutCheck(
249 db_options
, dbname
, column_families
, &handles
, dbptr
);
251 assert(handles
.size() == 1);
252 // The handle can be deleted since DBImpl always holds a
253 // reference to the default column family
// Opens `dbname` read-only with an explicit list of column families.
//
// Parameters:
//   db_options               - database-wide options.
//   dbname                   - path of the database.
//   column_families          - descriptors of the column families to open.
//   handles                  - receives the column family handles.
//   dbptr                    - receives the opened DB.
//   error_if_wal_file_exists - forwarded unchanged to
//                              OpenForReadOnlyWithoutCheck.
//
// Runs OpenForReadOnlyCheckExistence first, then delegates to
// DBImplReadOnly::OpenForReadOnlyWithoutCheck and returns its Status.
//
// NOTE(review): the check of `s` after the existence test is not visible in
// this view.
259 Status
DB::OpenForReadOnly(
260 const DBOptions
& db_options
, const std::string
& dbname
,
261 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
262 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
,
263 bool error_if_wal_file_exists
) {
264 // If dbname does not exist in the file system, should not do anything
265 Status s
= OpenForReadOnlyCheckExistence(db_options
, dbname
);
270 return DBImplReadOnly::OpenForReadOnlyWithoutCheck(
271 db_options
, dbname
, column_families
, handles
, dbptr
,
272 error_if_wal_file_exists
);
// Opens the database read-only without first checking that it exists.
//
// Parameters:
//   db_options               - database-wide options, passed to the
//                              DBImplReadOnly constructor.
//   dbname                   - path of the database.
//   column_families          - descriptors of the column families to open.
//   handles                  - receives one ColumnFamilyHandleImpl per
//                              descriptor.
//   dbptr                    - output DB pointer (its assignment is not
//                              visible in this view).
//   error_if_wal_file_exists - forwarded to Recover().
//
// Visible sequence: allocate a DBImplReadOnly, Recover() in read-only mode,
// create a handle for each requested column family (Status::InvalidArgument
// "Column family not found" when the lookup yields nullptr), install a new
// SuperVersion for every column family in the set, unlock impl->mutex_, and
// register thread-status info for each handle.
//
// NOTE(review): several lines are not visible in this view (the matching
// mutex_ lock, status checks, the declaration of `cfd` in the first loop, the
// body of the last loop, and the final return). Comments describe only what
// is visible.
275 Status
DBImplReadOnly::OpenForReadOnlyWithoutCheck(
276 const DBOptions
& db_options
, const std::string
& dbname
,
277 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
278 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
,
279 bool error_if_wal_file_exists
) {
283 SuperVersionContext
sv_context(/* create_superversion */ true);
// NOTE(review): raw `new`; where ownership of `impl` is handed off (or where
// it is deleted on failure) is not visible here -- confirm.
284 DBImplReadOnly
* impl
= new DBImplReadOnly(db_options
, dbname
);
286 Status s
= impl
->Recover(column_families
, true /* read only */,
287 error_if_wal_file_exists
);
289 // set column family handles
// NOTE(review): `auto cf` copies each ColumnFamilyDescriptor per iteration;
// only cf.name is read in the visible body, so `const auto& cf` would avoid
// the copy.
290 for (auto cf
: column_families
) {
292 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
// A null result means the requested name was not found by the lookup above.
293 if (cfd
== nullptr) {
294 s
= Status::InvalidArgument("Column family not found", cf
.name
);
297 handles
->push_back(new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
// Install a fresh SuperVersion for every column family in the set, not just
// those the caller asked for.
301 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
302 sv_context
.NewSuperVersion();
303 cfd
->InstallSuperVersion(&sv_context
, &impl
->mutex_
);
306 impl
->mutex_
.Unlock();
// Register thread-status info for each handle created above.
310 for (auto* h
: *handles
) {
311 impl
->NewThreadStatusCfInfo(
312 static_cast_with_check
<ColumnFamilyHandleImpl
>(h
)->cfd());
// Second pass over the handles; its body is not visible in this view.
315 for (auto h
: *handles
) {
324 #else // !ROCKSDB_LITE
// ROCKSDB_LITE stub of the Options overload (per the preprocessor comments
// around this section): every parameter is unnamed and unused, and the call
// always returns Status::NotSupported.
326 Status
DB::OpenForReadOnly(const Options
& /*options*/,
327 const std::string
& /*dbname*/, DB
** /*dbptr*/,
328 bool /*error_if_wal_file_exists*/) {
329 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
// ROCKSDB_LITE stub of the column-family overload (per the preprocessor
// comments around this section): every parameter is unnamed and unused, and
// the call always returns Status::NotSupported.
332 Status
DB::OpenForReadOnly(
333 const DBOptions
& /*db_options*/, const std::string
& /*dbname*/,
334 const std::vector
<ColumnFamilyDescriptor
>& /*column_families*/,
335 std::vector
<ColumnFamilyHandle
*>* /*handles*/, DB
** /*dbptr*/,
336 bool /*error_if_wal_file_exists*/) {
337 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
339 #endif // !ROCKSDB_LITE
341 } // namespace ROCKSDB_NAMESPACE