]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_impl/db_impl_secondary.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_secondary.cc
CommitLineData
f67539c2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#include "db/db_impl/db_impl_secondary.h"
7
8#include <cinttypes>
9
10#include "db/arena_wrapped_db_iter.h"
11#include "db/merge_context.h"
12#include "logging/auto_roll_logger.h"
1e59de90 13#include "logging/logging.h"
f67539c2 14#include "monitoring/perf_context_imp.h"
1e59de90 15#include "rocksdb/configurable.h"
f67539c2
TL
16#include "util/cast_util.h"
17
18namespace ROCKSDB_NAMESPACE {
19
20#ifndef ROCKSDB_LITE
21DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
1e59de90
TL
22 const std::string& dbname,
23 std::string secondary_path)
24 : DBImpl(db_options, dbname, false, true, true),
25 secondary_path_(std::move(secondary_path)) {
f67539c2
TL
26 ROCKS_LOG_INFO(immutable_db_options_.info_log,
27 "Opening the db in secondary mode");
28 LogFlush(immutable_db_options_.info_log);
29}
30
31DBImplSecondary::~DBImplSecondary() {}
32
33Status DBImplSecondary::Recover(
34 const std::vector<ColumnFamilyDescriptor>& column_families,
20effc67 35 bool /*readonly*/, bool /*error_if_wal_file_exists*/,
1e59de90
TL
36 bool /*error_if_data_exists_in_wals*/, uint64_t*,
37 RecoveryContext* /*recovery_ctx*/) {
f67539c2
TL
38 mutex_.AssertHeld();
39
40 JobContext job_context(0);
41 Status s;
42 s = static_cast<ReactiveVersionSet*>(versions_.get())
43 ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
44 &manifest_reader_status_);
45 if (!s.ok()) {
1e59de90
TL
46 if (manifest_reader_status_) {
47 manifest_reader_status_->PermitUncheckedError();
48 }
f67539c2
TL
49 return s;
50 }
51 if (immutable_db_options_.paranoid_checks && s.ok()) {
52 s = CheckConsistency();
53 }
54 // Initial max_total_in_memory_state_ before recovery logs.
55 max_total_in_memory_state_ = 0;
56 for (auto cfd : *versions_->GetColumnFamilySet()) {
57 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
58 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
59 mutable_cf_options->max_write_buffer_number;
60 }
61 if (s.ok()) {
62 default_cf_handle_ = new ColumnFamilyHandleImpl(
63 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
64 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
f67539c2
TL
65
66 std::unordered_set<ColumnFamilyData*> cfds_changed;
67 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
68 }
69
70 if (s.IsPathNotFound()) {
71 ROCKS_LOG_INFO(immutable_db_options_.info_log,
72 "Secondary tries to read WAL, but WAL file(s) have already "
73 "been purged by primary.");
74 s = Status::OK();
75 }
76 // TODO: update options_file_number_ needed?
77
78 job_context.Clean();
79 return s;
80}
81
82// find new WAL and apply them in order to the secondary instance
83Status DBImplSecondary::FindAndRecoverLogFiles(
84 std::unordered_set<ColumnFamilyData*>* cfds_changed,
85 JobContext* job_context) {
86 assert(nullptr != cfds_changed);
87 assert(nullptr != job_context);
88 Status s;
89 std::vector<uint64_t> logs;
90 s = FindNewLogNumbers(&logs);
91 if (s.ok() && !logs.empty()) {
92 SequenceNumber next_sequence(kMaxSequenceNumber);
93 s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
94 }
95 return s;
96}
97
98// List wal_dir and find all new WALs, return these log numbers
99Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
100 assert(logs != nullptr);
101 std::vector<std::string> filenames;
102 Status s;
1e59de90
TL
103 IOOptions io_opts;
104 io_opts.do_not_recurse = true;
105 s = immutable_db_options_.fs->GetChildren(immutable_db_options_.GetWalDir(),
106 io_opts, &filenames,
107 /*IODebugContext*=*/nullptr);
f67539c2
TL
108 if (s.IsNotFound()) {
109 return Status::InvalidArgument("Failed to open wal_dir",
1e59de90 110 immutable_db_options_.GetWalDir());
f67539c2
TL
111 } else if (!s.ok()) {
112 return s;
113 }
114
115 // if log_readers_ is non-empty, it means we have applied all logs with log
116 // numbers smaller than the smallest log in log_readers_, so there is no
117 // need to pass these logs to RecoverLogFiles
118 uint64_t log_number_min = 0;
119 if (!log_readers_.empty()) {
120 log_number_min = log_readers_.begin()->first;
121 }
122 for (size_t i = 0; i < filenames.size(); i++) {
123 uint64_t number;
124 FileType type;
20effc67 125 if (ParseFileName(filenames[i], &number, &type) && type == kWalFile &&
f67539c2
TL
126 number >= log_number_min) {
127 logs->push_back(number);
128 }
129 }
130 // Recover logs in the order that they were generated
131 if (!logs->empty()) {
132 std::sort(logs->begin(), logs->end());
133 }
134 return s;
135}
136
137Status DBImplSecondary::MaybeInitLogReader(
138 uint64_t log_number, log::FragmentBufferedReader** log_reader) {
139 auto iter = log_readers_.find(log_number);
140 // make sure the log file is still present
141 if (iter == log_readers_.end() ||
142 iter->second->reader_->GetLogNumber() != log_number) {
143 // delete the obsolete log reader if log number mismatch
144 if (iter != log_readers_.end()) {
145 log_readers_.erase(iter);
146 }
147 // initialize log reader from log_number
148 // TODO: min_log_number_to_keep_2pc check needed?
149 // Open the log file
1e59de90
TL
150 std::string fname =
151 LogFileName(immutable_db_options_.GetWalDir(), log_number);
f67539c2
TL
152 ROCKS_LOG_INFO(immutable_db_options_.info_log,
153 "Recovering log #%" PRIu64 " mode %d", log_number,
154 static_cast<int>(immutable_db_options_.wal_recovery_mode));
155
156 std::unique_ptr<SequentialFileReader> file_reader;
157 {
158 std::unique_ptr<FSSequentialFile> file;
159 Status status = fs_->NewSequentialFile(
1e59de90 160 fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
f67539c2
TL
161 if (!status.ok()) {
162 *log_reader = nullptr;
163 return status;
164 }
165 file_reader.reset(new SequentialFileReader(
20effc67
TL
166 std::move(file), fname, immutable_db_options_.log_readahead_size,
167 io_tracer_));
f67539c2
TL
168 }
169
170 // Create the log reader.
171 LogReaderContainer* log_reader_container = new LogReaderContainer(
172 env_, immutable_db_options_.info_log, std::move(fname),
173 std::move(file_reader), log_number);
174 log_readers_.insert(std::make_pair(
175 log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
176 }
177 iter = log_readers_.find(log_number);
178 assert(iter != log_readers_.end());
179 *log_reader = iter->second->reader_;
180 return Status::OK();
181}
182
183// After manifest recovery, replay WALs and refresh log_readers_ if necessary
184// REQUIRES: log_numbers are sorted in ascending order
185Status DBImplSecondary::RecoverLogFiles(
186 const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
187 std::unordered_set<ColumnFamilyData*>* cfds_changed,
188 JobContext* job_context) {
189 assert(nullptr != cfds_changed);
190 assert(nullptr != job_context);
191 mutex_.AssertHeld();
192 Status status;
193 for (auto log_number : log_numbers) {
194 log::FragmentBufferedReader* reader = nullptr;
195 status = MaybeInitLogReader(log_number, &reader);
196 if (!status.ok()) {
197 return status;
198 }
199 assert(reader != nullptr);
200 }
201 for (auto log_number : log_numbers) {
1e59de90 202 auto it = log_readers_.find(log_number);
f67539c2
TL
203 assert(it != log_readers_.end());
204 log::FragmentBufferedReader* reader = it->second->reader_;
20effc67
TL
205 Status* wal_read_status = it->second->status_;
206 assert(wal_read_status);
f67539c2
TL
207 // Manually update the file number allocation counter in VersionSet.
208 versions_->MarkFileNumberUsed(log_number);
209
210 // Determine if we should tolerate incomplete records at the tail end of the
211 // Read all the records and add to a memtable
212 std::string scratch;
213 Slice record;
214 WriteBatch batch;
215
216 while (reader->ReadRecord(&record, &scratch,
217 immutable_db_options_.wal_recovery_mode) &&
20effc67 218 wal_read_status->ok() && status.ok()) {
f67539c2
TL
219 if (record.size() < WriteBatchInternal::kHeader) {
220 reader->GetReporter()->Corruption(
221 record.size(), Status::Corruption("log record too small"));
222 continue;
223 }
20effc67
TL
224 status = WriteBatchInternal::SetContents(&batch, record);
225 if (!status.ok()) {
226 break;
227 }
f67539c2
TL
228 SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
229 std::vector<uint32_t> column_family_ids;
230 status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
231 if (status.ok()) {
232 for (const auto id : column_family_ids) {
233 ColumnFamilyData* cfd =
234 versions_->GetColumnFamilySet()->GetColumnFamily(id);
235 if (cfd == nullptr) {
236 continue;
237 }
238 if (cfds_changed->count(cfd) == 0) {
239 cfds_changed->insert(cfd);
240 }
241 const std::vector<FileMetaData*>& l0_files =
242 cfd->current()->storage_info()->LevelFiles(0);
243 SequenceNumber seq =
244 l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
245 // If the write batch's sequence number is smaller than the last
246 // sequence number of the largest sequence persisted for this column
247 // family, then its data must reside in an SST that has already been
248 // added in the prior MANIFEST replay.
249 if (seq_of_batch <= seq) {
250 continue;
251 }
1e59de90 252 auto curr_log_num = std::numeric_limits<uint64_t>::max();
f67539c2
TL
253 if (cfd_to_current_log_.count(cfd) > 0) {
254 curr_log_num = cfd_to_current_log_[cfd];
255 }
256 // If the active memtable contains records added by replaying an
257 // earlier WAL, then we need to seal the memtable, add it to the
258 // immutable memtable list and create a new active memtable.
1e59de90
TL
259 if (!cfd->mem()->IsEmpty() &&
260 (curr_log_num == std::numeric_limits<uint64_t>::max() ||
261 curr_log_num != log_number)) {
f67539c2
TL
262 const MutableCFOptions mutable_cf_options =
263 *cfd->GetLatestMutableCFOptions();
264 MemTable* new_mem =
265 cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
266 cfd->mem()->SetNextLogNumber(log_number);
1e59de90 267 cfd->mem()->ConstructFragmentedRangeTombstones();
f67539c2
TL
268 cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
269 new_mem->Ref();
270 cfd->SetMemtable(new_mem);
271 }
272 }
273 bool has_valid_writes = false;
274 status = WriteBatchInternal::InsertInto(
275 &batch, column_family_memtables_.get(),
276 nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
277 true, log_number, this, false /* concurrent_memtable_writes */,
278 next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
279 }
280 // If column family was not found, it might mean that the WAL write
281 // batch references to the column family that was dropped after the
282 // insert. We don't want to fail the whole write batch in that case --
283 // we just ignore the update.
284 // That's why we set ignore missing column families to true
285 // passing null flush_scheduler will disable memtable flushing which is
286 // needed for secondary instances
287 if (status.ok()) {
288 for (const auto id : column_family_ids) {
289 ColumnFamilyData* cfd =
290 versions_->GetColumnFamilySet()->GetColumnFamily(id);
291 if (cfd == nullptr) {
292 continue;
293 }
294 std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
295 cfd_to_current_log_.find(cfd);
296 if (iter == cfd_to_current_log_.end()) {
297 cfd_to_current_log_.insert({cfd, log_number});
298 } else if (log_number > iter->second) {
299 iter->second = log_number;
300 }
301 }
302 auto last_sequence = *next_sequence - 1;
303 if ((*next_sequence != kMaxSequenceNumber) &&
304 (versions_->LastSequence() <= last_sequence)) {
305 versions_->SetLastAllocatedSequence(last_sequence);
306 versions_->SetLastPublishedSequence(last_sequence);
307 versions_->SetLastSequence(last_sequence);
308 }
309 } else {
310 // We are treating this as a failure while reading since we read valid
311 // blocks that do not form coherent data
312 reader->GetReporter()->Corruption(record.size(), status);
313 }
314 }
20effc67
TL
315 if (status.ok() && !wal_read_status->ok()) {
316 status = *wal_read_status;
317 }
f67539c2
TL
318 if (!status.ok()) {
319 return status;
320 }
321 }
322 // remove logreaders from map after successfully recovering the WAL
323 if (log_readers_.size() > 1) {
324 auto erase_iter = log_readers_.begin();
325 std::advance(erase_iter, log_readers_.size() - 1);
326 log_readers_.erase(log_readers_.begin(), erase_iter);
327 }
328 return status;
329}
330
331// Implementation of the DB interface
332Status DBImplSecondary::Get(const ReadOptions& read_options,
333 ColumnFamilyHandle* column_family, const Slice& key,
334 PinnableSlice* value) {
1e59de90
TL
335 return GetImpl(read_options, column_family, key, value,
336 /*timestamp*/ nullptr);
337}
338
339Status DBImplSecondary::Get(const ReadOptions& read_options,
340 ColumnFamilyHandle* column_family, const Slice& key,
341 PinnableSlice* value, std::string* timestamp) {
342 return GetImpl(read_options, column_family, key, value, timestamp);
f67539c2
TL
343}
344
345Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
346 ColumnFamilyHandle* column_family,
1e59de90
TL
347 const Slice& key, PinnableSlice* pinnable_val,
348 std::string* timestamp) {
f67539c2 349 assert(pinnable_val != nullptr);
1e59de90
TL
350 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
351 StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
f67539c2
TL
352 PERF_TIMER_GUARD(get_snapshot_time);
353
1e59de90
TL
354 assert(column_family);
355 if (read_options.timestamp) {
356 const Status s = FailIfTsMismatchCf(
357 column_family, *(read_options.timestamp), /*ts_for_read=*/true);
358 if (!s.ok()) {
359 return s;
360 }
361 } else {
362 const Status s = FailIfCfHasTs(column_family);
363 if (!s.ok()) {
364 return s;
365 }
366 }
367
368 // Clear the timestamp for returning results so that we can distinguish
369 // between tombstone or key that has never been written later.
370 if (timestamp) {
371 timestamp->clear();
372 }
373
f67539c2
TL
374 auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
375 ColumnFamilyData* cfd = cfh->cfd();
376 if (tracer_) {
377 InstrumentedMutexLock lock(&trace_mutex_);
378 if (tracer_) {
379 tracer_->Get(column_family, key);
380 }
381 }
382 // Acquire SuperVersion
383 SuperVersion* super_version = GetAndRefSuperVersion(cfd);
384 SequenceNumber snapshot = versions_->LastSequence();
1e59de90 385 GetWithTimestampReadCallback read_cb(snapshot);
f67539c2
TL
386 MergeContext merge_context;
387 SequenceNumber max_covering_tombstone_seq = 0;
388 Status s;
1e59de90 389 LookupKey lkey(key, snapshot, read_options.timestamp);
f67539c2
TL
390 PERF_TIMER_STOP(get_snapshot_time);
391
392 bool done = false;
1e59de90
TL
393 const Comparator* ucmp = column_family->GetComparator();
394 assert(ucmp);
395 std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr;
20effc67 396 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
1e59de90
TL
397 /*columns=*/nullptr, ts, &s, &merge_context,
398 &max_covering_tombstone_seq, read_options,
399 false /* immutable_memtable */, &read_cb)) {
f67539c2
TL
400 done = true;
401 pinnable_val->PinSelf();
402 RecordTick(stats_, MEMTABLE_HIT);
403 } else if ((s.ok() || s.IsMergeInProgress()) &&
404 super_version->imm->Get(
1e59de90
TL
405 lkey, pinnable_val->GetSelf(), /*columns=*/nullptr, ts, &s,
406 &merge_context, &max_covering_tombstone_seq, read_options,
407 &read_cb)) {
f67539c2
TL
408 done = true;
409 pinnable_val->PinSelf();
410 RecordTick(stats_, MEMTABLE_HIT);
411 }
412 if (!done && !s.ok() && !s.IsMergeInProgress()) {
413 ReturnAndCleanupSuperVersion(cfd, super_version);
414 return s;
415 }
416 if (!done) {
417 PERF_TIMER_GUARD(get_from_output_files_time);
1e59de90
TL
418 PinnedIteratorsManager pinned_iters_mgr;
419 super_version->current->Get(
420 read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s,
421 &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr,
422 /*value_found*/ nullptr,
423 /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr,
424 /*do_merge*/ true);
f67539c2
TL
425 RecordTick(stats_, MEMTABLE_MISS);
426 }
427 {
428 PERF_TIMER_GUARD(get_post_process_time);
429 ReturnAndCleanupSuperVersion(cfd, super_version);
430 RecordTick(stats_, NUMBER_KEYS_READ);
431 size_t size = pinnable_val->size();
432 RecordTick(stats_, BYTES_READ, size);
433 RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
434 PERF_COUNTER_ADD(get_read_bytes, size);
435 }
436 return s;
437}
438
439Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
440 ColumnFamilyHandle* column_family) {
441 if (read_options.managed) {
442 return NewErrorIterator(
443 Status::NotSupported("Managed iterator is not supported anymore."));
444 }
445 if (read_options.read_tier == kPersistedTier) {
446 return NewErrorIterator(Status::NotSupported(
447 "ReadTier::kPersistedData is not yet supported in iterators."));
448 }
1e59de90
TL
449
450 assert(column_family);
451 if (read_options.timestamp) {
452 const Status s = FailIfTsMismatchCf(
453 column_family, *(read_options.timestamp), /*ts_for_read=*/true);
454 if (!s.ok()) {
455 return NewErrorIterator(s);
456 }
457 } else {
458 const Status s = FailIfCfHasTs(column_family);
459 if (!s.ok()) {
460 return NewErrorIterator(s);
461 }
462 }
463
f67539c2 464 Iterator* result = nullptr;
20effc67 465 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
f67539c2
TL
466 auto cfd = cfh->cfd();
467 ReadCallback* read_callback = nullptr; // No read callback provided.
468 if (read_options.tailing) {
469 return NewErrorIterator(Status::NotSupported(
470 "tailing iterator not supported in secondary mode"));
471 } else if (read_options.snapshot != nullptr) {
472 // TODO (yanqin) support snapshot.
473 return NewErrorIterator(
474 Status::NotSupported("snapshot not supported in secondary mode"));
475 } else {
1e59de90 476 SequenceNumber snapshot(kMaxSequenceNumber);
f67539c2
TL
477 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
478 }
479 return result;
480}
481
482ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
483 const ReadOptions& read_options, ColumnFamilyData* cfd,
1e59de90
TL
484 SequenceNumber snapshot, ReadCallback* read_callback,
485 bool expose_blob_index, bool allow_refresh) {
f67539c2
TL
486 assert(nullptr != cfd);
487 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
1e59de90
TL
488 assert(snapshot == kMaxSequenceNumber);
489 snapshot = versions_->LastSequence();
490 assert(snapshot != kMaxSequenceNumber);
f67539c2
TL
491 auto db_iter = NewArenaWrappedDbIterator(
492 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
1e59de90 493 super_version->current, snapshot,
f67539c2 494 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
1e59de90
TL
495 super_version->version_number, read_callback, this, cfd,
496 expose_blob_index, read_options.snapshot ? false : allow_refresh);
20effc67
TL
497 auto internal_iter = NewInternalIterator(
498 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
1e59de90 499 snapshot, /* allow_unprepared_value */ true, db_iter);
f67539c2
TL
500 db_iter->SetIterUnderDBIter(internal_iter);
501 return db_iter;
502}
503
504Status DBImplSecondary::NewIterators(
505 const ReadOptions& read_options,
506 const std::vector<ColumnFamilyHandle*>& column_families,
507 std::vector<Iterator*>* iterators) {
508 if (read_options.managed) {
509 return Status::NotSupported("Managed iterator is not supported anymore.");
510 }
511 if (read_options.read_tier == kPersistedTier) {
512 return Status::NotSupported(
513 "ReadTier::kPersistedData is not yet supported in iterators.");
514 }
515 ReadCallback* read_callback = nullptr; // No read callback provided.
516 if (iterators == nullptr) {
517 return Status::InvalidArgument("iterators not allowed to be nullptr");
518 }
1e59de90
TL
519
520 if (read_options.timestamp) {
521 for (auto* cf : column_families) {
522 assert(cf);
523 const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
524 /*ts_for_read=*/true);
525 if (!s.ok()) {
526 return s;
527 }
528 }
529 } else {
530 for (auto* cf : column_families) {
531 assert(cf);
532 const Status s = FailIfCfHasTs(cf);
533 if (!s.ok()) {
534 return s;
535 }
536 }
537 }
f67539c2
TL
538 iterators->clear();
539 iterators->reserve(column_families.size());
540 if (read_options.tailing) {
541 return Status::NotSupported(
542 "tailing iterator not supported in secondary mode");
543 } else if (read_options.snapshot != nullptr) {
544 // TODO (yanqin) support snapshot.
545 return Status::NotSupported("snapshot not supported in secondary mode");
546 } else {
1e59de90 547 SequenceNumber read_seq(kMaxSequenceNumber);
f67539c2
TL
548 for (auto cfh : column_families) {
549 ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
550 iterators->push_back(
551 NewIteratorImpl(read_options, cfd, read_seq, read_callback));
552 }
553 }
554 return Status::OK();
555}
556
557Status DBImplSecondary::CheckConsistency() {
558 mutex_.AssertHeld();
559 Status s = DBImpl::CheckConsistency();
560 // If DBImpl::CheckConsistency() which is stricter returns success, then we
561 // do not need to give a second chance.
562 if (s.ok()) {
563 return s;
564 }
565 // It's possible that DBImpl::CheckConssitency() can fail because the primary
566 // may have removed certain files, causing the GetFileSize(name) call to
567 // fail and returning a PathNotFound. In this case, we take a best-effort
568 // approach and just proceed.
569 TEST_SYNC_POINT_CALLBACK(
570 "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);
571
572 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
573 return Status::OK();
574 }
575
576 std::vector<LiveFileMetaData> metadata;
577 versions_->GetLiveFilesMetaData(&metadata);
578
579 std::string corruption_messages;
580 for (const auto& md : metadata) {
581 // md.name has a leading "/".
582 std::string file_path = md.db_path + md.name;
583
584 uint64_t fsize = 0;
585 s = env_->GetFileSize(file_path, &fsize);
586 if (!s.ok() &&
587 (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
588 s.IsPathNotFound())) {
589 s = Status::OK();
590 }
591 if (!s.ok()) {
592 corruption_messages +=
593 "Can't access " + md.name + ": " + s.ToString() + "\n";
594 }
595 }
596 return corruption_messages.empty() ? Status::OK()
597 : Status::Corruption(corruption_messages);
598}
599
600Status DBImplSecondary::TryCatchUpWithPrimary() {
601 assert(versions_.get() != nullptr);
602 assert(manifest_reader_.get() != nullptr);
603 Status s;
604 // read the manifest and apply new changes to the secondary instance
605 std::unordered_set<ColumnFamilyData*> cfds_changed;
606 JobContext job_context(0, true /*create_superversion*/);
607 {
608 InstrumentedMutexLock lock_guard(&mutex_);
609 s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
1e59de90
TL
610 ->ReadAndApply(&mutex_, &manifest_reader_,
611 manifest_reader_status_.get(), &cfds_changed);
f67539c2
TL
612
613 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
614 static_cast<uint64_t>(versions_->LastSequence()));
615 for (ColumnFamilyData* cfd : cfds_changed) {
616 if (cfd->IsDropped()) {
617 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
618 cfd->GetName().c_str());
619 continue;
620 }
621 VersionStorageInfo::LevelSummaryStorage tmp;
622 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
623 "[%s] Level summary: %s\n", cfd->GetName().c_str(),
624 cfd->current()->storage_info()->LevelSummary(&tmp));
625 }
626
627 // list wal_dir to discover new WALs and apply new changes to the secondary
628 // instance
629 if (s.ok()) {
630 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
631 }
632 if (s.IsPathNotFound()) {
633 ROCKS_LOG_INFO(
634 immutable_db_options_.info_log,
635 "Secondary tries to read WAL, but WAL file(s) have already "
636 "been purged by primary.");
637 s = Status::OK();
638 }
639 if (s.ok()) {
640 for (auto cfd : cfds_changed) {
641 cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
642 &job_context.memtables_to_free);
643 auto& sv_context = job_context.superversion_contexts.back();
644 cfd->InstallSuperVersion(&sv_context, &mutex_);
645 sv_context.NewSuperVersion();
646 }
647 }
648 }
649 job_context.Clean();
650
651 // Cleanup unused, obsolete files.
652 JobContext purge_files_job_context(0);
653 {
654 InstrumentedMutexLock lock_guard(&mutex_);
655 // Currently, secondary instance does not own the database files, thus it
656 // is unnecessary for the secondary to force full scan.
657 FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
658 }
659 if (purge_files_job_context.HaveSomethingToDelete()) {
660 PurgeObsoleteFiles(purge_files_job_context);
661 }
662 purge_files_job_context.Clean();
663 return s;
664}
665
666Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
667 const std::string& secondary_path, DB** dbptr) {
668 *dbptr = nullptr;
669
670 DBOptions db_options(options);
671 ColumnFamilyOptions cf_options(options);
672 std::vector<ColumnFamilyDescriptor> column_families;
673 column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
674 std::vector<ColumnFamilyHandle*> handles;
675
676 Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
677 column_families, &handles, dbptr);
678 if (s.ok()) {
679 assert(handles.size() == 1);
680 delete handles[0];
681 }
682 return s;
683}
684
685Status DB::OpenAsSecondary(
686 const DBOptions& db_options, const std::string& dbname,
687 const std::string& secondary_path,
688 const std::vector<ColumnFamilyDescriptor>& column_families,
689 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
690 *dbptr = nullptr;
f67539c2
TL
691
692 DBOptions tmp_opts(db_options);
693 Status s;
694 if (nullptr == tmp_opts.info_log) {
695 s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
696 if (!s.ok()) {
697 tmp_opts.info_log = nullptr;
1e59de90 698 return s;
f67539c2
TL
699 }
700 }
701
1e59de90
TL
702 assert(tmp_opts.info_log != nullptr);
703 if (db_options.max_open_files != -1) {
704 std::ostringstream oss;
705 oss << "The primary instance may delete all types of files after they "
706 "become obsolete. The application can coordinate the primary and "
707 "secondary so that primary does not delete/rename files that are "
708 "currently being used by the secondary. Alternatively, a custom "
709 "Env/FS can be provided such that files become inaccessible only "
710 "after all primary and secondaries indicate that they are obsolete "
711 "and deleted. If the above two are not possible, you can open the "
712 "secondary instance with `max_open_files==-1` so that secondary "
713 "will eagerly keep all table files open. Even if a file is deleted, "
714 "its content can still be accessed via a prior open file "
715 "descriptor. This is a hacky workaround for only table files. If "
716 "none of the above is done, then point lookup or "
717 "range scan via the secondary instance can result in IOError: file "
718 "not found. This can be resolved by retrying "
719 "TryCatchUpWithPrimary().";
720 ROCKS_LOG_WARN(tmp_opts.info_log, "%s", oss.str().c_str());
721 }
722
f67539c2 723 handles->clear();
1e59de90 724 DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
f67539c2
TL
725 impl->versions_.reset(new ReactiveVersionSet(
726 dbname, &impl->immutable_db_options_, impl->file_options_,
727 impl->table_cache_.get(), impl->write_buffer_manager_,
20effc67 728 &impl->write_controller_, impl->io_tracer_));
f67539c2
TL
729 impl->column_family_memtables_.reset(
730 new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
1e59de90 731 impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
f67539c2
TL
732
733 impl->mutex_.Lock();
734 s = impl->Recover(column_families, true, false, false);
735 if (s.ok()) {
736 for (auto cf : column_families) {
737 auto cfd =
738 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
739 if (nullptr == cfd) {
20effc67 740 s = Status::InvalidArgument("Column family not found", cf.name);
f67539c2
TL
741 break;
742 }
743 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
744 }
745 }
746 SuperVersionContext sv_context(true /* create_superversion */);
747 if (s.ok()) {
748 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
749 sv_context.NewSuperVersion();
750 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
751 }
752 }
753 impl->mutex_.Unlock();
754 sv_context.Clean();
755 if (s.ok()) {
756 *dbptr = impl;
757 for (auto h : *handles) {
758 impl->NewThreadStatusCfInfo(
20effc67 759 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
f67539c2
TL
760 }
761 } else {
762 for (auto h : *handles) {
763 delete h;
764 }
765 handles->clear();
766 delete impl;
767 }
768 return s;
769}
1e59de90
TL
770
771Status DBImplSecondary::CompactWithoutInstallation(
772 const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh,
773 const CompactionServiceInput& input, CompactionServiceResult* result) {
774 if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
775 return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
776 }
777 InstrumentedMutexLock l(&mutex_);
778 auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
779 if (!cfd) {
780 return Status::InvalidArgument("Cannot find column family" +
781 cfh->GetName());
782 }
783
784 std::unordered_set<uint64_t> input_set;
785 for (const auto& file_name : input.input_files) {
786 input_set.insert(TableFileNameToNumber(file_name));
787 }
788
789 auto* version = cfd->current();
790
791 ColumnFamilyMetaData cf_meta;
792 version->GetColumnFamilyMetaData(&cf_meta);
793
794 const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions();
795 ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions();
796 VersionStorageInfo* vstorage = version->storage_info();
797
798 // Use comp_options to reuse some CompactFiles functions
799 CompactionOptions comp_options;
800 comp_options.compression = kDisableCompressionOption;
801 comp_options.output_file_size_limit = MaxFileSizeForLevel(
802 *mutable_cf_options, input.output_level, cf_options.compaction_style,
803 vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes);
804
805 std::vector<CompactionInputFiles> input_files;
806 Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
807 &input_files, &input_set, vstorage, comp_options);
808 if (!s.ok()) {
809 return s;
810 }
811
812 std::unique_ptr<Compaction> c;
813 assert(cfd->compaction_picker());
814 c.reset(cfd->compaction_picker()->CompactFiles(
815 comp_options, input_files, input.output_level, vstorage,
816 *mutable_cf_options, mutable_db_options_, 0));
817 assert(c != nullptr);
818
819 c->SetInputVersion(version);
820
821 // Create output directory if it's not existed yet
822 std::unique_ptr<FSDirectory> output_dir;
823 s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir);
824 if (!s.ok()) {
825 return s;
826 }
827
828 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
829 immutable_db_options_.info_log.get());
830
831 const int job_id = next_job_id_.fetch_add(1);
832
833 // use primary host's db_id for running the compaction, but db_session_id is
834 // using the local one, which is to make sure the unique id is unique from
835 // the remote compactors. Because the id is generated from db_id,
836 // db_session_id and orig_file_number, unlike the local compaction, remote
837 // compaction cannot guarantee the uniqueness of orig_file_number, the file
838 // number is only assigned when compaction is done.
839 CompactionServiceCompactionJob compaction_job(
840 job_id, c.get(), immutable_db_options_, mutable_db_options_,
841 file_options_for_compaction_, versions_.get(), &shutting_down_,
842 &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
843 input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
844 options.canceled ? *options.canceled : kManualCompactionCanceledFalse_,
845 input.db_id, db_session_id_, secondary_path_, input, result);
846
847 mutex_.Unlock();
848 s = compaction_job.Run();
849 mutex_.Lock();
850
851 // clean up
852 compaction_job.io_status().PermitUncheckedError();
853 compaction_job.CleanupCompaction();
854 c->ReleaseCompactionFiles(s);
855 c.reset();
856
857 TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
858 &s);
859 result->status = s;
860 return s;
861}
862
863Status DB::OpenAndCompact(
864 const OpenAndCompactOptions& options, const std::string& name,
865 const std::string& output_directory, const std::string& input,
866 std::string* output,
867 const CompactionServiceOptionsOverride& override_options) {
868 if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
869 return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
870 }
871 CompactionServiceInput compaction_input;
872 Status s = CompactionServiceInput::Read(input, &compaction_input);
873 if (!s.ok()) {
874 return s;
875 }
876
877 compaction_input.db_options.max_open_files = -1;
878 compaction_input.db_options.compaction_service = nullptr;
879 if (compaction_input.db_options.statistics) {
880 compaction_input.db_options.statistics.reset();
881 }
882 compaction_input.db_options.env = override_options.env;
883 compaction_input.db_options.file_checksum_gen_factory =
884 override_options.file_checksum_gen_factory;
885 compaction_input.db_options.statistics = override_options.statistics;
886 compaction_input.column_family.options.comparator =
887 override_options.comparator;
888 compaction_input.column_family.options.merge_operator =
889 override_options.merge_operator;
890 compaction_input.column_family.options.compaction_filter =
891 override_options.compaction_filter;
892 compaction_input.column_family.options.compaction_filter_factory =
893 override_options.compaction_filter_factory;
894 compaction_input.column_family.options.prefix_extractor =
895 override_options.prefix_extractor;
896 compaction_input.column_family.options.table_factory =
897 override_options.table_factory;
898 compaction_input.column_family.options.sst_partitioner_factory =
899 override_options.sst_partitioner_factory;
900 compaction_input.column_family.options.table_properties_collector_factories =
901 override_options.table_properties_collector_factories;
902 compaction_input.db_options.listeners = override_options.listeners;
903
904 std::vector<ColumnFamilyDescriptor> column_families;
905 column_families.push_back(compaction_input.column_family);
906 // TODO: we have to open default CF, because of an implementation limitation,
907 // currently we just use the same CF option from input, which is not collect
908 // and open may fail.
909 if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
910 column_families.emplace_back(kDefaultColumnFamilyName,
911 compaction_input.column_family.options);
912 }
913
914 DB* db;
915 std::vector<ColumnFamilyHandle*> handles;
916
917 s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
918 column_families, &handles, &db);
919 if (!s.ok()) {
920 return s;
921 }
922
923 CompactionServiceResult compaction_result;
924 DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
925 assert(handles.size() > 0);
926 s = db_secondary->CompactWithoutInstallation(
927 options, handles[0], compaction_input, &compaction_result);
928
929 Status serialization_status = compaction_result.Write(output);
930
931 for (auto& handle : handles) {
932 delete handle;
933 }
934 delete db;
935 if (s.ok()) {
936 return serialization_status;
937 }
938 return s;
939}
940
941Status DB::OpenAndCompact(
942 const std::string& name, const std::string& output_directory,
943 const std::string& input, std::string* output,
944 const CompactionServiceOptionsOverride& override_options) {
945 return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input,
946 output, override_options);
947}
948
f67539c2
TL
949#else // !ROCKSDB_LITE
950
951Status DB::OpenAsSecondary(const Options& /*options*/,
952 const std::string& /*name*/,
953 const std::string& /*secondary_path*/,
954 DB** /*dbptr*/) {
955 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
956}
957
958Status DB::OpenAsSecondary(
959 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
960 const std::string& /*secondary_path*/,
961 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
962 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
963 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
964}
965#endif // !ROCKSDB_LITE
966
967} // namespace ROCKSDB_NAMESPACE