]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/repair.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / repair.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // Repairer does best effort recovery to recover as much data as possible after
11 // a disaster without compromising consistency. It does not guarantee bringing
12 // the database to a time consistent state.
13 //
14 // Repair process is broken into 4 phases:
15 // (a) Find files
16 // (b) Convert logs to tables
17 // (c) Extract metadata
18 // (d) Write Descriptor
19 //
20 // (a) Find files
21 //
22 // The repairer goes through all the files in the directory, and classifies them
23 // based on their file name. Any file that cannot be identified by name will be
24 // ignored.
25 //
// (b) Convert logs to tables
//
// Every log file that is active is replayed. All sections of the file where
// the checksum does not match are skipped over. We intentionally give
// preference to data consistency.
31 //
32 // (c) Extract metadata
33 //
34 // We scan every table to compute
35 // (1) smallest/largest for the table
36 // (2) largest sequence number in the table
37 //
38 // If we are unable to scan the file, then we ignore the table.
39 //
40 // (d) Write Descriptor
41 //
42 // We generate descriptor contents:
43 // - log number is set to zero
44 // - next-file-number is set to 1 + largest file number we found
// - last-sequence-number is set to the largest sequence# found across
// all tables (see phase (c), item (2))
47 // - compaction pointers are cleared
48 // - every table file is added at level 0
49 //
50 // Possible optimization 1:
51 // (a) Compute total size and use to pick appropriate max-level M
52 // (b) Sort tables by largest sequence# in the table
53 // (c) For each table: if it overlaps earlier table, place in level-0,
54 // else place in level-M.
55 // (d) We can provide options for time consistent recovery and unsafe recovery
56 // (ignore checksum failure when applicable)
57 // Possible optimization 2:
58 // Store per-table metadata (smallest, largest, largest-seq#, ...)
59 // in the table's meta section to speed up ScanTable.
60
61 #ifndef ROCKSDB_LITE
62
63 #ifndef __STDC_FORMAT_MACROS
64 #define __STDC_FORMAT_MACROS
65 #endif
66
67 #include <inttypes.h>
68 #include "db/builder.h"
69 #include "db/db_impl.h"
70 #include "db/dbformat.h"
71 #include "db/log_reader.h"
72 #include "db/log_writer.h"
73 #include "db/memtable.h"
74 #include "db/table_cache.h"
75 #include "db/version_edit.h"
76 #include "db/write_batch_internal.h"
77 #include "options/cf_options.h"
78 #include "rocksdb/comparator.h"
79 #include "rocksdb/db.h"
80 #include "rocksdb/env.h"
81 #include "rocksdb/options.h"
82 #include "rocksdb/write_buffer_manager.h"
83 #include "table/scoped_arena_iterator.h"
84 #include "util/file_reader_writer.h"
85 #include "util/filename.h"
86 #include "util/string_util.h"
87
88 namespace rocksdb {
89
90 namespace {
91
92 class Repairer {
93 public:
94 Repairer(const std::string& dbname, const DBOptions& db_options,
95 const std::vector<ColumnFamilyDescriptor>& column_families,
96 const ColumnFamilyOptions& default_cf_opts,
97 const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
98 : dbname_(dbname),
99 env_(db_options.env),
100 env_options_(),
101 db_options_(SanitizeOptions(dbname_, db_options)),
102 immutable_db_options_(ImmutableDBOptions(db_options_)),
103 icmp_(default_cf_opts.comparator),
104 default_cf_opts_(
105 SanitizeOptions(immutable_db_options_, default_cf_opts)),
106 default_cf_iopts_(
107 ImmutableCFOptions(immutable_db_options_, default_cf_opts_)),
108 unknown_cf_opts_(
109 SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
110 create_unknown_cfs_(create_unknown_cfs),
111 raw_table_cache_(
112 // TableCache can be small since we expect each table to be opened
113 // once.
114 NewLRUCache(10, db_options_.table_cache_numshardbits)),
115 table_cache_(new TableCache(default_cf_iopts_, env_options_,
116 raw_table_cache_.get())),
117 wb_(db_options_.db_write_buffer_size),
118 wc_(db_options_.delayed_write_rate),
119 vset_(dbname_, &immutable_db_options_, env_options_,
120 raw_table_cache_.get(), &wb_, &wc_),
121 next_file_number_(1) {
122 for (const auto& cfd : column_families) {
123 cf_name_to_opts_[cfd.name] = cfd.options;
124 }
125 }
126
127 const ColumnFamilyOptions* GetColumnFamilyOptions(
128 const std::string& cf_name) {
129 if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
130 if (create_unknown_cfs_) {
131 return &unknown_cf_opts_;
132 }
133 return nullptr;
134 }
135 return &cf_name_to_opts_[cf_name];
136 }
137
138 // Adds a column family to the VersionSet with cf_options_ and updates
139 // manifest.
140 Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
141 const auto* cf_opts = GetColumnFamilyOptions(cf_name);
142 if (cf_opts == nullptr) {
143 return Status::Corruption("Encountered unknown column family with name=" +
144 cf_name + ", id=" + ToString(cf_id));
145 }
146 Options opts(db_options_, *cf_opts);
147 MutableCFOptions mut_cf_opts(opts);
148
149 VersionEdit edit;
150 edit.SetComparatorName(opts.comparator->Name());
151 edit.SetLogNumber(0);
152 edit.SetColumnFamily(cf_id);
153 ColumnFamilyData* cfd;
154 cfd = nullptr;
155 edit.AddColumnFamily(cf_name);
156
157 mutex_.Lock();
158 Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_,
159 nullptr /* db_directory */,
160 false /* new_descriptor_log */, cf_opts);
161 mutex_.Unlock();
162 return status;
163 }
164
165 ~Repairer() {
166 delete table_cache_;
167 }
168
169 Status Run() {
170 Status status = FindFiles();
171 if (status.ok()) {
172 // Discard older manifests and start a fresh one
173 for (size_t i = 0; i < manifests_.size(); i++) {
174 ArchiveFile(dbname_ + "/" + manifests_[i]);
175 }
176 // Just create a DBImpl temporarily so we can reuse NewDB()
177 DBImpl* db_impl = new DBImpl(db_options_, dbname_);
178 status = db_impl->NewDB();
179 delete db_impl;
180 }
181
182 if (status.ok()) {
183 // Recover using the fresh manifest created by NewDB()
184 status =
185 vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
186 }
187 if (status.ok()) {
188 // Need to scan existing SST files first so the column families are
189 // created before we process WAL files
190 ExtractMetaData();
191
192 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
193 // extract -- we need to clear it here since metadata for existing SST
194 // files has been extracted already
195 table_fds_.clear();
196 ConvertLogFilesToTables();
197 ExtractMetaData();
198 status = AddTables();
199 }
200 if (status.ok()) {
201 uint64_t bytes = 0;
202 for (size_t i = 0; i < tables_.size(); i++) {
203 bytes += tables_[i].meta.fd.GetFileSize();
204 }
205 ROCKS_LOG_WARN(db_options_.info_log,
206 "**** Repaired rocksdb %s; "
207 "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
208 " bytes. "
209 "Some data may have been lost. "
210 "****",
211 dbname_.c_str(), tables_.size(), bytes);
212 }
213 return status;
214 }
215
216 private:
217 struct TableInfo {
218 FileMetaData meta;
219 uint32_t column_family_id;
220 std::string column_family_name;
221 SequenceNumber min_sequence;
222 SequenceNumber max_sequence;
223 };
224
225 std::string const dbname_;
226 Env* const env_;
227 const EnvOptions env_options_;
228 const DBOptions db_options_;
229 const ImmutableDBOptions immutable_db_options_;
230 const InternalKeyComparator icmp_;
231 const ColumnFamilyOptions default_cf_opts_;
232 const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference
233 const ColumnFamilyOptions unknown_cf_opts_;
234 const bool create_unknown_cfs_;
235 std::shared_ptr<Cache> raw_table_cache_;
236 TableCache* table_cache_;
237 WriteBufferManager wb_;
238 WriteController wc_;
239 VersionSet vset_;
240 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
241 InstrumentedMutex mutex_;
242
243 std::vector<std::string> manifests_;
244 std::vector<FileDescriptor> table_fds_;
245 std::vector<uint64_t> logs_;
246 std::vector<TableInfo> tables_;
247 uint64_t next_file_number_;
248
249 Status FindFiles() {
250 std::vector<std::string> filenames;
251 bool found_file = false;
252 std::vector<std::string> to_search_paths;
253
254 for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
255 to_search_paths.push_back(db_options_.db_paths[path_id].path);
256 }
257
258 // search wal_dir if user uses a customize wal_dir
259 bool same = false;
260 Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
261 if (status.IsNotSupported()) {
262 same = db_options_.wal_dir == dbname_;
263 status = Status::OK();
264 } else if (!status.ok()) {
265 return status;
266 }
267
268 if (!same) {
269 to_search_paths.push_back(db_options_.wal_dir);
270 }
271
272 for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
273 status = env_->GetChildren(to_search_paths[path_id], &filenames);
274 if (!status.ok()) {
275 return status;
276 }
277 if (!filenames.empty()) {
278 found_file = true;
279 }
280
281 uint64_t number;
282 FileType type;
283 for (size_t i = 0; i < filenames.size(); i++) {
284 if (ParseFileName(filenames[i], &number, &type)) {
285 if (type == kDescriptorFile) {
286 manifests_.push_back(filenames[i]);
287 } else {
288 if (number + 1 > next_file_number_) {
289 next_file_number_ = number + 1;
290 }
291 if (type == kLogFile) {
292 logs_.push_back(number);
293 } else if (type == kTableFile) {
294 table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
295 0);
296 } else {
297 // Ignore other files
298 }
299 }
300 }
301 }
302 }
303 if (!found_file) {
304 return Status::Corruption(dbname_, "repair found no files");
305 }
306 return Status::OK();
307 }
308
309 void ConvertLogFilesToTables() {
310 for (size_t i = 0; i < logs_.size(); i++) {
311 // we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
312 std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
313 Status status = ConvertLogToTable(logs_[i]);
314 if (!status.ok()) {
315 ROCKS_LOG_WARN(db_options_.info_log,
316 "Log #%" PRIu64 ": ignoring conversion error: %s",
317 logs_[i], status.ToString().c_str());
318 }
319 ArchiveFile(logname);
320 }
321 }
322
323 Status ConvertLogToTable(uint64_t log) {
324 struct LogReporter : public log::Reader::Reporter {
325 Env* env;
326 std::shared_ptr<Logger> info_log;
327 uint64_t lognum;
328 virtual void Corruption(size_t bytes, const Status& s) override {
329 // We print error messages for corruption, but continue repairing.
330 ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
331 lognum, static_cast<int>(bytes), s.ToString().c_str());
332 }
333 };
334
335 // Open the log file
336 std::string logname = LogFileName(db_options_.wal_dir, log);
337 unique_ptr<SequentialFile> lfile;
338 Status status = env_->NewSequentialFile(
339 logname, &lfile, env_->OptimizeForLogRead(env_options_));
340 if (!status.ok()) {
341 return status;
342 }
343 unique_ptr<SequentialFileReader> lfile_reader(
344 new SequentialFileReader(std::move(lfile), logname));
345
346 // Create the log reader.
347 LogReporter reporter;
348 reporter.env = env_;
349 reporter.info_log = db_options_.info_log;
350 reporter.lognum = log;
351 // We intentionally make log::Reader do checksumming so that
352 // corruptions cause entire commits to be skipped instead of
353 // propagating bad information (like overly large sequence
354 // numbers).
355 log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
356 true /*enable checksum*/, log);
357
358 // Initialize per-column family memtables
359 for (auto* cfd : *vset_.GetColumnFamilySet()) {
360 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
361 kMaxSequenceNumber);
362 }
363 auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
364
365 // Read all the records and add to a memtable
366 std::string scratch;
367 Slice record;
368 WriteBatch batch;
369 int counter = 0;
370 while (reader.ReadRecord(&record, &scratch)) {
371 if (record.size() < WriteBatchInternal::kHeader) {
372 reporter.Corruption(
373 record.size(), Status::Corruption("log record too small"));
374 continue;
375 }
376 WriteBatchInternal::SetContents(&batch, record);
377 status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
378 if (status.ok()) {
379 counter += WriteBatchInternal::Count(&batch);
380 } else {
381 ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
382 log, status.ToString().c_str());
383 status = Status::OK(); // Keep going with rest of file
384 }
385 }
386
387 // Dump a table for each column family with entries in this log file.
388 for (auto* cfd : *vset_.GetColumnFamilySet()) {
389 // Do not record a version edit for this conversion to a Table
390 // since ExtractMetaData() will also generate edits.
391 MemTable* mem = cfd->mem();
392 if (mem->IsEmpty()) {
393 continue;
394 }
395
396 FileMetaData meta;
397 meta.fd = FileDescriptor(next_file_number_++, 0, 0);
398 ReadOptions ro;
399 ro.total_order_seek = true;
400 Arena arena;
401 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
402 int64_t _current_time = 0;
403 status = env_->GetCurrentTime(&_current_time); // ignore error
404 const uint64_t current_time = static_cast<uint64_t>(_current_time);
405 SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
406
407 auto write_hint = cfd->CalculateSSTWriteHint(0);
408 status = BuildTable(
409 dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
410 env_options_, table_cache_, iter.get(),
411 std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
412 &meta, cfd->internal_comparator(),
413 cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
414 {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
415 CompressionOptions(), false, nullptr /* internal_stats */,
416 TableFileCreationReason::kRecovery, nullptr /* event_logger */,
417 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
418 -1 /* level */, current_time, write_hint);
419 ROCKS_LOG_INFO(db_options_.info_log,
420 "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
421 log, counter, meta.fd.GetNumber(),
422 status.ToString().c_str());
423 if (status.ok()) {
424 if (meta.fd.GetFileSize() > 0) {
425 table_fds_.push_back(meta.fd);
426 }
427 } else {
428 break;
429 }
430 }
431 delete cf_mems;
432 return status;
433 }
434
435 void ExtractMetaData() {
436 for (size_t i = 0; i < table_fds_.size(); i++) {
437 TableInfo t;
438 t.meta.fd = table_fds_[i];
439 Status status = ScanTable(&t);
440 if (!status.ok()) {
441 std::string fname = TableFileName(
442 db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
443 char file_num_buf[kFormatFileNumberBufSize];
444 FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
445 file_num_buf, sizeof(file_num_buf));
446 ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
447 file_num_buf, status.ToString().c_str());
448 ArchiveFile(fname);
449 } else {
450 tables_.push_back(t);
451 }
452 }
453 }
454
455 Status ScanTable(TableInfo* t) {
456 std::string fname = TableFileName(
457 db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
458 int counter = 0;
459 uint64_t file_size;
460 Status status = env_->GetFileSize(fname, &file_size);
461 t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
462 file_size);
463 std::shared_ptr<const TableProperties> props;
464 if (status.ok()) {
465 status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
466 &props);
467 }
468 if (status.ok()) {
469 t->column_family_id = static_cast<uint32_t>(props->column_family_id);
470 if (t->column_family_id ==
471 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
472 ROCKS_LOG_WARN(
473 db_options_.info_log,
474 "Table #%" PRIu64
475 ": column family unknown (probably due to legacy format); "
476 "adding to default column family id 0.",
477 t->meta.fd.GetNumber());
478 t->column_family_id = 0;
479 }
480
481 if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
482 nullptr) {
483 status =
484 AddColumnFamily(props->column_family_name, t->column_family_id);
485 }
486 }
487 ColumnFamilyData* cfd = nullptr;
488 if (status.ok()) {
489 cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
490 if (cfd->GetName() != props->column_family_name) {
491 ROCKS_LOG_ERROR(
492 db_options_.info_log,
493 "Table #%" PRIu64
494 ": inconsistent column family name '%s'; expected '%s' for column "
495 "family id %" PRIu32 ".",
496 t->meta.fd.GetNumber(), props->column_family_name.c_str(),
497 cfd->GetName().c_str(), t->column_family_id);
498 status = Status::Corruption(dbname_, "inconsistent column family name");
499 }
500 }
501 if (status.ok()) {
502 InternalIterator* iter = table_cache_->NewIterator(
503 ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
504 nullptr /* range_del_agg */,
505 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
506 bool empty = true;
507 ParsedInternalKey parsed;
508 t->min_sequence = 0;
509 t->max_sequence = 0;
510 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
511 Slice key = iter->key();
512 if (!ParseInternalKey(key, &parsed)) {
513 ROCKS_LOG_ERROR(db_options_.info_log,
514 "Table #%" PRIu64 ": unparsable key %s",
515 t->meta.fd.GetNumber(), EscapeString(key).c_str());
516 continue;
517 }
518
519 counter++;
520 if (empty) {
521 empty = false;
522 t->meta.smallest.DecodeFrom(key);
523 t->min_sequence = parsed.sequence;
524 }
525 t->meta.largest.DecodeFrom(key);
526 if (parsed.sequence < t->min_sequence) {
527 t->min_sequence = parsed.sequence;
528 }
529 if (parsed.sequence > t->max_sequence) {
530 t->max_sequence = parsed.sequence;
531 }
532 }
533 if (!iter->status().ok()) {
534 status = iter->status();
535 }
536 delete iter;
537
538 ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
539 t->meta.fd.GetNumber(), counter,
540 status.ToString().c_str());
541 }
542 return status;
543 }
544
545 Status AddTables() {
546 std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
547 SequenceNumber max_sequence = 0;
548 for (size_t i = 0; i < tables_.size(); i++) {
549 cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
550 if (max_sequence < tables_[i].max_sequence) {
551 max_sequence = tables_[i].max_sequence;
552 }
553 }
554 vset_.SetLastAllocatedSequence(max_sequence);
555 vset_.SetLastPublishedSequence(max_sequence);
556 vset_.SetLastSequence(max_sequence);
557
558 for (const auto& cf_id_and_tables : cf_id_to_tables) {
559 auto* cfd =
560 vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
561 VersionEdit edit;
562 edit.SetComparatorName(cfd->user_comparator()->Name());
563 edit.SetLogNumber(0);
564 edit.SetNextFile(next_file_number_);
565 edit.SetColumnFamily(cfd->GetID());
566
567 // TODO(opt): separate out into multiple levels
568 for (const auto* table : cf_id_and_tables.second) {
569 edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
570 table->meta.fd.GetFileSize(), table->meta.smallest,
571 table->meta.largest, table->min_sequence,
572 table->max_sequence, table->meta.marked_for_compaction);
573 }
574 assert(next_file_number_ > 0);
575 vset_.MarkFileNumberUsed(next_file_number_ - 1);
576 mutex_.Lock();
577 Status status = vset_.LogAndApply(
578 cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
579 nullptr /* db_directory */, false /* new_descriptor_log */);
580 mutex_.Unlock();
581 if (!status.ok()) {
582 return status;
583 }
584 }
585 return Status::OK();
586 }
587
588 void ArchiveFile(const std::string& fname) {
589 // Move into another directory. E.g., for
590 // dir/foo
591 // rename to
592 // dir/lost/foo
593 const char* slash = strrchr(fname.c_str(), '/');
594 std::string new_dir;
595 if (slash != nullptr) {
596 new_dir.assign(fname.data(), slash - fname.data());
597 }
598 new_dir.append("/lost");
599 env_->CreateDir(new_dir); // Ignore error
600 std::string new_file = new_dir;
601 new_file.append("/");
602 new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
603 Status s = env_->RenameFile(fname, new_file);
604 ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
605 s.ToString().c_str());
606 }
607 };
608
609 Status GetDefaultCFOptions(
610 const std::vector<ColumnFamilyDescriptor>& column_families,
611 ColumnFamilyOptions* res) {
612 assert(res != nullptr);
613 auto iter = std::find_if(column_families.begin(), column_families.end(),
614 [](const ColumnFamilyDescriptor& cfd) {
615 return cfd.name == kDefaultColumnFamilyName;
616 });
617 if (iter == column_families.end()) {
618 return Status::InvalidArgument(
619 "column_families", "Must contain entry for default column family");
620 }
621 *res = iter->options;
622 return Status::OK();
623 }
624 } // anonymous namespace
625
626 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
627 const std::vector<ColumnFamilyDescriptor>& column_families
628 ) {
629 ColumnFamilyOptions default_cf_opts;
630 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
631 if (status.ok()) {
632 Repairer repairer(dbname, db_options, column_families,
633 default_cf_opts,
634 ColumnFamilyOptions() /* unknown_cf_opts */,
635 false /* create_unknown_cfs */);
636 status = repairer.Run();
637 }
638 return status;
639 }
640
641 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
642 const std::vector<ColumnFamilyDescriptor>& column_families,
643 const ColumnFamilyOptions& unknown_cf_opts) {
644 ColumnFamilyOptions default_cf_opts;
645 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
646 if (status.ok()) {
647 Repairer repairer(dbname, db_options,
648 column_families, default_cf_opts,
649 unknown_cf_opts, true /* create_unknown_cfs */);
650 status = repairer.Run();
651 }
652 return status;
653 }
654
655 Status RepairDB(const std::string& dbname, const Options& options) {
656 DBOptions db_options(options);
657 ColumnFamilyOptions cf_options(options);
658 Repairer repairer(dbname, db_options,
659 {}, cf_options /* default_cf_opts */,
660 cf_options /* unknown_cf_opts */,
661 true /* create_unknown_cfs */);
662 return repairer.Run();
663 }
664
665 } // namespace rocksdb
666
667 #endif // ROCKSDB_LITE