1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/db_impl_secondary.h"
7 #include "db/db_iter.h"
8 #include "db/merge_context.h"
9 #include "monitoring/perf_context_imp.h"
10 #include "util/auto_roll_logger.h"
// Constructs a secondary-mode DB instance. `db_options` and `dbname` are
// forwarded unchanged to the DBImpl base-class constructor; the body then
// writes one informational message to the info log and flushes it.
16 DBImplSecondary::DBImplSecondary(const DBOptions
& db_options
,
17 const std::string
& dbname
)
18 : DBImpl(db_options
, dbname
) {
// Record in the info log that this instance is being opened as a secondary.
19 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
20 "Opening the db in secondary mode");
// Flush the info log right after emitting the message above.
21 LogFlush(immutable_db_options_
.info_log
);
// Destructor; the body is intentionally empty.
24 DBImplSecondary::~DBImplSecondary() {}
// Recovers this secondary instance's version state for `column_families`.
//
// Visible steps: (1) call ReactiveVersionSet::Recover(), (2) optionally run
// CheckConsistency(), (3) recompute max_total_in_memory_state_ from every
// column family's mutable options, (4) create the default column family
// handle and cache related members.
//
// The three bool parameters are unnamed (their names are commented out), so
// this body cannot read them.
//
// NOTE(review): the declaration of `s`, the return statement and the closing
// braces are not visible in this listing (the embedded numbering skips lines
// such as 30-32, 36-38 and 58-59) -- confirm against the complete file.
26 Status
DBImplSecondary::Recover(
27 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
28 bool /*readonly*/, bool /*error_if_log_file_exist*/,
29 bool /*error_if_data_exists_in_logs*/) {
// versions_ is downcast to ReactiveVersionSet so its Recover() overload can
// be called; the manifest reader, reporter and reader-status members are
// passed by address.
33 s
= static_cast<ReactiveVersionSet
*>(versions_
.get())
34 ->Recover(column_families
, &manifest_reader_
, &manifest_reporter_
,
35 &manifest_reader_status_
);
// Run the consistency check only when paranoid_checks is enabled and the
// status so far is OK; its result replaces `s`.
39 if (immutable_db_options_
.paranoid_checks
&& s
.ok()) {
40 s
= CheckConsistency();
42 // Initial max_total_in_memory_state_ before recovery logs.
43 max_total_in_memory_state_
= 0;
// Accumulate write_buffer_size * max_write_buffer_number over every column
// family in the set.
44 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
45 auto* mutable_cf_options
= cfd
->GetLatestMutableCFOptions();
46 max_total_in_memory_state_
+= mutable_cf_options
->write_buffer_size
*
47 mutable_cf_options
->max_write_buffer_number
;
// Allocate the handle for the default column family (raw `new`; ownership and
// release are not shown in this function).
// NOTE(review): any status guard around this section is not visible here.
50 default_cf_handle_
= new ColumnFamilyHandleImpl(
51 versions_
->GetColumnFamilySet()->GetDefault(), this, &mutex_
);
// Cache the default column family's internal stats pointer.
52 default_cf_internal_stats_
= default_cf_handle_
->cfd()->internal_stats();
// True exactly when the column family set reports one column family.
53 single_column_family_mode_
=
54 versions_
->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
57 // TODO: attempt to recover from WAL files.
61 // Implementation of the DB interface
// Public Get entry point: forwards `read_options`, `column_family`, `key` and
// `value` unchanged to GetImpl() and returns its Status.
62 Status
DBImplSecondary::Get(const ReadOptions
& read_options
,
63 ColumnFamilyHandle
* column_family
, const Slice
& key
,
64 PinnableSlice
* value
) {
65 return GetImpl(read_options
, column_family
, key
, value
);
// Looks up `key` in `column_family` and stores the value in `pinnable_val`.
//
// The visible lookup order is: the SuperVersion's `mem`, then its `imm`, then
// its `current` Version. The read uses versions_->LastSequence() as its
// sequence number. `pinnable_val` must be non-null (asserted).
//
// NOTE(review): this listing omits several original lines -- the declarations
// of `s` and `done`, some guards, the returns and the closing braces are not
// visible. Comments below describe only what is shown; confirm the rest
// against the complete file.
68 Status
DBImplSecondary::GetImpl(const ReadOptions
& read_options
,
69 ColumnFamilyHandle
* column_family
,
70 const Slice
& key
, PinnableSlice
* pinnable_val
) {
71 assert(pinnable_val
!= nullptr);
// Perf instrumentation: CPU timer, a DB_GET stopwatch, and a timer covering
// the snapshot-acquisition phase (stopped further down).
72 PERF_CPU_TIMER_GUARD(get_cpu_nanos
, env_
);
73 StopWatch
sw(env_
, stats_
, DB_GET
);
74 PERF_TIMER_GUARD(get_snapshot_time
);
// Downcast the public handle to the implementation type to reach its
// ColumnFamilyData.
76 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(column_family
);
77 ColumnFamilyData
* cfd
= cfh
->cfd();
// Tracing: an InstrumentedMutexLock is constructed on trace_mutex_ before
// tracer_->Get() is called with the handle and key.
// NOTE(review): whether a null check on tracer_ guards this is not visible
// in this listing.
79 InstrumentedMutexLock
lock(&trace_mutex_
);
81 tracer_
->Get(column_family
, key
);
84 // Acquire SuperVersion
85 SuperVersion
* super_version
= GetAndRefSuperVersion(cfd
);
// The lookup key is built from the user key plus the version set's last
// sequence number.
86 SequenceNumber snapshot
= versions_
->LastSequence();
87 MergeContext merge_context
;
88 SequenceNumber max_covering_tombstone_seq
= 0;
90 LookupKey
lkey(key
, snapshot
);
91 PERF_TIMER_STOP(get_snapshot_time
);
// Step 1: query super_version->mem. The output buffer is
// pinnable_val->GetSelf(); when Get() returns true, PinSelf() is called and a
// MEMTABLE_HIT tick is recorded.
94 if (super_version
->mem
->Get(lkey
, pinnable_val
->GetSelf(), &s
, &merge_context
,
95 &max_covering_tombstone_seq
, read_options
)) {
97 pinnable_val
->PinSelf();
98 RecordTick(stats_
, MEMTABLE_HIT
);
// Step 2: only if the status is OK or merge-in-progress, query
// super_version->imm with the same arguments; a true result is handled the
// same way as in step 1.
99 } else if ((s
.ok() || s
.IsMergeInProgress()) &&
100 super_version
->imm
->Get(
101 lkey
, pinnable_val
->GetSelf(), &s
, &merge_context
,
102 &max_covering_tombstone_seq
, read_options
)) {
104 pinnable_val
->PinSelf();
105 RecordTick(stats_
, MEMTABLE_HIT
);
// Error path: when `done` is false and the status is neither OK nor
// merge-in-progress, the SuperVersion is handed back via
// ReturnAndCleanupSuperVersion().
// NOTE(review): the statement that follows (presumably an early return) is
// not visible.
107 if (!done
&& !s
.ok() && !s
.IsMergeInProgress()) {
108 ReturnAndCleanupSuperVersion(cfd
, super_version
);
// Step 3: query super_version->current, writing into pinnable_val directly,
// and record a MEMTABLE_MISS tick.
// NOTE(review): any condition guarding this step is not visible here.
112 PERF_TIMER_GUARD(get_from_output_files_time
);
113 super_version
->current
->Get(read_options
, lkey
, pinnable_val
, &s
,
114 &merge_context
, &max_covering_tombstone_seq
);
115 RecordTick(stats_
, MEMTABLE_MISS
);
// Post-processing: hand back the SuperVersion, then record read statistics
// (key count, byte count, bytes-per-read histogram, perf counter) using the
// size of the value held by pinnable_val.
118 PERF_TIMER_GUARD(get_post_process_time
);
119 ReturnAndCleanupSuperVersion(cfd
, super_version
);
120 RecordTick(stats_
, NUMBER_KEYS_READ
);
121 size_t size
= pinnable_val
->size();
122 RecordTick(stats_
, BYTES_READ
, size
);
123 RecordTimeToHistogram(stats_
, BYTES_PER_READ
, size
);
124 PERF_COUNTER_ADD(get_read_bytes
, size
);
// Creates an iterator over `column_family`, reading at the version set's
// last sequence number.
//
// Returns an error iterator carrying Status::NotSupported when any of the
// following is requested: read_options.managed, read_tier == kPersistedTier,
// read_options.tailing, or a non-null read_options.snapshot.
//
// NOTE(review): the final return of `result` and the closing braces are not
// visible in this listing.
129 Iterator
* DBImplSecondary::NewIterator(const ReadOptions
& read_options
,
130 ColumnFamilyHandle
* column_family
) {
131 if (read_options
.managed
) {
132 return NewErrorIterator(
133 Status::NotSupported("Managed iterator is not supported anymore."));
135 if (read_options
.read_tier
== kPersistedTier
) {
136 return NewErrorIterator(Status::NotSupported(
137 "ReadTier::kPersistedData is not yet supported in iterators."));
139 Iterator
* result
= nullptr;
// NOTE(review): this downcast uses reinterpret_cast, whereas GetImpl() and
// NewIterators() use static_cast for the same ColumnFamilyHandle ->
// ColumnFamilyHandleImpl conversion; consider making them consistent.
140 auto cfh
= reinterpret_cast<ColumnFamilyHandleImpl
*>(column_family
);
141 auto cfd
= cfh
->cfd();
142 ReadCallback
* read_callback
= nullptr; // No read callback provided.
143 if (read_options
.tailing
) {
144 return NewErrorIterator(Status::NotSupported(
145 "tailing iterator not supported in secondary mode"));
146 } else if (read_options
.snapshot
!= nullptr) {
147 // TODO (yanqin) support snapshot.
148 return NewErrorIterator(
149 Status::NotSupported("snapshot not supported in secondary mode"));
// Supported case: read at the latest sequence number known to versions_ and
// delegate construction to NewIteratorImpl().
151 auto snapshot
= versions_
->LastSequence();
152 result
= NewIteratorImpl(read_options
, cfd
, snapshot
, read_callback
);
// Builds an ArenaWrappedDBIter for `cfd`. `snapshot` is the sequence number
// passed to NewInternalIterator(); `read_callback` is forwarded to
// NewArenaWrappedDbIterator(). `cfd` must be non-null (asserted).
//
// NOTE(review): the embedded numbering jumps from 163 to 165 and from 166 to
// 168, so one argument line of NewArenaWrappedDbIterator() and the
// declaration of `internal_iter` are not visible; the return statement is
// also not shown. Confirm against the complete file.
157 ArenaWrappedDBIter
* DBImplSecondary::NewIteratorImpl(
158 const ReadOptions
& read_options
, ColumnFamilyData
* cfd
,
159 SequenceNumber snapshot
, ReadCallback
* read_callback
) {
160 assert(nullptr != cfd
);
// Obtain a referenced SuperVersion for this column family, passing mutex_.
161 SuperVersion
* super_version
= cfd
->GetReferencedSuperVersion(&mutex_
);
// Create the arena-wrapped iterator from the column family's immutable
// options and the SuperVersion's mutable options, skip limit and version
// number.
162 auto db_iter
= NewArenaWrappedDbIterator(
163 env_
, read_options
, *cfd
->ioptions(), super_version
->mutable_cf_options
,
165 super_version
->mutable_cf_options
.max_sequential_skip_in_iterations
,
166 super_version
->version_number
, read_callback
);
// Build the internal iterator using db_iter's arena and range-del aggregator,
// then attach it underneath db_iter.
168 NewInternalIterator(read_options
, cfd
, super_version
, db_iter
->GetArena(),
169 db_iter
->GetRangeDelAggregator(), snapshot
);
170 db_iter
->SetIterUnderDBIter(internal_iter
);
// Creates one iterator per handle in `column_families` and appends each to
// `*iterators`. All iterators are created with the same sequence number,
// read once from versions_->LastSequence().
//
// Returns Status::NotSupported for read_options.managed, read_tier ==
// kPersistedTier, read_options.tailing, or a non-null read_options.snapshot,
// and Status::InvalidArgument when `iterators` is nullptr.
//
// NOTE(review): the success return and closing braces are not visible in this
// listing, and the embedded numbering skips line 189.
174 Status
DBImplSecondary::NewIterators(
175 const ReadOptions
& read_options
,
176 const std::vector
<ColumnFamilyHandle
*>& column_families
,
177 std::vector
<Iterator
*>* iterators
) {
178 if (read_options
.managed
) {
179 return Status::NotSupported("Managed iterator is not supported anymore.");
181 if (read_options
.read_tier
== kPersistedTier
) {
182 return Status::NotSupported(
183 "ReadTier::kPersistedData is not yet supported in iterators.");
185 ReadCallback
* read_callback
= nullptr; // No read callback provided.
186 if (iterators
== nullptr) {
187 return Status::InvalidArgument("iterators not allowed to be nullptr");
// Reserve capacity for one iterator per requested column family.
190 iterators
->reserve(column_families
.size());
191 if (read_options
.tailing
) {
192 return Status::NotSupported(
193 "tailing iterator not supported in secondary mode");
194 } else if (read_options
.snapshot
!= nullptr) {
195 // TODO (yanqin) support snapshot.
196 return Status::NotSupported("snapshot not supported in secondary mode");
// Read the sequence number once so every iterator below shares it.
198 SequenceNumber read_seq
= versions_
->LastSequence();
199 for (auto cfh
: column_families
) {
200 ColumnFamilyData
* cfd
= static_cast<ColumnFamilyHandleImpl
*>(cfh
)->cfd();
201 iterators
->push_back(
202 NewIteratorImpl(read_options
, cfd
, read_seq
, read_callback
));
// Applies newly available version changes through
// ReactiveVersionSet::ReadAndApply(), then installs a fresh SuperVersion for
// every column family that call placed in `cfds_changed`.
//
// Both versions_ and manifest_reader_ must be non-null (asserted).
//
// NOTE(review): the declaration of `s`, any status check guarding the install
// loop, and the return statement are not visible in this listing.
208 Status
DBImplSecondary::TryCatchUpWithPrimary() {
209 assert(versions_
.get() != nullptr);
210 assert(manifest_reader_
.get() != nullptr);
// Filled in by ReadAndApply() below.
212 std::unordered_set
<ColumnFamilyData
*> cfds_changed
;
// An InstrumentedMutexLock is constructed on mutex_ before the version set is
// touched (presumably held until scope exit -- confirm the guard's semantics).
213 InstrumentedMutexLock
lock_guard(&mutex_
);
214 s
= static_cast<ReactiveVersionSet
*>(versions_
.get())
215 ->ReadAndApply(&mutex_
, &manifest_reader_
, &cfds_changed
);
// One SuperVersionContext is reused across the loop: NewSuperVersion() is
// called before each InstallSuperVersion().
217 SuperVersionContext
sv_context(true /* create_superversion */);
218 for (auto cfd
: cfds_changed
) {
219 sv_context
.NewSuperVersion();
220 cfd
->InstallSuperVersion(&sv_context
, &mutex_
);
// Convenience overload that opens a secondary instance with a single column
// family. `options` is split into a DBOptions and a ColumnFamilyOptions, one
// descriptor named kDefaultColumnFamilyName is built, and the call is
// delegated to the overload that takes a column family list and a handle
// vector.
//
// NOTE(review): the status check around the assert, what is done with the
// returned handle, and the return statement are not visible in this listing.
227 Status
DB::OpenAsSecondary(const Options
& options
, const std::string
& dbname
,
228 const std::string
& secondary_path
, DB
** dbptr
) {
231 DBOptions
db_options(options
);
232 ColumnFamilyOptions
cf_options(options
);
233 std::vector
<ColumnFamilyDescriptor
> column_families
;
234 column_families
.emplace_back(kDefaultColumnFamilyName
, cf_options
);
// Receives the handle(s) created by the delegated call.
235 std::vector
<ColumnFamilyHandle
*> handles
;
237 Status s
= DB::OpenAsSecondary(db_options
, dbname
, secondary_path
,
238 column_families
, &handles
, dbptr
);
// Exactly one handle is expected, matching the single descriptor above.
240 assert(handles
.size() == 1);
// Opens `dbname` as a secondary instance for the given `column_families`.
//
// Visible flow:
//   1. Reject any max_open_files other than -1 with InvalidArgument.
//   2. Copy the options and, if no info_log was supplied, create one using
//      `secondary_path` (an AutoRollLogger when log rolling is configured,
//      otherwise Env::NewLogger()).
//   3. Construct a DBImplSecondary, install a ReactiveVersionSet and a
//      ColumnFamilyMemTablesImpl, and call Recover().
//   4. Create a handle per requested column family (InvalidArgument if one is
//      not found), install SuperVersions, and register thread-status info.
//
// NOTE(review): this listing omits many original lines (for example the
// embedded numbering skips 274-276, 281, 303, 305, 315 and 319-320, and
// nothing after line 330 of this function is shown). Locking, error cleanup,
// assignment to `*dbptr` and the return statement are not visible; confirm
// them against the complete file.
246 Status
DB::OpenAsSecondary(
247 const DBOptions
& db_options
, const std::string
& dbname
,
248 const std::string
& secondary_path
,
249 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
250 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
) {
252 if (db_options
.max_open_files
!= -1) {
253 // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
254 // on SST files so that db secondary can still have access to old SSTs
255 // while primary instance may delete original.
256 return Status::InvalidArgument("require max_open_files to be -1");
// Work on a copy so a logger can be installed without modifying the caller's
// options.
259 DBOptions
tmp_opts(db_options
);
260 if (nullptr == tmp_opts
.info_log
) {
261 Env
* env
= tmp_opts
.env
;
262 assert(env
!= nullptr);
// Resolve the secondary path and derive the info log file name from it.
// NOTE(review): the return values of GetAbsolutePath() and
// CreateDirIfMissing() are not captured in the visible code.
263 std::string secondary_abs_path
;
264 env
->GetAbsolutePath(secondary_path
, &secondary_abs_path
);
265 std::string fname
= InfoLogFileName(secondary_path
, secondary_abs_path
,
266 tmp_opts
.db_log_dir
);
268 env
->CreateDirIfMissing(secondary_path
);
// Use an AutoRollLogger when time-based or size-based rolling is configured.
269 if (tmp_opts
.log_file_time_to_roll
> 0 || tmp_opts
.max_log_file_size
> 0) {
270 AutoRollLogger
* result
= new AutoRollLogger(
271 env
, secondary_path
, tmp_opts
.db_log_dir
, tmp_opts
.max_log_file_size
,
272 tmp_opts
.log_file_time_to_roll
, tmp_opts
.info_log_level
);
273 Status s
= result
->GetStatus();
// NOTE(review): how a non-OK status is handled (and whether `result` is
// released in that case) is not visible in this listing.
277 tmp_opts
.info_log
.reset(result
);
// Fallback when no logger has been installed above.
// NOTE(review): the call that receives `fname` and the OldInfoLogFileName()
// result (original line 281) is not visible in this listing.
280 if (nullptr == tmp_opts
.info_log
) {
282 fname
, OldInfoLogFileName(secondary_path
, env
->NowMicros(),
283 secondary_abs_path
, tmp_opts
.db_log_dir
));
284 Status s
= env
->NewLogger(fname
, &(tmp_opts
.info_log
));
285 if (tmp_opts
.info_log
!= nullptr) {
286 tmp_opts
.info_log
->SetInfoLogLevel(tmp_opts
.info_log_level
);
// An info logger is expected to exist by this point.
291 assert(tmp_opts
.info_log
!= nullptr);
// Build the secondary implementation and replace its version set with a
// ReactiveVersionSet before recovery; column_family_memtables_ is rebuilt on
// top of the new column family set.
294 DBImplSecondary
* impl
= new DBImplSecondary(tmp_opts
, dbname
);
295 impl
->versions_
.reset(new ReactiveVersionSet(
296 dbname
, &impl
->immutable_db_options_
, impl
->env_options_
,
297 impl
->table_cache_
.get(), impl
->write_buffer_manager_
,
298 &impl
->write_controller_
));
299 impl
->column_family_memtables_
.reset(
300 new ColumnFamilyMemTablesImpl(impl
->versions_
->GetColumnFamilySet()));
// The literal arguments line up with Recover()'s commented-out parameter
// names: readonly=true, error_if_log_file_exist=false,
// error_if_data_exists_in_logs=false.
302 Status s
= impl
->Recover(column_families
, true, false, false);
// NOTE(review): `auto cf` iterates by value, copying each
// ColumnFamilyDescriptor; `const auto&` would avoid the copy.
// Look up each requested column family by name; a missing one yields
// InvalidArgument, otherwise a new handle is appended to `*handles`.
304 for (auto cf
: column_families
) {
306 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
307 if (nullptr == cfd
) {
308 s
= Status::InvalidArgument("Column family not found: ", cf
.name
);
311 handles
->push_back(new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
// Install a SuperVersion for every column family in the set, reusing one
// SuperVersionContext.
314 SuperVersionContext
sv_context(true /* create_superversion */);
316 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
317 sv_context
.NewSuperVersion();
318 cfd
->InstallSuperVersion(&sv_context
, &impl
->mutex_
);
// NOTE(review): the matching Lock() call for this Unlock() is not visible in
// this listing.
321 impl
->mutex_
.Unlock();
// Register thread-status info for each returned handle's column family.
// NOTE(review): reinterpret_cast is used here, while other downcasts of
// ColumnFamilyHandle in this file use static_cast.
325 for (auto h
: *handles
) {
326 impl
->NewThreadStatusCfInfo(
327 reinterpret_cast<ColumnFamilyHandleImpl
*>(h
)->cfd());
// Second pass over the handles; its body is not visible in this listing.
330 for (auto h
: *handles
) {
338 #else // !ROCKSDB_LITE
// Stub in the `#else` branch of the ROCKSDB_LITE conditional: every parameter
// is unnamed and the function returns Status::NotSupported.
// NOTE(review): the last parameter line (original line 343) is not visible in
// this listing.
340 Status
DB::OpenAsSecondary(const Options
& /*options*/,
341 const std::string
& /*name*/,
342 const std::string
& /*secondary_path*/,
344 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
// Stub in the `#else` branch of the ROCKSDB_LITE conditional for the
// multi-column-family overload: every parameter is unnamed and the function
// returns Status::NotSupported.
347 Status
DB::OpenAsSecondary(
348 const DBOptions
& /*db_options*/, const std::string
& /*dbname*/,
349 const std::string
& /*secondary_path*/,
350 const std::vector
<ColumnFamilyDescriptor
>& /*column_families*/,
351 std::vector
<ColumnFamilyHandle
*>* /*handles*/, DB
** /*dbptr*/) {
352 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
354 #endif // !ROCKSDB_LITE
356 } // namespace rocksdb