]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_impl/db_impl_secondary.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_secondary.cc
CommitLineData
f67539c2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "db/db_impl/db_impl_secondary.h"
7
8#include <cinttypes>
9
10#include "db/arena_wrapped_db_iter.h"
11#include "db/merge_context.h"
12#include "logging/auto_roll_logger.h"
13#include "monitoring/perf_context_imp.h"
14#include "util/cast_util.h"
15
16namespace ROCKSDB_NAMESPACE {
17
18#ifndef ROCKSDB_LITE
19DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
20 const std::string& dbname)
21 : DBImpl(db_options, dbname) {
22 ROCKS_LOG_INFO(immutable_db_options_.info_log,
23 "Opening the db in secondary mode");
24 LogFlush(immutable_db_options_.info_log);
25}
26
27DBImplSecondary::~DBImplSecondary() {}
28
29Status DBImplSecondary::Recover(
30 const std::vector<ColumnFamilyDescriptor>& column_families,
20effc67
TL
31 bool /*readonly*/, bool /*error_if_wal_file_exists*/,
32 bool /*error_if_data_exists_in_wals*/, uint64_t*) {
f67539c2
TL
33 mutex_.AssertHeld();
34
35 JobContext job_context(0);
36 Status s;
37 s = static_cast<ReactiveVersionSet*>(versions_.get())
38 ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
39 &manifest_reader_status_);
40 if (!s.ok()) {
41 return s;
42 }
43 if (immutable_db_options_.paranoid_checks && s.ok()) {
44 s = CheckConsistency();
45 }
46 // Initial max_total_in_memory_state_ before recovery logs.
47 max_total_in_memory_state_ = 0;
48 for (auto cfd : *versions_->GetColumnFamilySet()) {
49 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
50 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
51 mutable_cf_options->max_write_buffer_number;
52 }
53 if (s.ok()) {
54 default_cf_handle_ = new ColumnFamilyHandleImpl(
55 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
56 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
57 single_column_family_mode_ =
58 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
59
60 std::unordered_set<ColumnFamilyData*> cfds_changed;
61 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
62 }
63
64 if (s.IsPathNotFound()) {
65 ROCKS_LOG_INFO(immutable_db_options_.info_log,
66 "Secondary tries to read WAL, but WAL file(s) have already "
67 "been purged by primary.");
68 s = Status::OK();
69 }
70 // TODO: update options_file_number_ needed?
71
72 job_context.Clean();
73 return s;
74}
75
76// find new WAL and apply them in order to the secondary instance
77Status DBImplSecondary::FindAndRecoverLogFiles(
78 std::unordered_set<ColumnFamilyData*>* cfds_changed,
79 JobContext* job_context) {
80 assert(nullptr != cfds_changed);
81 assert(nullptr != job_context);
82 Status s;
83 std::vector<uint64_t> logs;
84 s = FindNewLogNumbers(&logs);
85 if (s.ok() && !logs.empty()) {
86 SequenceNumber next_sequence(kMaxSequenceNumber);
87 s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
88 }
89 return s;
90}
91
92// List wal_dir and find all new WALs, return these log numbers
93Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
94 assert(logs != nullptr);
95 std::vector<std::string> filenames;
96 Status s;
97 s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
98 if (s.IsNotFound()) {
99 return Status::InvalidArgument("Failed to open wal_dir",
100 immutable_db_options_.wal_dir);
101 } else if (!s.ok()) {
102 return s;
103 }
104
105 // if log_readers_ is non-empty, it means we have applied all logs with log
106 // numbers smaller than the smallest log in log_readers_, so there is no
107 // need to pass these logs to RecoverLogFiles
108 uint64_t log_number_min = 0;
109 if (!log_readers_.empty()) {
110 log_number_min = log_readers_.begin()->first;
111 }
112 for (size_t i = 0; i < filenames.size(); i++) {
113 uint64_t number;
114 FileType type;
20effc67 115 if (ParseFileName(filenames[i], &number, &type) && type == kWalFile &&
f67539c2
TL
116 number >= log_number_min) {
117 logs->push_back(number);
118 }
119 }
120 // Recover logs in the order that they were generated
121 if (!logs->empty()) {
122 std::sort(logs->begin(), logs->end());
123 }
124 return s;
125}
126
127Status DBImplSecondary::MaybeInitLogReader(
128 uint64_t log_number, log::FragmentBufferedReader** log_reader) {
129 auto iter = log_readers_.find(log_number);
130 // make sure the log file is still present
131 if (iter == log_readers_.end() ||
132 iter->second->reader_->GetLogNumber() != log_number) {
133 // delete the obsolete log reader if log number mismatch
134 if (iter != log_readers_.end()) {
135 log_readers_.erase(iter);
136 }
137 // initialize log reader from log_number
138 // TODO: min_log_number_to_keep_2pc check needed?
139 // Open the log file
140 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
141 ROCKS_LOG_INFO(immutable_db_options_.info_log,
142 "Recovering log #%" PRIu64 " mode %d", log_number,
143 static_cast<int>(immutable_db_options_.wal_recovery_mode));
144
145 std::unique_ptr<SequentialFileReader> file_reader;
146 {
147 std::unique_ptr<FSSequentialFile> file;
148 Status status = fs_->NewSequentialFile(
149 fname, fs_->OptimizeForLogRead(file_options_), &file,
150 nullptr);
151 if (!status.ok()) {
152 *log_reader = nullptr;
153 return status;
154 }
155 file_reader.reset(new SequentialFileReader(
20effc67
TL
156 std::move(file), fname, immutable_db_options_.log_readahead_size,
157 io_tracer_));
f67539c2
TL
158 }
159
160 // Create the log reader.
161 LogReaderContainer* log_reader_container = new LogReaderContainer(
162 env_, immutable_db_options_.info_log, std::move(fname),
163 std::move(file_reader), log_number);
164 log_readers_.insert(std::make_pair(
165 log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
166 }
167 iter = log_readers_.find(log_number);
168 assert(iter != log_readers_.end());
169 *log_reader = iter->second->reader_;
170 return Status::OK();
171}
172
173// After manifest recovery, replay WALs and refresh log_readers_ if necessary
174// REQUIRES: log_numbers are sorted in ascending order
175Status DBImplSecondary::RecoverLogFiles(
176 const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
177 std::unordered_set<ColumnFamilyData*>* cfds_changed,
178 JobContext* job_context) {
179 assert(nullptr != cfds_changed);
180 assert(nullptr != job_context);
181 mutex_.AssertHeld();
182 Status status;
183 for (auto log_number : log_numbers) {
184 log::FragmentBufferedReader* reader = nullptr;
185 status = MaybeInitLogReader(log_number, &reader);
186 if (!status.ok()) {
187 return status;
188 }
189 assert(reader != nullptr);
190 }
191 for (auto log_number : log_numbers) {
192 auto it = log_readers_.find(log_number);
193 assert(it != log_readers_.end());
194 log::FragmentBufferedReader* reader = it->second->reader_;
20effc67
TL
195 Status* wal_read_status = it->second->status_;
196 assert(wal_read_status);
f67539c2
TL
197 // Manually update the file number allocation counter in VersionSet.
198 versions_->MarkFileNumberUsed(log_number);
199
200 // Determine if we should tolerate incomplete records at the tail end of the
201 // Read all the records and add to a memtable
202 std::string scratch;
203 Slice record;
204 WriteBatch batch;
205
206 while (reader->ReadRecord(&record, &scratch,
207 immutable_db_options_.wal_recovery_mode) &&
20effc67 208 wal_read_status->ok() && status.ok()) {
f67539c2
TL
209 if (record.size() < WriteBatchInternal::kHeader) {
210 reader->GetReporter()->Corruption(
211 record.size(), Status::Corruption("log record too small"));
212 continue;
213 }
20effc67
TL
214 status = WriteBatchInternal::SetContents(&batch, record);
215 if (!status.ok()) {
216 break;
217 }
f67539c2
TL
218 SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
219 std::vector<uint32_t> column_family_ids;
220 status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
221 if (status.ok()) {
222 for (const auto id : column_family_ids) {
223 ColumnFamilyData* cfd =
224 versions_->GetColumnFamilySet()->GetColumnFamily(id);
225 if (cfd == nullptr) {
226 continue;
227 }
228 if (cfds_changed->count(cfd) == 0) {
229 cfds_changed->insert(cfd);
230 }
231 const std::vector<FileMetaData*>& l0_files =
232 cfd->current()->storage_info()->LevelFiles(0);
233 SequenceNumber seq =
234 l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
235 // If the write batch's sequence number is smaller than the last
236 // sequence number of the largest sequence persisted for this column
237 // family, then its data must reside in an SST that has already been
238 // added in the prior MANIFEST replay.
239 if (seq_of_batch <= seq) {
240 continue;
241 }
242 auto curr_log_num = port::kMaxUint64;
243 if (cfd_to_current_log_.count(cfd) > 0) {
244 curr_log_num = cfd_to_current_log_[cfd];
245 }
246 // If the active memtable contains records added by replaying an
247 // earlier WAL, then we need to seal the memtable, add it to the
248 // immutable memtable list and create a new active memtable.
249 if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
250 curr_log_num != log_number)) {
251 const MutableCFOptions mutable_cf_options =
252 *cfd->GetLatestMutableCFOptions();
253 MemTable* new_mem =
254 cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
255 cfd->mem()->SetNextLogNumber(log_number);
256 cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
257 new_mem->Ref();
258 cfd->SetMemtable(new_mem);
259 }
260 }
261 bool has_valid_writes = false;
262 status = WriteBatchInternal::InsertInto(
263 &batch, column_family_memtables_.get(),
264 nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
265 true, log_number, this, false /* concurrent_memtable_writes */,
266 next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
267 }
268 // If column family was not found, it might mean that the WAL write
269 // batch references to the column family that was dropped after the
270 // insert. We don't want to fail the whole write batch in that case --
271 // we just ignore the update.
272 // That's why we set ignore missing column families to true
273 // passing null flush_scheduler will disable memtable flushing which is
274 // needed for secondary instances
275 if (status.ok()) {
276 for (const auto id : column_family_ids) {
277 ColumnFamilyData* cfd =
278 versions_->GetColumnFamilySet()->GetColumnFamily(id);
279 if (cfd == nullptr) {
280 continue;
281 }
282 std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
283 cfd_to_current_log_.find(cfd);
284 if (iter == cfd_to_current_log_.end()) {
285 cfd_to_current_log_.insert({cfd, log_number});
286 } else if (log_number > iter->second) {
287 iter->second = log_number;
288 }
289 }
290 auto last_sequence = *next_sequence - 1;
291 if ((*next_sequence != kMaxSequenceNumber) &&
292 (versions_->LastSequence() <= last_sequence)) {
293 versions_->SetLastAllocatedSequence(last_sequence);
294 versions_->SetLastPublishedSequence(last_sequence);
295 versions_->SetLastSequence(last_sequence);
296 }
297 } else {
298 // We are treating this as a failure while reading since we read valid
299 // blocks that do not form coherent data
300 reader->GetReporter()->Corruption(record.size(), status);
301 }
302 }
20effc67
TL
303 if (status.ok() && !wal_read_status->ok()) {
304 status = *wal_read_status;
305 }
f67539c2
TL
306 if (!status.ok()) {
307 return status;
308 }
309 }
310 // remove logreaders from map after successfully recovering the WAL
311 if (log_readers_.size() > 1) {
312 auto erase_iter = log_readers_.begin();
313 std::advance(erase_iter, log_readers_.size() - 1);
314 log_readers_.erase(log_readers_.begin(), erase_iter);
315 }
316 return status;
317}
318
319// Implementation of the DB interface
320Status DBImplSecondary::Get(const ReadOptions& read_options,
321 ColumnFamilyHandle* column_family, const Slice& key,
322 PinnableSlice* value) {
323 return GetImpl(read_options, column_family, key, value);
324}
325
326Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
327 ColumnFamilyHandle* column_family,
328 const Slice& key, PinnableSlice* pinnable_val) {
329 assert(pinnable_val != nullptr);
330 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
331 StopWatch sw(env_, stats_, DB_GET);
332 PERF_TIMER_GUARD(get_snapshot_time);
333
334 auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
335 ColumnFamilyData* cfd = cfh->cfd();
336 if (tracer_) {
337 InstrumentedMutexLock lock(&trace_mutex_);
338 if (tracer_) {
339 tracer_->Get(column_family, key);
340 }
341 }
342 // Acquire SuperVersion
343 SuperVersion* super_version = GetAndRefSuperVersion(cfd);
344 SequenceNumber snapshot = versions_->LastSequence();
345 MergeContext merge_context;
346 SequenceNumber max_covering_tombstone_seq = 0;
347 Status s;
348 LookupKey lkey(key, snapshot);
349 PERF_TIMER_STOP(get_snapshot_time);
350
351 bool done = false;
20effc67
TL
352 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
353 /*timestamp=*/nullptr, &s, &merge_context,
f67539c2
TL
354 &max_covering_tombstone_seq, read_options)) {
355 done = true;
356 pinnable_val->PinSelf();
357 RecordTick(stats_, MEMTABLE_HIT);
358 } else if ((s.ok() || s.IsMergeInProgress()) &&
359 super_version->imm->Get(
20effc67
TL
360 lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s,
361 &merge_context, &max_covering_tombstone_seq, read_options)) {
f67539c2
TL
362 done = true;
363 pinnable_val->PinSelf();
364 RecordTick(stats_, MEMTABLE_HIT);
365 }
366 if (!done && !s.ok() && !s.IsMergeInProgress()) {
367 ReturnAndCleanupSuperVersion(cfd, super_version);
368 return s;
369 }
370 if (!done) {
371 PERF_TIMER_GUARD(get_from_output_files_time);
20effc67
TL
372 super_version->current->Get(read_options, lkey, pinnable_val,
373 /*timestamp=*/nullptr, &s, &merge_context,
374 &max_covering_tombstone_seq);
f67539c2
TL
375 RecordTick(stats_, MEMTABLE_MISS);
376 }
377 {
378 PERF_TIMER_GUARD(get_post_process_time);
379 ReturnAndCleanupSuperVersion(cfd, super_version);
380 RecordTick(stats_, NUMBER_KEYS_READ);
381 size_t size = pinnable_val->size();
382 RecordTick(stats_, BYTES_READ, size);
383 RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
384 PERF_COUNTER_ADD(get_read_bytes, size);
385 }
386 return s;
387}
388
389Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
390 ColumnFamilyHandle* column_family) {
391 if (read_options.managed) {
392 return NewErrorIterator(
393 Status::NotSupported("Managed iterator is not supported anymore."));
394 }
395 if (read_options.read_tier == kPersistedTier) {
396 return NewErrorIterator(Status::NotSupported(
397 "ReadTier::kPersistedData is not yet supported in iterators."));
398 }
399 Iterator* result = nullptr;
20effc67 400 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
f67539c2
TL
401 auto cfd = cfh->cfd();
402 ReadCallback* read_callback = nullptr; // No read callback provided.
403 if (read_options.tailing) {
404 return NewErrorIterator(Status::NotSupported(
405 "tailing iterator not supported in secondary mode"));
406 } else if (read_options.snapshot != nullptr) {
407 // TODO (yanqin) support snapshot.
408 return NewErrorIterator(
409 Status::NotSupported("snapshot not supported in secondary mode"));
410 } else {
411 auto snapshot = versions_->LastSequence();
412 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
413 }
414 return result;
415}
416
417ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
418 const ReadOptions& read_options, ColumnFamilyData* cfd,
419 SequenceNumber snapshot, ReadCallback* read_callback) {
420 assert(nullptr != cfd);
421 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
422 auto db_iter = NewArenaWrappedDbIterator(
423 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
424 snapshot,
425 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
426 super_version->version_number, read_callback);
20effc67
TL
427 auto internal_iter = NewInternalIterator(
428 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
429 db_iter->GetRangeDelAggregator(), snapshot,
430 /* allow_unprepared_value */ true);
f67539c2
TL
431 db_iter->SetIterUnderDBIter(internal_iter);
432 return db_iter;
433}
434
435Status DBImplSecondary::NewIterators(
436 const ReadOptions& read_options,
437 const std::vector<ColumnFamilyHandle*>& column_families,
438 std::vector<Iterator*>* iterators) {
439 if (read_options.managed) {
440 return Status::NotSupported("Managed iterator is not supported anymore.");
441 }
442 if (read_options.read_tier == kPersistedTier) {
443 return Status::NotSupported(
444 "ReadTier::kPersistedData is not yet supported in iterators.");
445 }
446 ReadCallback* read_callback = nullptr; // No read callback provided.
447 if (iterators == nullptr) {
448 return Status::InvalidArgument("iterators not allowed to be nullptr");
449 }
450 iterators->clear();
451 iterators->reserve(column_families.size());
452 if (read_options.tailing) {
453 return Status::NotSupported(
454 "tailing iterator not supported in secondary mode");
455 } else if (read_options.snapshot != nullptr) {
456 // TODO (yanqin) support snapshot.
457 return Status::NotSupported("snapshot not supported in secondary mode");
458 } else {
459 SequenceNumber read_seq = versions_->LastSequence();
460 for (auto cfh : column_families) {
461 ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
462 iterators->push_back(
463 NewIteratorImpl(read_options, cfd, read_seq, read_callback));
464 }
465 }
466 return Status::OK();
467}
468
469Status DBImplSecondary::CheckConsistency() {
470 mutex_.AssertHeld();
471 Status s = DBImpl::CheckConsistency();
472 // If DBImpl::CheckConsistency() which is stricter returns success, then we
473 // do not need to give a second chance.
474 if (s.ok()) {
475 return s;
476 }
477 // It's possible that DBImpl::CheckConssitency() can fail because the primary
478 // may have removed certain files, causing the GetFileSize(name) call to
479 // fail and returning a PathNotFound. In this case, we take a best-effort
480 // approach and just proceed.
481 TEST_SYNC_POINT_CALLBACK(
482 "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);
483
484 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
485 return Status::OK();
486 }
487
488 std::vector<LiveFileMetaData> metadata;
489 versions_->GetLiveFilesMetaData(&metadata);
490
491 std::string corruption_messages;
492 for (const auto& md : metadata) {
493 // md.name has a leading "/".
494 std::string file_path = md.db_path + md.name;
495
496 uint64_t fsize = 0;
497 s = env_->GetFileSize(file_path, &fsize);
498 if (!s.ok() &&
499 (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
500 s.IsPathNotFound())) {
501 s = Status::OK();
502 }
503 if (!s.ok()) {
504 corruption_messages +=
505 "Can't access " + md.name + ": " + s.ToString() + "\n";
506 }
507 }
508 return corruption_messages.empty() ? Status::OK()
509 : Status::Corruption(corruption_messages);
510}
511
512Status DBImplSecondary::TryCatchUpWithPrimary() {
513 assert(versions_.get() != nullptr);
514 assert(manifest_reader_.get() != nullptr);
515 Status s;
516 // read the manifest and apply new changes to the secondary instance
517 std::unordered_set<ColumnFamilyData*> cfds_changed;
518 JobContext job_context(0, true /*create_superversion*/);
519 {
520 InstrumentedMutexLock lock_guard(&mutex_);
521 s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
522 ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
523
524 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
525 static_cast<uint64_t>(versions_->LastSequence()));
526 for (ColumnFamilyData* cfd : cfds_changed) {
527 if (cfd->IsDropped()) {
528 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
529 cfd->GetName().c_str());
530 continue;
531 }
532 VersionStorageInfo::LevelSummaryStorage tmp;
533 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
534 "[%s] Level summary: %s\n", cfd->GetName().c_str(),
535 cfd->current()->storage_info()->LevelSummary(&tmp));
536 }
537
538 // list wal_dir to discover new WALs and apply new changes to the secondary
539 // instance
540 if (s.ok()) {
541 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
542 }
543 if (s.IsPathNotFound()) {
544 ROCKS_LOG_INFO(
545 immutable_db_options_.info_log,
546 "Secondary tries to read WAL, but WAL file(s) have already "
547 "been purged by primary.");
548 s = Status::OK();
549 }
550 if (s.ok()) {
551 for (auto cfd : cfds_changed) {
552 cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
553 &job_context.memtables_to_free);
554 auto& sv_context = job_context.superversion_contexts.back();
555 cfd->InstallSuperVersion(&sv_context, &mutex_);
556 sv_context.NewSuperVersion();
557 }
558 }
559 }
560 job_context.Clean();
561
562 // Cleanup unused, obsolete files.
563 JobContext purge_files_job_context(0);
564 {
565 InstrumentedMutexLock lock_guard(&mutex_);
566 // Currently, secondary instance does not own the database files, thus it
567 // is unnecessary for the secondary to force full scan.
568 FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
569 }
570 if (purge_files_job_context.HaveSomethingToDelete()) {
571 PurgeObsoleteFiles(purge_files_job_context);
572 }
573 purge_files_job_context.Clean();
574 return s;
575}
576
577Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
578 const std::string& secondary_path, DB** dbptr) {
579 *dbptr = nullptr;
580
581 DBOptions db_options(options);
582 ColumnFamilyOptions cf_options(options);
583 std::vector<ColumnFamilyDescriptor> column_families;
584 column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
585 std::vector<ColumnFamilyHandle*> handles;
586
587 Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
588 column_families, &handles, dbptr);
589 if (s.ok()) {
590 assert(handles.size() == 1);
591 delete handles[0];
592 }
593 return s;
594}
595
596Status DB::OpenAsSecondary(
597 const DBOptions& db_options, const std::string& dbname,
598 const std::string& secondary_path,
599 const std::vector<ColumnFamilyDescriptor>& column_families,
600 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
601 *dbptr = nullptr;
602 if (db_options.max_open_files != -1) {
603 // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
604 // on SST files so that db secondary can still have access to old SSTs
605 // while primary instance may delete original.
606 return Status::InvalidArgument("require max_open_files to be -1");
607 }
608
609 DBOptions tmp_opts(db_options);
610 Status s;
611 if (nullptr == tmp_opts.info_log) {
612 s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
613 if (!s.ok()) {
614 tmp_opts.info_log = nullptr;
615 }
616 }
617
618 handles->clear();
619 DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
620 impl->versions_.reset(new ReactiveVersionSet(
621 dbname, &impl->immutable_db_options_, impl->file_options_,
622 impl->table_cache_.get(), impl->write_buffer_manager_,
20effc67 623 &impl->write_controller_, impl->io_tracer_));
f67539c2
TL
624 impl->column_family_memtables_.reset(
625 new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
626 impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
627
628 impl->mutex_.Lock();
629 s = impl->Recover(column_families, true, false, false);
630 if (s.ok()) {
631 for (auto cf : column_families) {
632 auto cfd =
633 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
634 if (nullptr == cfd) {
20effc67 635 s = Status::InvalidArgument("Column family not found", cf.name);
f67539c2
TL
636 break;
637 }
638 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
639 }
640 }
641 SuperVersionContext sv_context(true /* create_superversion */);
642 if (s.ok()) {
643 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
644 sv_context.NewSuperVersion();
645 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
646 }
647 }
648 impl->mutex_.Unlock();
649 sv_context.Clean();
650 if (s.ok()) {
651 *dbptr = impl;
652 for (auto h : *handles) {
653 impl->NewThreadStatusCfInfo(
20effc67 654 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
f67539c2
TL
655 }
656 } else {
657 for (auto h : *handles) {
658 delete h;
659 }
660 handles->clear();
661 delete impl;
662 }
663 return s;
664}
665#else // !ROCKSDB_LITE
666
667Status DB::OpenAsSecondary(const Options& /*options*/,
668 const std::string& /*name*/,
669 const std::string& /*secondary_path*/,
670 DB** /*dbptr*/) {
671 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
672}
673
674Status DB::OpenAsSecondary(
675 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
676 const std::string& /*secondary_path*/,
677 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
678 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
679 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
680}
681#endif // !ROCKSDB_LITE
682
683} // namespace ROCKSDB_NAMESPACE