]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include "db/db_impl/db_impl_secondary.h" | |
7 | ||
8 | #include <cinttypes> | |
9 | ||
10 | #include "db/arena_wrapped_db_iter.h" | |
11 | #include "db/merge_context.h" | |
12 | #include "logging/auto_roll_logger.h" | |
13 | #include "monitoring/perf_context_imp.h" | |
14 | #include "util/cast_util.h" | |
15 | ||
16 | namespace ROCKSDB_NAMESPACE { | |
17 | ||
18 | #ifndef ROCKSDB_LITE | |
19 | DBImplSecondary::DBImplSecondary(const DBOptions& db_options, | |
20 | const std::string& dbname) | |
21 | : DBImpl(db_options, dbname) { | |
22 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
23 | "Opening the db in secondary mode"); | |
24 | LogFlush(immutable_db_options_.info_log); | |
25 | } | |
26 | ||
27 | DBImplSecondary::~DBImplSecondary() {} | |
28 | ||
29 | Status DBImplSecondary::Recover( | |
30 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
20effc67 TL |
31 | bool /*readonly*/, bool /*error_if_wal_file_exists*/, |
32 | bool /*error_if_data_exists_in_wals*/, uint64_t*) { | |
f67539c2 TL |
33 | mutex_.AssertHeld(); |
34 | ||
35 | JobContext job_context(0); | |
36 | Status s; | |
37 | s = static_cast<ReactiveVersionSet*>(versions_.get()) | |
38 | ->Recover(column_families, &manifest_reader_, &manifest_reporter_, | |
39 | &manifest_reader_status_); | |
40 | if (!s.ok()) { | |
41 | return s; | |
42 | } | |
43 | if (immutable_db_options_.paranoid_checks && s.ok()) { | |
44 | s = CheckConsistency(); | |
45 | } | |
46 | // Initial max_total_in_memory_state_ before recovery logs. | |
47 | max_total_in_memory_state_ = 0; | |
48 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
49 | auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); | |
50 | max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * | |
51 | mutable_cf_options->max_write_buffer_number; | |
52 | } | |
53 | if (s.ok()) { | |
54 | default_cf_handle_ = new ColumnFamilyHandleImpl( | |
55 | versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); | |
56 | default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); | |
57 | single_column_family_mode_ = | |
58 | versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; | |
59 | ||
60 | std::unordered_set<ColumnFamilyData*> cfds_changed; | |
61 | s = FindAndRecoverLogFiles(&cfds_changed, &job_context); | |
62 | } | |
63 | ||
64 | if (s.IsPathNotFound()) { | |
65 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
66 | "Secondary tries to read WAL, but WAL file(s) have already " | |
67 | "been purged by primary."); | |
68 | s = Status::OK(); | |
69 | } | |
70 | // TODO: update options_file_number_ needed? | |
71 | ||
72 | job_context.Clean(); | |
73 | return s; | |
74 | } | |
75 | ||
76 | // find new WAL and apply them in order to the secondary instance | |
77 | Status DBImplSecondary::FindAndRecoverLogFiles( | |
78 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
79 | JobContext* job_context) { | |
80 | assert(nullptr != cfds_changed); | |
81 | assert(nullptr != job_context); | |
82 | Status s; | |
83 | std::vector<uint64_t> logs; | |
84 | s = FindNewLogNumbers(&logs); | |
85 | if (s.ok() && !logs.empty()) { | |
86 | SequenceNumber next_sequence(kMaxSequenceNumber); | |
87 | s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context); | |
88 | } | |
89 | return s; | |
90 | } | |
91 | ||
92 | // List wal_dir and find all new WALs, return these log numbers | |
93 | Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) { | |
94 | assert(logs != nullptr); | |
95 | std::vector<std::string> filenames; | |
96 | Status s; | |
97 | s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); | |
98 | if (s.IsNotFound()) { | |
99 | return Status::InvalidArgument("Failed to open wal_dir", | |
100 | immutable_db_options_.wal_dir); | |
101 | } else if (!s.ok()) { | |
102 | return s; | |
103 | } | |
104 | ||
105 | // if log_readers_ is non-empty, it means we have applied all logs with log | |
106 | // numbers smaller than the smallest log in log_readers_, so there is no | |
107 | // need to pass these logs to RecoverLogFiles | |
108 | uint64_t log_number_min = 0; | |
109 | if (!log_readers_.empty()) { | |
110 | log_number_min = log_readers_.begin()->first; | |
111 | } | |
112 | for (size_t i = 0; i < filenames.size(); i++) { | |
113 | uint64_t number; | |
114 | FileType type; | |
20effc67 | 115 | if (ParseFileName(filenames[i], &number, &type) && type == kWalFile && |
f67539c2 TL |
116 | number >= log_number_min) { |
117 | logs->push_back(number); | |
118 | } | |
119 | } | |
120 | // Recover logs in the order that they were generated | |
121 | if (!logs->empty()) { | |
122 | std::sort(logs->begin(), logs->end()); | |
123 | } | |
124 | return s; | |
125 | } | |
126 | ||
127 | Status DBImplSecondary::MaybeInitLogReader( | |
128 | uint64_t log_number, log::FragmentBufferedReader** log_reader) { | |
129 | auto iter = log_readers_.find(log_number); | |
130 | // make sure the log file is still present | |
131 | if (iter == log_readers_.end() || | |
132 | iter->second->reader_->GetLogNumber() != log_number) { | |
133 | // delete the obsolete log reader if log number mismatch | |
134 | if (iter != log_readers_.end()) { | |
135 | log_readers_.erase(iter); | |
136 | } | |
137 | // initialize log reader from log_number | |
138 | // TODO: min_log_number_to_keep_2pc check needed? | |
139 | // Open the log file | |
140 | std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number); | |
141 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
142 | "Recovering log #%" PRIu64 " mode %d", log_number, | |
143 | static_cast<int>(immutable_db_options_.wal_recovery_mode)); | |
144 | ||
145 | std::unique_ptr<SequentialFileReader> file_reader; | |
146 | { | |
147 | std::unique_ptr<FSSequentialFile> file; | |
148 | Status status = fs_->NewSequentialFile( | |
149 | fname, fs_->OptimizeForLogRead(file_options_), &file, | |
150 | nullptr); | |
151 | if (!status.ok()) { | |
152 | *log_reader = nullptr; | |
153 | return status; | |
154 | } | |
155 | file_reader.reset(new SequentialFileReader( | |
20effc67 TL |
156 | std::move(file), fname, immutable_db_options_.log_readahead_size, |
157 | io_tracer_)); | |
f67539c2 TL |
158 | } |
159 | ||
160 | // Create the log reader. | |
161 | LogReaderContainer* log_reader_container = new LogReaderContainer( | |
162 | env_, immutable_db_options_.info_log, std::move(fname), | |
163 | std::move(file_reader), log_number); | |
164 | log_readers_.insert(std::make_pair( | |
165 | log_number, std::unique_ptr<LogReaderContainer>(log_reader_container))); | |
166 | } | |
167 | iter = log_readers_.find(log_number); | |
168 | assert(iter != log_readers_.end()); | |
169 | *log_reader = iter->second->reader_; | |
170 | return Status::OK(); | |
171 | } | |
172 | ||
173 | // After manifest recovery, replay WALs and refresh log_readers_ if necessary | |
174 | // REQUIRES: log_numbers are sorted in ascending order | |
175 | Status DBImplSecondary::RecoverLogFiles( | |
176 | const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence, | |
177 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
178 | JobContext* job_context) { | |
179 | assert(nullptr != cfds_changed); | |
180 | assert(nullptr != job_context); | |
181 | mutex_.AssertHeld(); | |
182 | Status status; | |
183 | for (auto log_number : log_numbers) { | |
184 | log::FragmentBufferedReader* reader = nullptr; | |
185 | status = MaybeInitLogReader(log_number, &reader); | |
186 | if (!status.ok()) { | |
187 | return status; | |
188 | } | |
189 | assert(reader != nullptr); | |
190 | } | |
191 | for (auto log_number : log_numbers) { | |
192 | auto it = log_readers_.find(log_number); | |
193 | assert(it != log_readers_.end()); | |
194 | log::FragmentBufferedReader* reader = it->second->reader_; | |
20effc67 TL |
195 | Status* wal_read_status = it->second->status_; |
196 | assert(wal_read_status); | |
f67539c2 TL |
197 | // Manually update the file number allocation counter in VersionSet. |
198 | versions_->MarkFileNumberUsed(log_number); | |
199 | ||
200 | // Determine if we should tolerate incomplete records at the tail end of the | |
201 | // Read all the records and add to a memtable | |
202 | std::string scratch; | |
203 | Slice record; | |
204 | WriteBatch batch; | |
205 | ||
206 | while (reader->ReadRecord(&record, &scratch, | |
207 | immutable_db_options_.wal_recovery_mode) && | |
20effc67 | 208 | wal_read_status->ok() && status.ok()) { |
f67539c2 TL |
209 | if (record.size() < WriteBatchInternal::kHeader) { |
210 | reader->GetReporter()->Corruption( | |
211 | record.size(), Status::Corruption("log record too small")); | |
212 | continue; | |
213 | } | |
20effc67 TL |
214 | status = WriteBatchInternal::SetContents(&batch, record); |
215 | if (!status.ok()) { | |
216 | break; | |
217 | } | |
f67539c2 TL |
218 | SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); |
219 | std::vector<uint32_t> column_family_ids; | |
220 | status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); | |
221 | if (status.ok()) { | |
222 | for (const auto id : column_family_ids) { | |
223 | ColumnFamilyData* cfd = | |
224 | versions_->GetColumnFamilySet()->GetColumnFamily(id); | |
225 | if (cfd == nullptr) { | |
226 | continue; | |
227 | } | |
228 | if (cfds_changed->count(cfd) == 0) { | |
229 | cfds_changed->insert(cfd); | |
230 | } | |
231 | const std::vector<FileMetaData*>& l0_files = | |
232 | cfd->current()->storage_info()->LevelFiles(0); | |
233 | SequenceNumber seq = | |
234 | l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno; | |
235 | // If the write batch's sequence number is smaller than the last | |
236 | // sequence number of the largest sequence persisted for this column | |
237 | // family, then its data must reside in an SST that has already been | |
238 | // added in the prior MANIFEST replay. | |
239 | if (seq_of_batch <= seq) { | |
240 | continue; | |
241 | } | |
242 | auto curr_log_num = port::kMaxUint64; | |
243 | if (cfd_to_current_log_.count(cfd) > 0) { | |
244 | curr_log_num = cfd_to_current_log_[cfd]; | |
245 | } | |
246 | // If the active memtable contains records added by replaying an | |
247 | // earlier WAL, then we need to seal the memtable, add it to the | |
248 | // immutable memtable list and create a new active memtable. | |
249 | if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 || | |
250 | curr_log_num != log_number)) { | |
251 | const MutableCFOptions mutable_cf_options = | |
252 | *cfd->GetLatestMutableCFOptions(); | |
253 | MemTable* new_mem = | |
254 | cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); | |
255 | cfd->mem()->SetNextLogNumber(log_number); | |
256 | cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); | |
257 | new_mem->Ref(); | |
258 | cfd->SetMemtable(new_mem); | |
259 | } | |
260 | } | |
261 | bool has_valid_writes = false; | |
262 | status = WriteBatchInternal::InsertInto( | |
263 | &batch, column_family_memtables_.get(), | |
264 | nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/, | |
265 | true, log_number, this, false /* concurrent_memtable_writes */, | |
266 | next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); | |
267 | } | |
268 | // If column family was not found, it might mean that the WAL write | |
269 | // batch references to the column family that was dropped after the | |
270 | // insert. We don't want to fail the whole write batch in that case -- | |
271 | // we just ignore the update. | |
272 | // That's why we set ignore missing column families to true | |
273 | // passing null flush_scheduler will disable memtable flushing which is | |
274 | // needed for secondary instances | |
275 | if (status.ok()) { | |
276 | for (const auto id : column_family_ids) { | |
277 | ColumnFamilyData* cfd = | |
278 | versions_->GetColumnFamilySet()->GetColumnFamily(id); | |
279 | if (cfd == nullptr) { | |
280 | continue; | |
281 | } | |
282 | std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter = | |
283 | cfd_to_current_log_.find(cfd); | |
284 | if (iter == cfd_to_current_log_.end()) { | |
285 | cfd_to_current_log_.insert({cfd, log_number}); | |
286 | } else if (log_number > iter->second) { | |
287 | iter->second = log_number; | |
288 | } | |
289 | } | |
290 | auto last_sequence = *next_sequence - 1; | |
291 | if ((*next_sequence != kMaxSequenceNumber) && | |
292 | (versions_->LastSequence() <= last_sequence)) { | |
293 | versions_->SetLastAllocatedSequence(last_sequence); | |
294 | versions_->SetLastPublishedSequence(last_sequence); | |
295 | versions_->SetLastSequence(last_sequence); | |
296 | } | |
297 | } else { | |
298 | // We are treating this as a failure while reading since we read valid | |
299 | // blocks that do not form coherent data | |
300 | reader->GetReporter()->Corruption(record.size(), status); | |
301 | } | |
302 | } | |
20effc67 TL |
303 | if (status.ok() && !wal_read_status->ok()) { |
304 | status = *wal_read_status; | |
305 | } | |
f67539c2 TL |
306 | if (!status.ok()) { |
307 | return status; | |
308 | } | |
309 | } | |
310 | // remove logreaders from map after successfully recovering the WAL | |
311 | if (log_readers_.size() > 1) { | |
312 | auto erase_iter = log_readers_.begin(); | |
313 | std::advance(erase_iter, log_readers_.size() - 1); | |
314 | log_readers_.erase(log_readers_.begin(), erase_iter); | |
315 | } | |
316 | return status; | |
317 | } | |
318 | ||
319 | // Implementation of the DB interface | |
320 | Status DBImplSecondary::Get(const ReadOptions& read_options, | |
321 | ColumnFamilyHandle* column_family, const Slice& key, | |
322 | PinnableSlice* value) { | |
323 | return GetImpl(read_options, column_family, key, value); | |
324 | } | |
325 | ||
326 | Status DBImplSecondary::GetImpl(const ReadOptions& read_options, | |
327 | ColumnFamilyHandle* column_family, | |
328 | const Slice& key, PinnableSlice* pinnable_val) { | |
329 | assert(pinnable_val != nullptr); | |
330 | PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); | |
331 | StopWatch sw(env_, stats_, DB_GET); | |
332 | PERF_TIMER_GUARD(get_snapshot_time); | |
333 | ||
334 | auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family); | |
335 | ColumnFamilyData* cfd = cfh->cfd(); | |
336 | if (tracer_) { | |
337 | InstrumentedMutexLock lock(&trace_mutex_); | |
338 | if (tracer_) { | |
339 | tracer_->Get(column_family, key); | |
340 | } | |
341 | } | |
342 | // Acquire SuperVersion | |
343 | SuperVersion* super_version = GetAndRefSuperVersion(cfd); | |
344 | SequenceNumber snapshot = versions_->LastSequence(); | |
345 | MergeContext merge_context; | |
346 | SequenceNumber max_covering_tombstone_seq = 0; | |
347 | Status s; | |
348 | LookupKey lkey(key, snapshot); | |
349 | PERF_TIMER_STOP(get_snapshot_time); | |
350 | ||
351 | bool done = false; | |
20effc67 TL |
352 | if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), |
353 | /*timestamp=*/nullptr, &s, &merge_context, | |
f67539c2 TL |
354 | &max_covering_tombstone_seq, read_options)) { |
355 | done = true; | |
356 | pinnable_val->PinSelf(); | |
357 | RecordTick(stats_, MEMTABLE_HIT); | |
358 | } else if ((s.ok() || s.IsMergeInProgress()) && | |
359 | super_version->imm->Get( | |
20effc67 TL |
360 | lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s, |
361 | &merge_context, &max_covering_tombstone_seq, read_options)) { | |
f67539c2 TL |
362 | done = true; |
363 | pinnable_val->PinSelf(); | |
364 | RecordTick(stats_, MEMTABLE_HIT); | |
365 | } | |
366 | if (!done && !s.ok() && !s.IsMergeInProgress()) { | |
367 | ReturnAndCleanupSuperVersion(cfd, super_version); | |
368 | return s; | |
369 | } | |
370 | if (!done) { | |
371 | PERF_TIMER_GUARD(get_from_output_files_time); | |
20effc67 TL |
372 | super_version->current->Get(read_options, lkey, pinnable_val, |
373 | /*timestamp=*/nullptr, &s, &merge_context, | |
374 | &max_covering_tombstone_seq); | |
f67539c2 TL |
375 | RecordTick(stats_, MEMTABLE_MISS); |
376 | } | |
377 | { | |
378 | PERF_TIMER_GUARD(get_post_process_time); | |
379 | ReturnAndCleanupSuperVersion(cfd, super_version); | |
380 | RecordTick(stats_, NUMBER_KEYS_READ); | |
381 | size_t size = pinnable_val->size(); | |
382 | RecordTick(stats_, BYTES_READ, size); | |
383 | RecordTimeToHistogram(stats_, BYTES_PER_READ, size); | |
384 | PERF_COUNTER_ADD(get_read_bytes, size); | |
385 | } | |
386 | return s; | |
387 | } | |
388 | ||
389 | Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, | |
390 | ColumnFamilyHandle* column_family) { | |
391 | if (read_options.managed) { | |
392 | return NewErrorIterator( | |
393 | Status::NotSupported("Managed iterator is not supported anymore.")); | |
394 | } | |
395 | if (read_options.read_tier == kPersistedTier) { | |
396 | return NewErrorIterator(Status::NotSupported( | |
397 | "ReadTier::kPersistedData is not yet supported in iterators.")); | |
398 | } | |
399 | Iterator* result = nullptr; | |
20effc67 | 400 | auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); |
f67539c2 TL |
401 | auto cfd = cfh->cfd(); |
402 | ReadCallback* read_callback = nullptr; // No read callback provided. | |
403 | if (read_options.tailing) { | |
404 | return NewErrorIterator(Status::NotSupported( | |
405 | "tailing iterator not supported in secondary mode")); | |
406 | } else if (read_options.snapshot != nullptr) { | |
407 | // TODO (yanqin) support snapshot. | |
408 | return NewErrorIterator( | |
409 | Status::NotSupported("snapshot not supported in secondary mode")); | |
410 | } else { | |
411 | auto snapshot = versions_->LastSequence(); | |
412 | result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); | |
413 | } | |
414 | return result; | |
415 | } | |
416 | ||
417 | ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( | |
418 | const ReadOptions& read_options, ColumnFamilyData* cfd, | |
419 | SequenceNumber snapshot, ReadCallback* read_callback) { | |
420 | assert(nullptr != cfd); | |
421 | SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); | |
422 | auto db_iter = NewArenaWrappedDbIterator( | |
423 | env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, | |
424 | snapshot, | |
425 | super_version->mutable_cf_options.max_sequential_skip_in_iterations, | |
426 | super_version->version_number, read_callback); | |
20effc67 TL |
427 | auto internal_iter = NewInternalIterator( |
428 | db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), | |
429 | db_iter->GetRangeDelAggregator(), snapshot, | |
430 | /* allow_unprepared_value */ true); | |
f67539c2 TL |
431 | db_iter->SetIterUnderDBIter(internal_iter); |
432 | return db_iter; | |
433 | } | |
434 | ||
435 | Status DBImplSecondary::NewIterators( | |
436 | const ReadOptions& read_options, | |
437 | const std::vector<ColumnFamilyHandle*>& column_families, | |
438 | std::vector<Iterator*>* iterators) { | |
439 | if (read_options.managed) { | |
440 | return Status::NotSupported("Managed iterator is not supported anymore."); | |
441 | } | |
442 | if (read_options.read_tier == kPersistedTier) { | |
443 | return Status::NotSupported( | |
444 | "ReadTier::kPersistedData is not yet supported in iterators."); | |
445 | } | |
446 | ReadCallback* read_callback = nullptr; // No read callback provided. | |
447 | if (iterators == nullptr) { | |
448 | return Status::InvalidArgument("iterators not allowed to be nullptr"); | |
449 | } | |
450 | iterators->clear(); | |
451 | iterators->reserve(column_families.size()); | |
452 | if (read_options.tailing) { | |
453 | return Status::NotSupported( | |
454 | "tailing iterator not supported in secondary mode"); | |
455 | } else if (read_options.snapshot != nullptr) { | |
456 | // TODO (yanqin) support snapshot. | |
457 | return Status::NotSupported("snapshot not supported in secondary mode"); | |
458 | } else { | |
459 | SequenceNumber read_seq = versions_->LastSequence(); | |
460 | for (auto cfh : column_families) { | |
461 | ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); | |
462 | iterators->push_back( | |
463 | NewIteratorImpl(read_options, cfd, read_seq, read_callback)); | |
464 | } | |
465 | } | |
466 | return Status::OK(); | |
467 | } | |
468 | ||
469 | Status DBImplSecondary::CheckConsistency() { | |
470 | mutex_.AssertHeld(); | |
471 | Status s = DBImpl::CheckConsistency(); | |
472 | // If DBImpl::CheckConsistency() which is stricter returns success, then we | |
473 | // do not need to give a second chance. | |
474 | if (s.ok()) { | |
475 | return s; | |
476 | } | |
477 | // It's possible that DBImpl::CheckConssitency() can fail because the primary | |
478 | // may have removed certain files, causing the GetFileSize(name) call to | |
479 | // fail and returning a PathNotFound. In this case, we take a best-effort | |
480 | // approach and just proceed. | |
481 | TEST_SYNC_POINT_CALLBACK( | |
482 | "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s); | |
483 | ||
484 | if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) { | |
485 | return Status::OK(); | |
486 | } | |
487 | ||
488 | std::vector<LiveFileMetaData> metadata; | |
489 | versions_->GetLiveFilesMetaData(&metadata); | |
490 | ||
491 | std::string corruption_messages; | |
492 | for (const auto& md : metadata) { | |
493 | // md.name has a leading "/". | |
494 | std::string file_path = md.db_path + md.name; | |
495 | ||
496 | uint64_t fsize = 0; | |
497 | s = env_->GetFileSize(file_path, &fsize); | |
498 | if (!s.ok() && | |
499 | (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() || | |
500 | s.IsPathNotFound())) { | |
501 | s = Status::OK(); | |
502 | } | |
503 | if (!s.ok()) { | |
504 | corruption_messages += | |
505 | "Can't access " + md.name + ": " + s.ToString() + "\n"; | |
506 | } | |
507 | } | |
508 | return corruption_messages.empty() ? Status::OK() | |
509 | : Status::Corruption(corruption_messages); | |
510 | } | |
511 | ||
512 | Status DBImplSecondary::TryCatchUpWithPrimary() { | |
513 | assert(versions_.get() != nullptr); | |
514 | assert(manifest_reader_.get() != nullptr); | |
515 | Status s; | |
516 | // read the manifest and apply new changes to the secondary instance | |
517 | std::unordered_set<ColumnFamilyData*> cfds_changed; | |
518 | JobContext job_context(0, true /*create_superversion*/); | |
519 | { | |
520 | InstrumentedMutexLock lock_guard(&mutex_); | |
521 | s = static_cast_with_check<ReactiveVersionSet>(versions_.get()) | |
522 | ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); | |
523 | ||
524 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, | |
525 | static_cast<uint64_t>(versions_->LastSequence())); | |
526 | for (ColumnFamilyData* cfd : cfds_changed) { | |
527 | if (cfd->IsDropped()) { | |
528 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", | |
529 | cfd->GetName().c_str()); | |
530 | continue; | |
531 | } | |
532 | VersionStorageInfo::LevelSummaryStorage tmp; | |
533 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
534 | "[%s] Level summary: %s\n", cfd->GetName().c_str(), | |
535 | cfd->current()->storage_info()->LevelSummary(&tmp)); | |
536 | } | |
537 | ||
538 | // list wal_dir to discover new WALs and apply new changes to the secondary | |
539 | // instance | |
540 | if (s.ok()) { | |
541 | s = FindAndRecoverLogFiles(&cfds_changed, &job_context); | |
542 | } | |
543 | if (s.IsPathNotFound()) { | |
544 | ROCKS_LOG_INFO( | |
545 | immutable_db_options_.info_log, | |
546 | "Secondary tries to read WAL, but WAL file(s) have already " | |
547 | "been purged by primary."); | |
548 | s = Status::OK(); | |
549 | } | |
550 | if (s.ok()) { | |
551 | for (auto cfd : cfds_changed) { | |
552 | cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), | |
553 | &job_context.memtables_to_free); | |
554 | auto& sv_context = job_context.superversion_contexts.back(); | |
555 | cfd->InstallSuperVersion(&sv_context, &mutex_); | |
556 | sv_context.NewSuperVersion(); | |
557 | } | |
558 | } | |
559 | } | |
560 | job_context.Clean(); | |
561 | ||
562 | // Cleanup unused, obsolete files. | |
563 | JobContext purge_files_job_context(0); | |
564 | { | |
565 | InstrumentedMutexLock lock_guard(&mutex_); | |
566 | // Currently, secondary instance does not own the database files, thus it | |
567 | // is unnecessary for the secondary to force full scan. | |
568 | FindObsoleteFiles(&purge_files_job_context, /*force=*/false); | |
569 | } | |
570 | if (purge_files_job_context.HaveSomethingToDelete()) { | |
571 | PurgeObsoleteFiles(purge_files_job_context); | |
572 | } | |
573 | purge_files_job_context.Clean(); | |
574 | return s; | |
575 | } | |
576 | ||
577 | Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, | |
578 | const std::string& secondary_path, DB** dbptr) { | |
579 | *dbptr = nullptr; | |
580 | ||
581 | DBOptions db_options(options); | |
582 | ColumnFamilyOptions cf_options(options); | |
583 | std::vector<ColumnFamilyDescriptor> column_families; | |
584 | column_families.emplace_back(kDefaultColumnFamilyName, cf_options); | |
585 | std::vector<ColumnFamilyHandle*> handles; | |
586 | ||
587 | Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path, | |
588 | column_families, &handles, dbptr); | |
589 | if (s.ok()) { | |
590 | assert(handles.size() == 1); | |
591 | delete handles[0]; | |
592 | } | |
593 | return s; | |
594 | } | |
595 | ||
596 | Status DB::OpenAsSecondary( | |
597 | const DBOptions& db_options, const std::string& dbname, | |
598 | const std::string& secondary_path, | |
599 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
600 | std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { | |
601 | *dbptr = nullptr; | |
602 | if (db_options.max_open_files != -1) { | |
603 | // TODO (yanqin) maybe support max_open_files != -1 by creating hard links | |
604 | // on SST files so that db secondary can still have access to old SSTs | |
605 | // while primary instance may delete original. | |
606 | return Status::InvalidArgument("require max_open_files to be -1"); | |
607 | } | |
608 | ||
609 | DBOptions tmp_opts(db_options); | |
610 | Status s; | |
611 | if (nullptr == tmp_opts.info_log) { | |
612 | s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log); | |
613 | if (!s.ok()) { | |
614 | tmp_opts.info_log = nullptr; | |
615 | } | |
616 | } | |
617 | ||
618 | handles->clear(); | |
619 | DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname); | |
620 | impl->versions_.reset(new ReactiveVersionSet( | |
621 | dbname, &impl->immutable_db_options_, impl->file_options_, | |
622 | impl->table_cache_.get(), impl->write_buffer_manager_, | |
20effc67 | 623 | &impl->write_controller_, impl->io_tracer_)); |
f67539c2 TL |
624 | impl->column_family_memtables_.reset( |
625 | new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); | |
626 | impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_); | |
627 | ||
628 | impl->mutex_.Lock(); | |
629 | s = impl->Recover(column_families, true, false, false); | |
630 | if (s.ok()) { | |
631 | for (auto cf : column_families) { | |
632 | auto cfd = | |
633 | impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); | |
634 | if (nullptr == cfd) { | |
20effc67 | 635 | s = Status::InvalidArgument("Column family not found", cf.name); |
f67539c2 TL |
636 | break; |
637 | } | |
638 | handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); | |
639 | } | |
640 | } | |
641 | SuperVersionContext sv_context(true /* create_superversion */); | |
642 | if (s.ok()) { | |
643 | for (auto cfd : *impl->versions_->GetColumnFamilySet()) { | |
644 | sv_context.NewSuperVersion(); | |
645 | cfd->InstallSuperVersion(&sv_context, &impl->mutex_); | |
646 | } | |
647 | } | |
648 | impl->mutex_.Unlock(); | |
649 | sv_context.Clean(); | |
650 | if (s.ok()) { | |
651 | *dbptr = impl; | |
652 | for (auto h : *handles) { | |
653 | impl->NewThreadStatusCfInfo( | |
20effc67 | 654 | static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd()); |
f67539c2 TL |
655 | } |
656 | } else { | |
657 | for (auto h : *handles) { | |
658 | delete h; | |
659 | } | |
660 | handles->clear(); | |
661 | delete impl; | |
662 | } | |
663 | return s; | |
664 | } | |
665 | #else // !ROCKSDB_LITE | |
666 | ||
667 | Status DB::OpenAsSecondary(const Options& /*options*/, | |
668 | const std::string& /*name*/, | |
669 | const std::string& /*secondary_path*/, | |
670 | DB** /*dbptr*/) { | |
671 | return Status::NotSupported("Not supported in ROCKSDB_LITE."); | |
672 | } | |
673 | ||
674 | Status DB::OpenAsSecondary( | |
675 | const DBOptions& /*db_options*/, const std::string& /*dbname*/, | |
676 | const std::string& /*secondary_path*/, | |
677 | const std::vector<ColumnFamilyDescriptor>& /*column_families*/, | |
678 | std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) { | |
679 | return Status::NotSupported("Not supported in ROCKSDB_LITE."); | |
680 | } | |
681 | #endif // !ROCKSDB_LITE | |
682 | ||
683 | } // namespace ROCKSDB_NAMESPACE |