]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include "db/db_impl/db_impl_secondary.h" | |
7 | ||
8 | #include <cinttypes> | |
9 | ||
10 | #include "db/arena_wrapped_db_iter.h" | |
11 | #include "db/merge_context.h" | |
12 | #include "logging/auto_roll_logger.h" | |
1e59de90 | 13 | #include "logging/logging.h" |
f67539c2 | 14 | #include "monitoring/perf_context_imp.h" |
1e59de90 | 15 | #include "rocksdb/configurable.h" |
f67539c2 TL |
16 | #include "util/cast_util.h" |
17 | ||
18 | namespace ROCKSDB_NAMESPACE { | |
19 | ||
20 | #ifndef ROCKSDB_LITE | |
21 | DBImplSecondary::DBImplSecondary(const DBOptions& db_options, | |
1e59de90 TL |
22 | const std::string& dbname, |
23 | std::string secondary_path) | |
24 | : DBImpl(db_options, dbname, false, true, true), | |
25 | secondary_path_(std::move(secondary_path)) { | |
f67539c2 TL |
26 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
27 | "Opening the db in secondary mode"); | |
28 | LogFlush(immutable_db_options_.info_log); | |
29 | } | |
30 | ||
31 | DBImplSecondary::~DBImplSecondary() {} | |
32 | ||
33 | Status DBImplSecondary::Recover( | |
34 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
20effc67 | 35 | bool /*readonly*/, bool /*error_if_wal_file_exists*/, |
1e59de90 TL |
36 | bool /*error_if_data_exists_in_wals*/, uint64_t*, |
37 | RecoveryContext* /*recovery_ctx*/) { | |
f67539c2 TL |
38 | mutex_.AssertHeld(); |
39 | ||
40 | JobContext job_context(0); | |
41 | Status s; | |
42 | s = static_cast<ReactiveVersionSet*>(versions_.get()) | |
43 | ->Recover(column_families, &manifest_reader_, &manifest_reporter_, | |
44 | &manifest_reader_status_); | |
45 | if (!s.ok()) { | |
1e59de90 TL |
46 | if (manifest_reader_status_) { |
47 | manifest_reader_status_->PermitUncheckedError(); | |
48 | } | |
f67539c2 TL |
49 | return s; |
50 | } | |
51 | if (immutable_db_options_.paranoid_checks && s.ok()) { | |
52 | s = CheckConsistency(); | |
53 | } | |
54 | // Initial max_total_in_memory_state_ before recovery logs. | |
55 | max_total_in_memory_state_ = 0; | |
56 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
57 | auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); | |
58 | max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * | |
59 | mutable_cf_options->max_write_buffer_number; | |
60 | } | |
61 | if (s.ok()) { | |
62 | default_cf_handle_ = new ColumnFamilyHandleImpl( | |
63 | versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); | |
64 | default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); | |
f67539c2 TL |
65 | |
66 | std::unordered_set<ColumnFamilyData*> cfds_changed; | |
67 | s = FindAndRecoverLogFiles(&cfds_changed, &job_context); | |
68 | } | |
69 | ||
70 | if (s.IsPathNotFound()) { | |
71 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
72 | "Secondary tries to read WAL, but WAL file(s) have already " | |
73 | "been purged by primary."); | |
74 | s = Status::OK(); | |
75 | } | |
76 | // TODO: update options_file_number_ needed? | |
77 | ||
78 | job_context.Clean(); | |
79 | return s; | |
80 | } | |
81 | ||
82 | // find new WAL and apply them in order to the secondary instance | |
83 | Status DBImplSecondary::FindAndRecoverLogFiles( | |
84 | std::unordered_set<ColumnFamilyData*>* cfds_changed, | |
85 | JobContext* job_context) { | |
86 | assert(nullptr != cfds_changed); | |
87 | assert(nullptr != job_context); | |
88 | Status s; | |
89 | std::vector<uint64_t> logs; | |
90 | s = FindNewLogNumbers(&logs); | |
91 | if (s.ok() && !logs.empty()) { | |
92 | SequenceNumber next_sequence(kMaxSequenceNumber); | |
93 | s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context); | |
94 | } | |
95 | return s; | |
96 | } | |
97 | ||
98 | // List wal_dir and find all new WALs, return these log numbers | |
99 | Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) { | |
100 | assert(logs != nullptr); | |
101 | std::vector<std::string> filenames; | |
102 | Status s; | |
1e59de90 TL |
103 | IOOptions io_opts; |
104 | io_opts.do_not_recurse = true; | |
105 | s = immutable_db_options_.fs->GetChildren(immutable_db_options_.GetWalDir(), | |
106 | io_opts, &filenames, | |
107 | /*IODebugContext*=*/nullptr); | |
f67539c2 TL |
108 | if (s.IsNotFound()) { |
109 | return Status::InvalidArgument("Failed to open wal_dir", | |
1e59de90 | 110 | immutable_db_options_.GetWalDir()); |
f67539c2 TL |
111 | } else if (!s.ok()) { |
112 | return s; | |
113 | } | |
114 | ||
115 | // if log_readers_ is non-empty, it means we have applied all logs with log | |
116 | // numbers smaller than the smallest log in log_readers_, so there is no | |
117 | // need to pass these logs to RecoverLogFiles | |
118 | uint64_t log_number_min = 0; | |
119 | if (!log_readers_.empty()) { | |
120 | log_number_min = log_readers_.begin()->first; | |
121 | } | |
122 | for (size_t i = 0; i < filenames.size(); i++) { | |
123 | uint64_t number; | |
124 | FileType type; | |
20effc67 | 125 | if (ParseFileName(filenames[i], &number, &type) && type == kWalFile && |
f67539c2 TL |
126 | number >= log_number_min) { |
127 | logs->push_back(number); | |
128 | } | |
129 | } | |
130 | // Recover logs in the order that they were generated | |
131 | if (!logs->empty()) { | |
132 | std::sort(logs->begin(), logs->end()); | |
133 | } | |
134 | return s; | |
135 | } | |
136 | ||
137 | Status DBImplSecondary::MaybeInitLogReader( | |
138 | uint64_t log_number, log::FragmentBufferedReader** log_reader) { | |
139 | auto iter = log_readers_.find(log_number); | |
140 | // make sure the log file is still present | |
141 | if (iter == log_readers_.end() || | |
142 | iter->second->reader_->GetLogNumber() != log_number) { | |
143 | // delete the obsolete log reader if log number mismatch | |
144 | if (iter != log_readers_.end()) { | |
145 | log_readers_.erase(iter); | |
146 | } | |
147 | // initialize log reader from log_number | |
148 | // TODO: min_log_number_to_keep_2pc check needed? | |
149 | // Open the log file | |
1e59de90 TL |
150 | std::string fname = |
151 | LogFileName(immutable_db_options_.GetWalDir(), log_number); | |
f67539c2 TL |
152 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
153 | "Recovering log #%" PRIu64 " mode %d", log_number, | |
154 | static_cast<int>(immutable_db_options_.wal_recovery_mode)); | |
155 | ||
156 | std::unique_ptr<SequentialFileReader> file_reader; | |
157 | { | |
158 | std::unique_ptr<FSSequentialFile> file; | |
159 | Status status = fs_->NewSequentialFile( | |
1e59de90 | 160 | fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr); |
f67539c2 TL |
161 | if (!status.ok()) { |
162 | *log_reader = nullptr; | |
163 | return status; | |
164 | } | |
165 | file_reader.reset(new SequentialFileReader( | |
20effc67 TL |
166 | std::move(file), fname, immutable_db_options_.log_readahead_size, |
167 | io_tracer_)); | |
f67539c2 TL |
168 | } |
169 | ||
170 | // Create the log reader. | |
171 | LogReaderContainer* log_reader_container = new LogReaderContainer( | |
172 | env_, immutable_db_options_.info_log, std::move(fname), | |
173 | std::move(file_reader), log_number); | |
174 | log_readers_.insert(std::make_pair( | |
175 | log_number, std::unique_ptr<LogReaderContainer>(log_reader_container))); | |
176 | } | |
177 | iter = log_readers_.find(log_number); | |
178 | assert(iter != log_readers_.end()); | |
179 | *log_reader = iter->second->reader_; | |
180 | return Status::OK(); | |
181 | } | |
182 | ||
// After manifest recovery, replay WALs and refresh log_readers_ if necessary
// REQUIRES: log_numbers are sorted in ascending order
//
// REQUIRES: mutex_ held. Every column family touched by a replayed batch is
// added to `cfds_changed`. Memtables sealed during replay are added to the
// immutable list, and `job_context->memtables_to_free` is passed along to
// receive memtables to free. `*next_sequence` is advanced by InsertInto(). When
// it moves past VersionSet's last sequence, the last allocated/published/
// sequence numbers are bumped to match.
Status DBImplSecondary::RecoverLogFiles(
    const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(nullptr != cfds_changed);
  assert(nullptr != job_context);
  mutex_.AssertHeld();
  Status status;
  // Pass 1: make sure a reader exists for every WAL. A WAL that cannot be
  // opened fails the call before any memtable is modified.
  for (auto log_number : log_numbers) {
    log::FragmentBufferedReader* reader = nullptr;
    status = MaybeInitLogReader(log_number, &reader);
    if (!status.ok()) {
      return status;
    }
    assert(reader != nullptr);
  }
  // Pass 2: replay each WAL, in ascending order, into the memtables.
  for (auto log_number : log_numbers) {
    auto it = log_readers_.find(log_number);
    assert(it != log_readers_.end());
    log::FragmentBufferedReader* reader = it->second->reader_;
    // Read-level errors (e.g. corruption reports) accumulate here and stop
    // the record loop below.
    Status* wal_read_status = it->second->status_;
    assert(wal_read_status);
    // Manually update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(log_number);

    // Read all the records and add them to the memtables. How a truncated
    // tail is handled is left to ReadRecord(), which receives
    // wal_recovery_mode.
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (reader->ReadRecord(&record, &scratch,
                              immutable_db_options_.wal_recovery_mode) &&
           wal_read_status->ok() && status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reader->GetReporter()->Corruption(
            record.size(), Status::Corruption("log record too small"));
        continue;
      }
      status = WriteBatchInternal::SetContents(&batch, record);
      if (!status.ok()) {
        break;
      }
      SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
      std::vector<uint32_t> column_family_ids;
      status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          // Unknown (e.g. dropped) column families are skipped.
          if (cfd == nullptr) {
            continue;
          }
          if (cfds_changed->count(cfd) == 0) {
            cfds_changed->insert(cfd);
          }
          const std::vector<FileMetaData*>& l0_files =
              cfd->current()->storage_info()->LevelFiles(0);
          SequenceNumber seq =
              l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
          // If the write batch's sequence number is smaller than the last
          // sequence number of the largest sequence persisted for this column
          // family, then its data must reside in an SST that has already been
          // added in the prior MANIFEST replay.
          if (seq_of_batch <= seq) {
            continue;
          }
          auto curr_log_num = std::numeric_limits<uint64_t>::max();
          if (cfd_to_current_log_.count(cfd) > 0) {
            curr_log_num = cfd_to_current_log_[cfd];
          }
          // If the active memtable contains records added by replaying an
          // earlier WAL, then we need to seal the memtable, add it to the
          // immutable memtable list and create a new active memtable.
          if (!cfd->mem()->IsEmpty() &&
              (curr_log_num == std::numeric_limits<uint64_t>::max() ||
               curr_log_num != log_number)) {
            const MutableCFOptions mutable_cf_options =
                *cfd->GetLatestMutableCFOptions();
            MemTable* new_mem =
                cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
            cfd->mem()->SetNextLogNumber(log_number);
            cfd->mem()->ConstructFragmentedRangeTombstones();
            cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
            new_mem->Ref();
            cfd->SetMemtable(new_mem);
          }
        }
        bool has_valid_writes = false;
        status = WriteBatchInternal::InsertInto(
            &batch, column_family_memtables_.get(),
            nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
            true, log_number, this, false /* concurrent_memtable_writes */,
            next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
      }
      // (This note describes the InsertInto() call just above.)
      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      // passing null flush_scheduler will disable memtable flushing which is
      // needed for secondary instances
      if (status.ok()) {
        // Remember the newest WAL each column family has been fed from, so a
        // later WAL triggers sealing of the active memtable.
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            continue;
          }
          std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
              cfd_to_current_log_.find(cfd);
          if (iter == cfd_to_current_log_.end()) {
            cfd_to_current_log_.insert({cfd, log_number});
          } else if (log_number > iter->second) {
            iter->second = log_number;
          }
        }
        // Publish the highest replayed sequence if it advances VersionSet.
        auto last_sequence = *next_sequence - 1;
        if ((*next_sequence != kMaxSequenceNumber) &&
            (versions_->LastSequence() <= last_sequence)) {
          versions_->SetLastAllocatedSequence(last_sequence);
          versions_->SetLastPublishedSequence(last_sequence);
          versions_->SetLastSequence(last_sequence);
        }
      } else {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reader->GetReporter()->Corruption(record.size(), status);
      }
    }
    // Surface a read-level error if record processing itself succeeded.
    if (status.ok() && !wal_read_status->ok()) {
      status = *wal_read_status;
    }
    if (!status.ok()) {
      return status;
    }
  }
  // remove logreaders from map after successfully recovering the WAL
  // Only the last entry of log_readers_ (the highest log number) is kept;
  // presumably because the primary may still be appending to that WAL.
  if (log_readers_.size() > 1) {
    auto erase_iter = log_readers_.begin();
    std::advance(erase_iter, log_readers_.size() - 1);
    log_readers_.erase(log_readers_.begin(), erase_iter);
  }
  return status;
}
330 | ||
331 | // Implementation of the DB interface | |
332 | Status DBImplSecondary::Get(const ReadOptions& read_options, | |
333 | ColumnFamilyHandle* column_family, const Slice& key, | |
334 | PinnableSlice* value) { | |
1e59de90 TL |
335 | return GetImpl(read_options, column_family, key, value, |
336 | /*timestamp*/ nullptr); | |
337 | } | |
338 | ||
339 | Status DBImplSecondary::Get(const ReadOptions& read_options, | |
340 | ColumnFamilyHandle* column_family, const Slice& key, | |
341 | PinnableSlice* value, std::string* timestamp) { | |
342 | return GetImpl(read_options, column_family, key, value, timestamp); | |
f67539c2 TL |
343 | } |
344 | ||
// Point lookup shared by both Get() overloads.
// Validates the column family's timestamp configuration against
// `read_options.timestamp`. It then searches the active memtable, then the
// immutable memtables, then the current Version's files, all at
// versions_->LastSequence(). `pinnable_val` receives the value. `timestamp`
// (if non-null) is cleared first and is only forwarded to the lookups when the
// comparator has a non-zero timestamp size.
// NOTE(review): read_options.snapshot is not consulted; reads always use the
// latest replayed sequence.
Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
                                ColumnFamilyHandle* column_family,
                                const Slice& key, PinnableSlice* pinnable_val,
                                std::string* timestamp) {
  assert(pinnable_val != nullptr);
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  assert(column_family);
  if (read_options.timestamp) {
    const Status s = FailIfTsMismatchCf(
        column_family, *(read_options.timestamp), /*ts_for_read=*/true);
    if (!s.ok()) {
      return s;
    }
  } else {
    const Status s = FailIfCfHasTs(column_family);
    if (!s.ok()) {
      return s;
    }
  }

  // Clear the timestamp for returning results so that we can distinguish
  // between tombstone or key that has never been written later.
  if (timestamp) {
    timestamp->clear();
  }

  auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  // tracer_ is re-checked under trace_mutex_ before use.
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(column_family, key);
    }
  }
  // Acquire SuperVersion
  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
  SequenceNumber snapshot = versions_->LastSequence();
  GetWithTimestampReadCallback read_cb(snapshot);
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  Status s;
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  bool done = false;
  const Comparator* ucmp = column_family->GetComparator();
  assert(ucmp);
  std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
  // Memtable first, then (if no hard error so far) immutable memtables.
  if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
                              /*columns=*/nullptr, ts, &s, &merge_context,
                              &max_covering_tombstone_seq, read_options,
                              false /* immutable_memtable */, &read_cb)) {
    done = true;
    pinnable_val->PinSelf();
    RecordTick(stats_, MEMTABLE_HIT);
  } else if ((s.ok() || s.IsMergeInProgress()) &&
             super_version->imm->Get(
                 lkey, pinnable_val->GetSelf(), /*columns=*/nullptr, ts, &s,
                 &merge_context, &max_covering_tombstone_seq, read_options,
                 &read_cb)) {
    done = true;
    pinnable_val->PinSelf();
    RecordTick(stats_, MEMTABLE_HIT);
  }
  // A non-merge error from the memtable search ends the lookup early.
  if (!done && !s.ok() && !s.IsMergeInProgress()) {
    ReturnAndCleanupSuperVersion(cfd, super_version);
    return s;
  }
  // Fall back to the SST files of the current Version.
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    PinnedIteratorsManager pinned_iters_mgr;
    super_version->current->Get(
        read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
        &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
        /*value_found*/ nullptr,
        /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
        /*do_merge*/ true);
    RecordTick(stats_, MEMTABLE_MISS);
  }
  {
    PERF_TIMER_GUARD(get_post_process_time);
    ReturnAndCleanupSuperVersion(cfd, super_version);
    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = pinnable_val->size();
    RecordTick(stats_, BYTES_READ, size);
    RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);
  }
  return s;
}
438 | ||
439 | Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, | |
440 | ColumnFamilyHandle* column_family) { | |
441 | if (read_options.managed) { | |
442 | return NewErrorIterator( | |
443 | Status::NotSupported("Managed iterator is not supported anymore.")); | |
444 | } | |
445 | if (read_options.read_tier == kPersistedTier) { | |
446 | return NewErrorIterator(Status::NotSupported( | |
447 | "ReadTier::kPersistedData is not yet supported in iterators.")); | |
448 | } | |
1e59de90 TL |
449 | |
450 | assert(column_family); | |
451 | if (read_options.timestamp) { | |
452 | const Status s = FailIfTsMismatchCf( | |
453 | column_family, *(read_options.timestamp), /*ts_for_read=*/true); | |
454 | if (!s.ok()) { | |
455 | return NewErrorIterator(s); | |
456 | } | |
457 | } else { | |
458 | const Status s = FailIfCfHasTs(column_family); | |
459 | if (!s.ok()) { | |
460 | return NewErrorIterator(s); | |
461 | } | |
462 | } | |
463 | ||
f67539c2 | 464 | Iterator* result = nullptr; |
20effc67 | 465 | auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); |
f67539c2 TL |
466 | auto cfd = cfh->cfd(); |
467 | ReadCallback* read_callback = nullptr; // No read callback provided. | |
468 | if (read_options.tailing) { | |
469 | return NewErrorIterator(Status::NotSupported( | |
470 | "tailing iterator not supported in secondary mode")); | |
471 | } else if (read_options.snapshot != nullptr) { | |
472 | // TODO (yanqin) support snapshot. | |
473 | return NewErrorIterator( | |
474 | Status::NotSupported("snapshot not supported in secondary mode")); | |
475 | } else { | |
1e59de90 | 476 | SequenceNumber snapshot(kMaxSequenceNumber); |
f67539c2 TL |
477 | result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); |
478 | } | |
479 | return result; | |
480 | } | |
481 | ||
482 | ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( | |
483 | const ReadOptions& read_options, ColumnFamilyData* cfd, | |
1e59de90 TL |
484 | SequenceNumber snapshot, ReadCallback* read_callback, |
485 | bool expose_blob_index, bool allow_refresh) { | |
f67539c2 TL |
486 | assert(nullptr != cfd); |
487 | SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); | |
1e59de90 TL |
488 | assert(snapshot == kMaxSequenceNumber); |
489 | snapshot = versions_->LastSequence(); | |
490 | assert(snapshot != kMaxSequenceNumber); | |
f67539c2 TL |
491 | auto db_iter = NewArenaWrappedDbIterator( |
492 | env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, | |
1e59de90 | 493 | super_version->current, snapshot, |
f67539c2 | 494 | super_version->mutable_cf_options.max_sequential_skip_in_iterations, |
1e59de90 TL |
495 | super_version->version_number, read_callback, this, cfd, |
496 | expose_blob_index, read_options.snapshot ? false : allow_refresh); | |
20effc67 TL |
497 | auto internal_iter = NewInternalIterator( |
498 | db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), | |
1e59de90 | 499 | snapshot, /* allow_unprepared_value */ true, db_iter); |
f67539c2 TL |
500 | db_iter->SetIterUnderDBIter(internal_iter); |
501 | return db_iter; | |
502 | } | |
503 | ||
504 | Status DBImplSecondary::NewIterators( | |
505 | const ReadOptions& read_options, | |
506 | const std::vector<ColumnFamilyHandle*>& column_families, | |
507 | std::vector<Iterator*>* iterators) { | |
508 | if (read_options.managed) { | |
509 | return Status::NotSupported("Managed iterator is not supported anymore."); | |
510 | } | |
511 | if (read_options.read_tier == kPersistedTier) { | |
512 | return Status::NotSupported( | |
513 | "ReadTier::kPersistedData is not yet supported in iterators."); | |
514 | } | |
515 | ReadCallback* read_callback = nullptr; // No read callback provided. | |
516 | if (iterators == nullptr) { | |
517 | return Status::InvalidArgument("iterators not allowed to be nullptr"); | |
518 | } | |
1e59de90 TL |
519 | |
520 | if (read_options.timestamp) { | |
521 | for (auto* cf : column_families) { | |
522 | assert(cf); | |
523 | const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp), | |
524 | /*ts_for_read=*/true); | |
525 | if (!s.ok()) { | |
526 | return s; | |
527 | } | |
528 | } | |
529 | } else { | |
530 | for (auto* cf : column_families) { | |
531 | assert(cf); | |
532 | const Status s = FailIfCfHasTs(cf); | |
533 | if (!s.ok()) { | |
534 | return s; | |
535 | } | |
536 | } | |
537 | } | |
f67539c2 TL |
538 | iterators->clear(); |
539 | iterators->reserve(column_families.size()); | |
540 | if (read_options.tailing) { | |
541 | return Status::NotSupported( | |
542 | "tailing iterator not supported in secondary mode"); | |
543 | } else if (read_options.snapshot != nullptr) { | |
544 | // TODO (yanqin) support snapshot. | |
545 | return Status::NotSupported("snapshot not supported in secondary mode"); | |
546 | } else { | |
1e59de90 | 547 | SequenceNumber read_seq(kMaxSequenceNumber); |
f67539c2 TL |
548 | for (auto cfh : column_families) { |
549 | ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); | |
550 | iterators->push_back( | |
551 | NewIteratorImpl(read_options, cfd, read_seq, read_callback)); | |
552 | } | |
553 | } | |
554 | return Status::OK(); | |
555 | } | |
556 | ||
557 | Status DBImplSecondary::CheckConsistency() { | |
558 | mutex_.AssertHeld(); | |
559 | Status s = DBImpl::CheckConsistency(); | |
560 | // If DBImpl::CheckConsistency() which is stricter returns success, then we | |
561 | // do not need to give a second chance. | |
562 | if (s.ok()) { | |
563 | return s; | |
564 | } | |
565 | // It's possible that DBImpl::CheckConssitency() can fail because the primary | |
566 | // may have removed certain files, causing the GetFileSize(name) call to | |
567 | // fail and returning a PathNotFound. In this case, we take a best-effort | |
568 | // approach and just proceed. | |
569 | TEST_SYNC_POINT_CALLBACK( | |
570 | "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s); | |
571 | ||
572 | if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) { | |
573 | return Status::OK(); | |
574 | } | |
575 | ||
576 | std::vector<LiveFileMetaData> metadata; | |
577 | versions_->GetLiveFilesMetaData(&metadata); | |
578 | ||
579 | std::string corruption_messages; | |
580 | for (const auto& md : metadata) { | |
581 | // md.name has a leading "/". | |
582 | std::string file_path = md.db_path + md.name; | |
583 | ||
584 | uint64_t fsize = 0; | |
585 | s = env_->GetFileSize(file_path, &fsize); | |
586 | if (!s.ok() && | |
587 | (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() || | |
588 | s.IsPathNotFound())) { | |
589 | s = Status::OK(); | |
590 | } | |
591 | if (!s.ok()) { | |
592 | corruption_messages += | |
593 | "Can't access " + md.name + ": " + s.ToString() + "\n"; | |
594 | } | |
595 | } | |
596 | return corruption_messages.empty() ? Status::OK() | |
597 | : Status::Corruption(corruption_messages); | |
598 | } | |
599 | ||
// Brings this secondary up to date with the primary.
//   1. Under mutex_: apply new MANIFEST edits (ReadAndApply), replay new WALs,
//      then for every changed column family drop old immutable memtables and
//      install a fresh SuperVersion.
//   2. Outside the first critical section: find and purge obsolete files
//      (non-forced scan).
// NOTE(review): the PathNotFound -> OK conversion below runs after both the
// MANIFEST step and the WAL step. A PathNotFound from ReadAndApply() is
// therefore also swallowed, even though the log message mentions only WALs.
// Confirm that this is intended.
// NOTE(review): dropped column families in cfds_changed are only skipped in
// the debug logging loop; they still reach the SuperVersion install loop.
Status DBImplSecondary::TryCatchUpWithPrimary() {
  assert(versions_.get() != nullptr);
  assert(manifest_reader_.get() != nullptr);
  Status s;
  // read the manifest and apply new changes to the secondary instance
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  JobContext job_context(0, true /*create_superversion*/);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
            ->ReadAndApply(&mutex_, &manifest_reader_,
                           manifest_reader_status_.get(), &cfds_changed);

    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
                   static_cast<uint64_t>(versions_->LastSequence()));
    for (ColumnFamilyData* cfd : cfds_changed) {
      if (cfd->IsDropped()) {
        ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
                        cfd->GetName().c_str());
        continue;
      }
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                      cfd->current()->storage_info()->LevelSummary(&tmp));
    }

    // list wal_dir to discover new WALs and apply new changes to the secondary
    // instance
    if (s.ok()) {
      s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
    }
    if (s.IsPathNotFound()) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "Secondary tries to read WAL, but WAL file(s) have already "
          "been purged by primary.");
      s = Status::OK();
    }
    if (s.ok()) {
      for (auto cfd : cfds_changed) {
        cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
                                       &job_context.memtables_to_free);
        // The same SuperVersionContext is reused for every column family;
        // NewSuperVersion() re-arms it after each install.
        auto& sv_context = job_context.superversion_contexts.back();
        cfd->InstallSuperVersion(&sv_context, &mutex_);
        sv_context.NewSuperVersion();
      }
    }
  }
  job_context.Clean();

  // Cleanup unused, obsolete files.
  JobContext purge_files_job_context(0);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    // Currently, secondary instance does not own the database files, thus it
    // is unnecessary for the secondary to force full scan.
    FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
  }
  if (purge_files_job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(purge_files_job_context);
  }
  purge_files_job_context.Clean();
  return s;
}
665 | ||
666 | Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, | |
667 | const std::string& secondary_path, DB** dbptr) { | |
668 | *dbptr = nullptr; | |
669 | ||
670 | DBOptions db_options(options); | |
671 | ColumnFamilyOptions cf_options(options); | |
672 | std::vector<ColumnFamilyDescriptor> column_families; | |
673 | column_families.emplace_back(kDefaultColumnFamilyName, cf_options); | |
674 | std::vector<ColumnFamilyHandle*> handles; | |
675 | ||
676 | Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path, | |
677 | column_families, &handles, dbptr); | |
678 | if (s.ok()) { | |
679 | assert(handles.size() == 1); | |
680 | delete handles[0]; | |
681 | } | |
682 | return s; | |
683 | } | |
684 | ||
685 | Status DB::OpenAsSecondary( | |
686 | const DBOptions& db_options, const std::string& dbname, | |
687 | const std::string& secondary_path, | |
688 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
689 | std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { | |
690 | *dbptr = nullptr; | |
f67539c2 TL |
691 | |
692 | DBOptions tmp_opts(db_options); | |
693 | Status s; | |
694 | if (nullptr == tmp_opts.info_log) { | |
695 | s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log); | |
696 | if (!s.ok()) { | |
697 | tmp_opts.info_log = nullptr; | |
1e59de90 | 698 | return s; |
f67539c2 TL |
699 | } |
700 | } | |
701 | ||
1e59de90 TL |
702 | assert(tmp_opts.info_log != nullptr); |
703 | if (db_options.max_open_files != -1) { | |
704 | std::ostringstream oss; | |
705 | oss << "The primary instance may delete all types of files after they " | |
706 | "become obsolete. The application can coordinate the primary and " | |
707 | "secondary so that primary does not delete/rename files that are " | |
708 | "currently being used by the secondary. Alternatively, a custom " | |
709 | "Env/FS can be provided such that files become inaccessible only " | |
710 | "after all primary and secondaries indicate that they are obsolete " | |
711 | "and deleted. If the above two are not possible, you can open the " | |
712 | "secondary instance with `max_open_files==-1` so that secondary " | |
713 | "will eagerly keep all table files open. Even if a file is deleted, " | |
714 | "its content can still be accessed via a prior open file " | |
715 | "descriptor. This is a hacky workaround for only table files. If " | |
716 | "none of the above is done, then point lookup or " | |
717 | "range scan via the secondary instance can result in IOError: file " | |
718 | "not found. This can be resolved by retrying " | |
719 | "TryCatchUpWithPrimary()."; | |
720 | ROCKS_LOG_WARN(tmp_opts.info_log, "%s", oss.str().c_str()); | |
721 | } | |
722 | ||
f67539c2 | 723 | handles->clear(); |
1e59de90 | 724 | DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path); |
f67539c2 TL |
725 | impl->versions_.reset(new ReactiveVersionSet( |
726 | dbname, &impl->immutable_db_options_, impl->file_options_, | |
727 | impl->table_cache_.get(), impl->write_buffer_manager_, | |
20effc67 | 728 | &impl->write_controller_, impl->io_tracer_)); |
f67539c2 TL |
729 | impl->column_family_memtables_.reset( |
730 | new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); | |
1e59de90 | 731 | impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); |
f67539c2 TL |
732 | |
733 | impl->mutex_.Lock(); | |
734 | s = impl->Recover(column_families, true, false, false); | |
735 | if (s.ok()) { | |
736 | for (auto cf : column_families) { | |
737 | auto cfd = | |
738 | impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); | |
739 | if (nullptr == cfd) { | |
20effc67 | 740 | s = Status::InvalidArgument("Column family not found", cf.name); |
f67539c2 TL |
741 | break; |
742 | } | |
743 | handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); | |
744 | } | |
745 | } | |
746 | SuperVersionContext sv_context(true /* create_superversion */); | |
747 | if (s.ok()) { | |
748 | for (auto cfd : *impl->versions_->GetColumnFamilySet()) { | |
749 | sv_context.NewSuperVersion(); | |
750 | cfd->InstallSuperVersion(&sv_context, &impl->mutex_); | |
751 | } | |
752 | } | |
753 | impl->mutex_.Unlock(); | |
754 | sv_context.Clean(); | |
755 | if (s.ok()) { | |
756 | *dbptr = impl; | |
757 | for (auto h : *handles) { | |
758 | impl->NewThreadStatusCfInfo( | |
20effc67 | 759 | static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd()); |
f67539c2 TL |
760 | } |
761 | } else { | |
762 | for (auto h : *handles) { | |
763 | delete h; | |
764 | } | |
765 | handles->clear(); | |
766 | delete impl; | |
767 | } | |
768 | return s; | |
769 | } | |
1e59de90 TL |
770 | |
771 | Status DBImplSecondary::CompactWithoutInstallation( | |
772 | const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh, | |
773 | const CompactionServiceInput& input, CompactionServiceResult* result) { | |
774 | if (options.canceled && options.canceled->load(std::memory_order_acquire)) { | |
775 | return Status::Incomplete(Status::SubCode::kManualCompactionPaused); | |
776 | } | |
777 | InstrumentedMutexLock l(&mutex_); | |
778 | auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd(); | |
779 | if (!cfd) { | |
780 | return Status::InvalidArgument("Cannot find column family" + | |
781 | cfh->GetName()); | |
782 | } | |
783 | ||
784 | std::unordered_set<uint64_t> input_set; | |
785 | for (const auto& file_name : input.input_files) { | |
786 | input_set.insert(TableFileNameToNumber(file_name)); | |
787 | } | |
788 | ||
789 | auto* version = cfd->current(); | |
790 | ||
791 | ColumnFamilyMetaData cf_meta; | |
792 | version->GetColumnFamilyMetaData(&cf_meta); | |
793 | ||
794 | const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions(); | |
795 | ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions(); | |
796 | VersionStorageInfo* vstorage = version->storage_info(); | |
797 | ||
798 | // Use comp_options to reuse some CompactFiles functions | |
799 | CompactionOptions comp_options; | |
800 | comp_options.compression = kDisableCompressionOption; | |
801 | comp_options.output_file_size_limit = MaxFileSizeForLevel( | |
802 | *mutable_cf_options, input.output_level, cf_options.compaction_style, | |
803 | vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes); | |
804 | ||
805 | std::vector<CompactionInputFiles> input_files; | |
806 | Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( | |
807 | &input_files, &input_set, vstorage, comp_options); | |
808 | if (!s.ok()) { | |
809 | return s; | |
810 | } | |
811 | ||
812 | std::unique_ptr<Compaction> c; | |
813 | assert(cfd->compaction_picker()); | |
814 | c.reset(cfd->compaction_picker()->CompactFiles( | |
815 | comp_options, input_files, input.output_level, vstorage, | |
816 | *mutable_cf_options, mutable_db_options_, 0)); | |
817 | assert(c != nullptr); | |
818 | ||
819 | c->SetInputVersion(version); | |
820 | ||
821 | // Create output directory if it's not existed yet | |
822 | std::unique_ptr<FSDirectory> output_dir; | |
823 | s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir); | |
824 | if (!s.ok()) { | |
825 | return s; | |
826 | } | |
827 | ||
828 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, | |
829 | immutable_db_options_.info_log.get()); | |
830 | ||
831 | const int job_id = next_job_id_.fetch_add(1); | |
832 | ||
833 | // use primary host's db_id for running the compaction, but db_session_id is | |
834 | // using the local one, which is to make sure the unique id is unique from | |
835 | // the remote compactors. Because the id is generated from db_id, | |
836 | // db_session_id and orig_file_number, unlike the local compaction, remote | |
837 | // compaction cannot guarantee the uniqueness of orig_file_number, the file | |
838 | // number is only assigned when compaction is done. | |
839 | CompactionServiceCompactionJob compaction_job( | |
840 | job_id, c.get(), immutable_db_options_, mutable_db_options_, | |
841 | file_options_for_compaction_, versions_.get(), &shutting_down_, | |
842 | &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_, | |
843 | input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_, | |
844 | options.canceled ? *options.canceled : kManualCompactionCanceledFalse_, | |
845 | input.db_id, db_session_id_, secondary_path_, input, result); | |
846 | ||
847 | mutex_.Unlock(); | |
848 | s = compaction_job.Run(); | |
849 | mutex_.Lock(); | |
850 | ||
851 | // clean up | |
852 | compaction_job.io_status().PermitUncheckedError(); | |
853 | compaction_job.CleanupCompaction(); | |
854 | c->ReleaseCompactionFiles(s); | |
855 | c.reset(); | |
856 | ||
857 | TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End", | |
858 | &s); | |
859 | result->status = s; | |
860 | return s; | |
861 | } | |
862 | ||
863 | Status DB::OpenAndCompact( | |
864 | const OpenAndCompactOptions& options, const std::string& name, | |
865 | const std::string& output_directory, const std::string& input, | |
866 | std::string* output, | |
867 | const CompactionServiceOptionsOverride& override_options) { | |
868 | if (options.canceled && options.canceled->load(std::memory_order_acquire)) { | |
869 | return Status::Incomplete(Status::SubCode::kManualCompactionPaused); | |
870 | } | |
871 | CompactionServiceInput compaction_input; | |
872 | Status s = CompactionServiceInput::Read(input, &compaction_input); | |
873 | if (!s.ok()) { | |
874 | return s; | |
875 | } | |
876 | ||
877 | compaction_input.db_options.max_open_files = -1; | |
878 | compaction_input.db_options.compaction_service = nullptr; | |
879 | if (compaction_input.db_options.statistics) { | |
880 | compaction_input.db_options.statistics.reset(); | |
881 | } | |
882 | compaction_input.db_options.env = override_options.env; | |
883 | compaction_input.db_options.file_checksum_gen_factory = | |
884 | override_options.file_checksum_gen_factory; | |
885 | compaction_input.db_options.statistics = override_options.statistics; | |
886 | compaction_input.column_family.options.comparator = | |
887 | override_options.comparator; | |
888 | compaction_input.column_family.options.merge_operator = | |
889 | override_options.merge_operator; | |
890 | compaction_input.column_family.options.compaction_filter = | |
891 | override_options.compaction_filter; | |
892 | compaction_input.column_family.options.compaction_filter_factory = | |
893 | override_options.compaction_filter_factory; | |
894 | compaction_input.column_family.options.prefix_extractor = | |
895 | override_options.prefix_extractor; | |
896 | compaction_input.column_family.options.table_factory = | |
897 | override_options.table_factory; | |
898 | compaction_input.column_family.options.sst_partitioner_factory = | |
899 | override_options.sst_partitioner_factory; | |
900 | compaction_input.column_family.options.table_properties_collector_factories = | |
901 | override_options.table_properties_collector_factories; | |
902 | compaction_input.db_options.listeners = override_options.listeners; | |
903 | ||
904 | std::vector<ColumnFamilyDescriptor> column_families; | |
905 | column_families.push_back(compaction_input.column_family); | |
906 | // TODO: we have to open default CF, because of an implementation limitation, | |
907 | // currently we just use the same CF option from input, which is not collect | |
908 | // and open may fail. | |
909 | if (compaction_input.column_family.name != kDefaultColumnFamilyName) { | |
910 | column_families.emplace_back(kDefaultColumnFamilyName, | |
911 | compaction_input.column_family.options); | |
912 | } | |
913 | ||
914 | DB* db; | |
915 | std::vector<ColumnFamilyHandle*> handles; | |
916 | ||
917 | s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory, | |
918 | column_families, &handles, &db); | |
919 | if (!s.ok()) { | |
920 | return s; | |
921 | } | |
922 | ||
923 | CompactionServiceResult compaction_result; | |
924 | DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db); | |
925 | assert(handles.size() > 0); | |
926 | s = db_secondary->CompactWithoutInstallation( | |
927 | options, handles[0], compaction_input, &compaction_result); | |
928 | ||
929 | Status serialization_status = compaction_result.Write(output); | |
930 | ||
931 | for (auto& handle : handles) { | |
932 | delete handle; | |
933 | } | |
934 | delete db; | |
935 | if (s.ok()) { | |
936 | return serialization_status; | |
937 | } | |
938 | return s; | |
939 | } | |
940 | ||
941 | Status DB::OpenAndCompact( | |
942 | const std::string& name, const std::string& output_directory, | |
943 | const std::string& input, std::string* output, | |
944 | const CompactionServiceOptionsOverride& override_options) { | |
945 | return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input, | |
946 | output, override_options); | |
947 | } | |
948 | ||
f67539c2 TL |
949 | #else // !ROCKSDB_LITE |
950 | ||
951 | Status DB::OpenAsSecondary(const Options& /*options*/, | |
952 | const std::string& /*name*/, | |
953 | const std::string& /*secondary_path*/, | |
954 | DB** /*dbptr*/) { | |
955 | return Status::NotSupported("Not supported in ROCKSDB_LITE."); | |
956 | } | |
957 | ||
958 | Status DB::OpenAsSecondary( | |
959 | const DBOptions& /*db_options*/, const std::string& /*dbname*/, | |
960 | const std::string& /*secondary_path*/, | |
961 | const std::vector<ColumnFamilyDescriptor>& /*column_families*/, | |
962 | std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) { | |
963 | return Status::NotSupported("Not supported in ROCKSDB_LITE."); | |
964 | } | |
965 | #endif // !ROCKSDB_LITE | |
966 | ||
967 | } // namespace ROCKSDB_NAMESPACE |