1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/db_impl/db_impl_secondary.h"
10 #include "db/arena_wrapped_db_iter.h"
11 #include "db/merge_context.h"
12 #include "logging/auto_roll_logger.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/cast_util.h"
16 namespace ROCKSDB_NAMESPACE
{
19 DBImplSecondary::DBImplSecondary(const DBOptions
& db_options
,
20 const std::string
& dbname
)
21 : DBImpl(db_options
, dbname
) {
22 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
23 "Opening the db in secondary mode");
24 LogFlush(immutable_db_options_
.info_log
);
27 DBImplSecondary::~DBImplSecondary() {}
29 Status
DBImplSecondary::Recover(
30 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
31 bool /*readonly*/, bool /*error_if_wal_file_exists*/,
32 bool /*error_if_data_exists_in_wals*/, uint64_t*) {
35 JobContext
job_context(0);
37 s
= static_cast<ReactiveVersionSet
*>(versions_
.get())
38 ->Recover(column_families
, &manifest_reader_
, &manifest_reporter_
,
39 &manifest_reader_status_
);
43 if (immutable_db_options_
.paranoid_checks
&& s
.ok()) {
44 s
= CheckConsistency();
46 // Initial max_total_in_memory_state_ before recovery logs.
47 max_total_in_memory_state_
= 0;
48 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
49 auto* mutable_cf_options
= cfd
->GetLatestMutableCFOptions();
50 max_total_in_memory_state_
+= mutable_cf_options
->write_buffer_size
*
51 mutable_cf_options
->max_write_buffer_number
;
54 default_cf_handle_
= new ColumnFamilyHandleImpl(
55 versions_
->GetColumnFamilySet()->GetDefault(), this, &mutex_
);
56 default_cf_internal_stats_
= default_cf_handle_
->cfd()->internal_stats();
57 single_column_family_mode_
=
58 versions_
->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
60 std::unordered_set
<ColumnFamilyData
*> cfds_changed
;
61 s
= FindAndRecoverLogFiles(&cfds_changed
, &job_context
);
64 if (s
.IsPathNotFound()) {
65 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
66 "Secondary tries to read WAL, but WAL file(s) have already "
67 "been purged by primary.");
70 // TODO: update options_file_number_ needed?
76 // find new WAL and apply them in order to the secondary instance
77 Status
DBImplSecondary::FindAndRecoverLogFiles(
78 std::unordered_set
<ColumnFamilyData
*>* cfds_changed
,
79 JobContext
* job_context
) {
80 assert(nullptr != cfds_changed
);
81 assert(nullptr != job_context
);
83 std::vector
<uint64_t> logs
;
84 s
= FindNewLogNumbers(&logs
);
85 if (s
.ok() && !logs
.empty()) {
86 SequenceNumber
next_sequence(kMaxSequenceNumber
);
87 s
= RecoverLogFiles(logs
, &next_sequence
, cfds_changed
, job_context
);
92 // List wal_dir and find all new WALs, return these log numbers
93 Status
DBImplSecondary::FindNewLogNumbers(std::vector
<uint64_t>* logs
) {
94 assert(logs
!= nullptr);
95 std::vector
<std::string
> filenames
;
97 s
= env_
->GetChildren(immutable_db_options_
.wal_dir
, &filenames
);
99 return Status::InvalidArgument("Failed to open wal_dir",
100 immutable_db_options_
.wal_dir
);
101 } else if (!s
.ok()) {
105 // if log_readers_ is non-empty, it means we have applied all logs with log
106 // numbers smaller than the smallest log in log_readers_, so there is no
107 // need to pass these logs to RecoverLogFiles
108 uint64_t log_number_min
= 0;
109 if (!log_readers_
.empty()) {
110 log_number_min
= log_readers_
.begin()->first
;
112 for (size_t i
= 0; i
< filenames
.size(); i
++) {
115 if (ParseFileName(filenames
[i
], &number
, &type
) && type
== kWalFile
&&
116 number
>= log_number_min
) {
117 logs
->push_back(number
);
120 // Recover logs in the order that they were generated
121 if (!logs
->empty()) {
122 std::sort(logs
->begin(), logs
->end());
127 Status
DBImplSecondary::MaybeInitLogReader(
128 uint64_t log_number
, log::FragmentBufferedReader
** log_reader
) {
129 auto iter
= log_readers_
.find(log_number
);
130 // make sure the log file is still present
131 if (iter
== log_readers_
.end() ||
132 iter
->second
->reader_
->GetLogNumber() != log_number
) {
133 // delete the obsolete log reader if log number mismatch
134 if (iter
!= log_readers_
.end()) {
135 log_readers_
.erase(iter
);
137 // initialize log reader from log_number
138 // TODO: min_log_number_to_keep_2pc check needed?
140 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
141 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
142 "Recovering log #%" PRIu64
" mode %d", log_number
,
143 static_cast<int>(immutable_db_options_
.wal_recovery_mode
));
145 std::unique_ptr
<SequentialFileReader
> file_reader
;
147 std::unique_ptr
<FSSequentialFile
> file
;
148 Status status
= fs_
->NewSequentialFile(
149 fname
, fs_
->OptimizeForLogRead(file_options_
), &file
,
152 *log_reader
= nullptr;
155 file_reader
.reset(new SequentialFileReader(
156 std::move(file
), fname
, immutable_db_options_
.log_readahead_size
,
160 // Create the log reader.
161 LogReaderContainer
* log_reader_container
= new LogReaderContainer(
162 env_
, immutable_db_options_
.info_log
, std::move(fname
),
163 std::move(file_reader
), log_number
);
164 log_readers_
.insert(std::make_pair(
165 log_number
, std::unique_ptr
<LogReaderContainer
>(log_reader_container
)));
167 iter
= log_readers_
.find(log_number
);
168 assert(iter
!= log_readers_
.end());
169 *log_reader
= iter
->second
->reader_
;
173 // After manifest recovery, replay WALs and refresh log_readers_ if necessary
174 // REQUIRES: log_numbers are sorted in ascending order
175 Status
DBImplSecondary::RecoverLogFiles(
176 const std::vector
<uint64_t>& log_numbers
, SequenceNumber
* next_sequence
,
177 std::unordered_set
<ColumnFamilyData
*>* cfds_changed
,
178 JobContext
* job_context
) {
179 assert(nullptr != cfds_changed
);
180 assert(nullptr != job_context
);
183 for (auto log_number
: log_numbers
) {
184 log::FragmentBufferedReader
* reader
= nullptr;
185 status
= MaybeInitLogReader(log_number
, &reader
);
189 assert(reader
!= nullptr);
191 for (auto log_number
: log_numbers
) {
192 auto it
= log_readers_
.find(log_number
);
193 assert(it
!= log_readers_
.end());
194 log::FragmentBufferedReader
* reader
= it
->second
->reader_
;
195 Status
* wal_read_status
= it
->second
->status_
;
196 assert(wal_read_status
);
197 // Manually update the file number allocation counter in VersionSet.
198 versions_
->MarkFileNumberUsed(log_number
);
200 // Determine if we should tolerate incomplete records at the tail end of the
201 // Read all the records and add to a memtable
206 while (reader
->ReadRecord(&record
, &scratch
,
207 immutable_db_options_
.wal_recovery_mode
) &&
208 wal_read_status
->ok() && status
.ok()) {
209 if (record
.size() < WriteBatchInternal::kHeader
) {
210 reader
->GetReporter()->Corruption(
211 record
.size(), Status::Corruption("log record too small"));
214 status
= WriteBatchInternal::SetContents(&batch
, record
);
218 SequenceNumber seq_of_batch
= WriteBatchInternal::Sequence(&batch
);
219 std::vector
<uint32_t> column_family_ids
;
220 status
= CollectColumnFamilyIdsFromWriteBatch(batch
, &column_family_ids
);
222 for (const auto id
: column_family_ids
) {
223 ColumnFamilyData
* cfd
=
224 versions_
->GetColumnFamilySet()->GetColumnFamily(id
);
225 if (cfd
== nullptr) {
228 if (cfds_changed
->count(cfd
) == 0) {
229 cfds_changed
->insert(cfd
);
231 const std::vector
<FileMetaData
*>& l0_files
=
232 cfd
->current()->storage_info()->LevelFiles(0);
234 l0_files
.empty() ? 0 : l0_files
.back()->fd
.largest_seqno
;
235 // If the write batch's sequence number is smaller than the last
236 // sequence number of the largest sequence persisted for this column
237 // family, then its data must reside in an SST that has already been
238 // added in the prior MANIFEST replay.
239 if (seq_of_batch
<= seq
) {
242 auto curr_log_num
= port::kMaxUint64
;
243 if (cfd_to_current_log_
.count(cfd
) > 0) {
244 curr_log_num
= cfd_to_current_log_
[cfd
];
246 // If the active memtable contains records added by replaying an
247 // earlier WAL, then we need to seal the memtable, add it to the
248 // immutable memtable list and create a new active memtable.
249 if (!cfd
->mem()->IsEmpty() && (curr_log_num
== port::kMaxUint64
||
250 curr_log_num
!= log_number
)) {
251 const MutableCFOptions mutable_cf_options
=
252 *cfd
->GetLatestMutableCFOptions();
254 cfd
->ConstructNewMemtable(mutable_cf_options
, seq_of_batch
);
255 cfd
->mem()->SetNextLogNumber(log_number
);
256 cfd
->imm()->Add(cfd
->mem(), &job_context
->memtables_to_free
);
258 cfd
->SetMemtable(new_mem
);
261 bool has_valid_writes
= false;
262 status
= WriteBatchInternal::InsertInto(
263 &batch
, column_family_memtables_
.get(),
264 nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
265 true, log_number
, this, false /* concurrent_memtable_writes */,
266 next_sequence
, &has_valid_writes
, seq_per_batch_
, batch_per_txn_
);
268 // If column family was not found, it might mean that the WAL write
269 // batch references to the column family that was dropped after the
270 // insert. We don't want to fail the whole write batch in that case --
271 // we just ignore the update.
272 // That's why we set ignore missing column families to true
273 // passing null flush_scheduler will disable memtable flushing which is
274 // needed for secondary instances
276 for (const auto id
: column_family_ids
) {
277 ColumnFamilyData
* cfd
=
278 versions_
->GetColumnFamilySet()->GetColumnFamily(id
);
279 if (cfd
== nullptr) {
282 std::unordered_map
<ColumnFamilyData
*, uint64_t>::iterator iter
=
283 cfd_to_current_log_
.find(cfd
);
284 if (iter
== cfd_to_current_log_
.end()) {
285 cfd_to_current_log_
.insert({cfd
, log_number
});
286 } else if (log_number
> iter
->second
) {
287 iter
->second
= log_number
;
290 auto last_sequence
= *next_sequence
- 1;
291 if ((*next_sequence
!= kMaxSequenceNumber
) &&
292 (versions_
->LastSequence() <= last_sequence
)) {
293 versions_
->SetLastAllocatedSequence(last_sequence
);
294 versions_
->SetLastPublishedSequence(last_sequence
);
295 versions_
->SetLastSequence(last_sequence
);
298 // We are treating this as a failure while reading since we read valid
299 // blocks that do not form coherent data
300 reader
->GetReporter()->Corruption(record
.size(), status
);
303 if (status
.ok() && !wal_read_status
->ok()) {
304 status
= *wal_read_status
;
310 // remove logreaders from map after successfully recovering the WAL
311 if (log_readers_
.size() > 1) {
312 auto erase_iter
= log_readers_
.begin();
313 std::advance(erase_iter
, log_readers_
.size() - 1);
314 log_readers_
.erase(log_readers_
.begin(), erase_iter
);
319 // Implementation of the DB interface
320 Status
DBImplSecondary::Get(const ReadOptions
& read_options
,
321 ColumnFamilyHandle
* column_family
, const Slice
& key
,
322 PinnableSlice
* value
) {
323 return GetImpl(read_options
, column_family
, key
, value
);
326 Status
DBImplSecondary::GetImpl(const ReadOptions
& read_options
,
327 ColumnFamilyHandle
* column_family
,
328 const Slice
& key
, PinnableSlice
* pinnable_val
) {
329 assert(pinnable_val
!= nullptr);
330 PERF_CPU_TIMER_GUARD(get_cpu_nanos
, env_
);
331 StopWatch
sw(env_
, stats_
, DB_GET
);
332 PERF_TIMER_GUARD(get_snapshot_time
);
334 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(column_family
);
335 ColumnFamilyData
* cfd
= cfh
->cfd();
337 InstrumentedMutexLock
lock(&trace_mutex_
);
339 tracer_
->Get(column_family
, key
);
342 // Acquire SuperVersion
343 SuperVersion
* super_version
= GetAndRefSuperVersion(cfd
);
344 SequenceNumber snapshot
= versions_
->LastSequence();
345 MergeContext merge_context
;
346 SequenceNumber max_covering_tombstone_seq
= 0;
348 LookupKey
lkey(key
, snapshot
);
349 PERF_TIMER_STOP(get_snapshot_time
);
352 if (super_version
->mem
->Get(lkey
, pinnable_val
->GetSelf(),
353 /*timestamp=*/nullptr, &s
, &merge_context
,
354 &max_covering_tombstone_seq
, read_options
)) {
356 pinnable_val
->PinSelf();
357 RecordTick(stats_
, MEMTABLE_HIT
);
358 } else if ((s
.ok() || s
.IsMergeInProgress()) &&
359 super_version
->imm
->Get(
360 lkey
, pinnable_val
->GetSelf(), /*timestamp=*/nullptr, &s
,
361 &merge_context
, &max_covering_tombstone_seq
, read_options
)) {
363 pinnable_val
->PinSelf();
364 RecordTick(stats_
, MEMTABLE_HIT
);
366 if (!done
&& !s
.ok() && !s
.IsMergeInProgress()) {
367 ReturnAndCleanupSuperVersion(cfd
, super_version
);
371 PERF_TIMER_GUARD(get_from_output_files_time
);
372 super_version
->current
->Get(read_options
, lkey
, pinnable_val
,
373 /*timestamp=*/nullptr, &s
, &merge_context
,
374 &max_covering_tombstone_seq
);
375 RecordTick(stats_
, MEMTABLE_MISS
);
378 PERF_TIMER_GUARD(get_post_process_time
);
379 ReturnAndCleanupSuperVersion(cfd
, super_version
);
380 RecordTick(stats_
, NUMBER_KEYS_READ
);
381 size_t size
= pinnable_val
->size();
382 RecordTick(stats_
, BYTES_READ
, size
);
383 RecordTimeToHistogram(stats_
, BYTES_PER_READ
, size
);
384 PERF_COUNTER_ADD(get_read_bytes
, size
);
389 Iterator
* DBImplSecondary::NewIterator(const ReadOptions
& read_options
,
390 ColumnFamilyHandle
* column_family
) {
391 if (read_options
.managed
) {
392 return NewErrorIterator(
393 Status::NotSupported("Managed iterator is not supported anymore."));
395 if (read_options
.read_tier
== kPersistedTier
) {
396 return NewErrorIterator(Status::NotSupported(
397 "ReadTier::kPersistedData is not yet supported in iterators."));
399 Iterator
* result
= nullptr;
400 auto cfh
= static_cast_with_check
<ColumnFamilyHandleImpl
>(column_family
);
401 auto cfd
= cfh
->cfd();
402 ReadCallback
* read_callback
= nullptr; // No read callback provided.
403 if (read_options
.tailing
) {
404 return NewErrorIterator(Status::NotSupported(
405 "tailing iterator not supported in secondary mode"));
406 } else if (read_options
.snapshot
!= nullptr) {
407 // TODO (yanqin) support snapshot.
408 return NewErrorIterator(
409 Status::NotSupported("snapshot not supported in secondary mode"));
411 auto snapshot
= versions_
->LastSequence();
412 result
= NewIteratorImpl(read_options
, cfd
, snapshot
, read_callback
);
417 ArenaWrappedDBIter
* DBImplSecondary::NewIteratorImpl(
418 const ReadOptions
& read_options
, ColumnFamilyData
* cfd
,
419 SequenceNumber snapshot
, ReadCallback
* read_callback
) {
420 assert(nullptr != cfd
);
421 SuperVersion
* super_version
= cfd
->GetReferencedSuperVersion(this);
422 auto db_iter
= NewArenaWrappedDbIterator(
423 env_
, read_options
, *cfd
->ioptions(), super_version
->mutable_cf_options
,
425 super_version
->mutable_cf_options
.max_sequential_skip_in_iterations
,
426 super_version
->version_number
, read_callback
);
427 auto internal_iter
= NewInternalIterator(
428 db_iter
->GetReadOptions(), cfd
, super_version
, db_iter
->GetArena(),
429 db_iter
->GetRangeDelAggregator(), snapshot
,
430 /* allow_unprepared_value */ true);
431 db_iter
->SetIterUnderDBIter(internal_iter
);
435 Status
DBImplSecondary::NewIterators(
436 const ReadOptions
& read_options
,
437 const std::vector
<ColumnFamilyHandle
*>& column_families
,
438 std::vector
<Iterator
*>* iterators
) {
439 if (read_options
.managed
) {
440 return Status::NotSupported("Managed iterator is not supported anymore.");
442 if (read_options
.read_tier
== kPersistedTier
) {
443 return Status::NotSupported(
444 "ReadTier::kPersistedData is not yet supported in iterators.");
446 ReadCallback
* read_callback
= nullptr; // No read callback provided.
447 if (iterators
== nullptr) {
448 return Status::InvalidArgument("iterators not allowed to be nullptr");
451 iterators
->reserve(column_families
.size());
452 if (read_options
.tailing
) {
453 return Status::NotSupported(
454 "tailing iterator not supported in secondary mode");
455 } else if (read_options
.snapshot
!= nullptr) {
456 // TODO (yanqin) support snapshot.
457 return Status::NotSupported("snapshot not supported in secondary mode");
459 SequenceNumber read_seq
= versions_
->LastSequence();
460 for (auto cfh
: column_families
) {
461 ColumnFamilyData
* cfd
= static_cast<ColumnFamilyHandleImpl
*>(cfh
)->cfd();
462 iterators
->push_back(
463 NewIteratorImpl(read_options
, cfd
, read_seq
, read_callback
));
469 Status
DBImplSecondary::CheckConsistency() {
471 Status s
= DBImpl::CheckConsistency();
472 // If DBImpl::CheckConsistency() which is stricter returns success, then we
473 // do not need to give a second chance.
477 // It's possible that DBImpl::CheckConssitency() can fail because the primary
478 // may have removed certain files, causing the GetFileSize(name) call to
479 // fail and returning a PathNotFound. In this case, we take a best-effort
480 // approach and just proceed.
481 TEST_SYNC_POINT_CALLBACK(
482 "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s
);
484 if (immutable_db_options_
.skip_checking_sst_file_sizes_on_db_open
) {
488 std::vector
<LiveFileMetaData
> metadata
;
489 versions_
->GetLiveFilesMetaData(&metadata
);
491 std::string corruption_messages
;
492 for (const auto& md
: metadata
) {
493 // md.name has a leading "/".
494 std::string file_path
= md
.db_path
+ md
.name
;
497 s
= env_
->GetFileSize(file_path
, &fsize
);
499 (env_
->GetFileSize(Rocks2LevelTableFileName(file_path
), &fsize
).ok() ||
500 s
.IsPathNotFound())) {
504 corruption_messages
+=
505 "Can't access " + md
.name
+ ": " + s
.ToString() + "\n";
508 return corruption_messages
.empty() ? Status::OK()
509 : Status::Corruption(corruption_messages
);
512 Status
DBImplSecondary::TryCatchUpWithPrimary() {
513 assert(versions_
.get() != nullptr);
514 assert(manifest_reader_
.get() != nullptr);
516 // read the manifest and apply new changes to the secondary instance
517 std::unordered_set
<ColumnFamilyData
*> cfds_changed
;
518 JobContext
job_context(0, true /*create_superversion*/);
520 InstrumentedMutexLock
lock_guard(&mutex_
);
521 s
= static_cast_with_check
<ReactiveVersionSet
>(versions_
.get())
522 ->ReadAndApply(&mutex_
, &manifest_reader_
, &cfds_changed
);
524 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "Last sequence is %" PRIu64
,
525 static_cast<uint64_t>(versions_
->LastSequence()));
526 for (ColumnFamilyData
* cfd
: cfds_changed
) {
527 if (cfd
->IsDropped()) {
528 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
, "[%s] is dropped\n",
529 cfd
->GetName().c_str());
532 VersionStorageInfo::LevelSummaryStorage tmp
;
533 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
534 "[%s] Level summary: %s\n", cfd
->GetName().c_str(),
535 cfd
->current()->storage_info()->LevelSummary(&tmp
));
538 // list wal_dir to discover new WALs and apply new changes to the secondary
541 s
= FindAndRecoverLogFiles(&cfds_changed
, &job_context
);
543 if (s
.IsPathNotFound()) {
545 immutable_db_options_
.info_log
,
546 "Secondary tries to read WAL, but WAL file(s) have already "
547 "been purged by primary.");
551 for (auto cfd
: cfds_changed
) {
552 cfd
->imm()->RemoveOldMemTables(cfd
->GetLogNumber(),
553 &job_context
.memtables_to_free
);
554 auto& sv_context
= job_context
.superversion_contexts
.back();
555 cfd
->InstallSuperVersion(&sv_context
, &mutex_
);
556 sv_context
.NewSuperVersion();
562 // Cleanup unused, obsolete files.
563 JobContext
purge_files_job_context(0);
565 InstrumentedMutexLock
lock_guard(&mutex_
);
566 // Currently, secondary instance does not own the database files, thus it
567 // is unnecessary for the secondary to force full scan.
568 FindObsoleteFiles(&purge_files_job_context
, /*force=*/false);
570 if (purge_files_job_context
.HaveSomethingToDelete()) {
571 PurgeObsoleteFiles(purge_files_job_context
);
573 purge_files_job_context
.Clean();
577 Status
DB::OpenAsSecondary(const Options
& options
, const std::string
& dbname
,
578 const std::string
& secondary_path
, DB
** dbptr
) {
581 DBOptions
db_options(options
);
582 ColumnFamilyOptions
cf_options(options
);
583 std::vector
<ColumnFamilyDescriptor
> column_families
;
584 column_families
.emplace_back(kDefaultColumnFamilyName
, cf_options
);
585 std::vector
<ColumnFamilyHandle
*> handles
;
587 Status s
= DB::OpenAsSecondary(db_options
, dbname
, secondary_path
,
588 column_families
, &handles
, dbptr
);
590 assert(handles
.size() == 1);
596 Status
DB::OpenAsSecondary(
597 const DBOptions
& db_options
, const std::string
& dbname
,
598 const std::string
& secondary_path
,
599 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
600 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
) {
602 if (db_options
.max_open_files
!= -1) {
603 // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
604 // on SST files so that db secondary can still have access to old SSTs
605 // while primary instance may delete original.
606 return Status::InvalidArgument("require max_open_files to be -1");
609 DBOptions
tmp_opts(db_options
);
611 if (nullptr == tmp_opts
.info_log
) {
612 s
= CreateLoggerFromOptions(secondary_path
, tmp_opts
, &tmp_opts
.info_log
);
614 tmp_opts
.info_log
= nullptr;
619 DBImplSecondary
* impl
= new DBImplSecondary(tmp_opts
, dbname
);
620 impl
->versions_
.reset(new ReactiveVersionSet(
621 dbname
, &impl
->immutable_db_options_
, impl
->file_options_
,
622 impl
->table_cache_
.get(), impl
->write_buffer_manager_
,
623 &impl
->write_controller_
, impl
->io_tracer_
));
624 impl
->column_family_memtables_
.reset(
625 new ColumnFamilyMemTablesImpl(impl
->versions_
->GetColumnFamilySet()));
626 impl
->wal_in_db_path_
= IsWalDirSameAsDBPath(&impl
->immutable_db_options_
);
629 s
= impl
->Recover(column_families
, true, false, false);
631 for (auto cf
: column_families
) {
633 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
634 if (nullptr == cfd
) {
635 s
= Status::InvalidArgument("Column family not found", cf
.name
);
638 handles
->push_back(new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
641 SuperVersionContext
sv_context(true /* create_superversion */);
643 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
644 sv_context
.NewSuperVersion();
645 cfd
->InstallSuperVersion(&sv_context
, &impl
->mutex_
);
648 impl
->mutex_
.Unlock();
652 for (auto h
: *handles
) {
653 impl
->NewThreadStatusCfInfo(
654 static_cast_with_check
<ColumnFamilyHandleImpl
>(h
)->cfd());
657 for (auto h
: *handles
) {
665 #else // !ROCKSDB_LITE
667 Status
DB::OpenAsSecondary(const Options
& /*options*/,
668 const std::string
& /*name*/,
669 const std::string
& /*secondary_path*/,
671 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
674 Status
DB::OpenAsSecondary(
675 const DBOptions
& /*db_options*/, const std::string
& /*dbname*/,
676 const std::string
& /*secondary_path*/,
677 const std::vector
<ColumnFamilyDescriptor
>& /*column_families*/,
678 std::vector
<ColumnFamilyHandle
*>* /*handles*/, DB
** /*dbptr*/) {
679 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
681 #endif // !ROCKSDB_LITE
683 } // namespace ROCKSDB_NAMESPACE