1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Repairer does best effort recovery to recover as much data as possible after
11 // a disaster without compromising consistency. It does not guarantee bringing
12 // the database to a time consistent state.
14 // Repair process is broken into 4 phases:
16 // (b) Convert logs to tables
17 // (c) Extract metadata
18 // (d) Write Descriptor
22 // The repairer goes through all the files in the directory, and classifies them
23 // based on their file name. Any file that cannot be identified by name will be
26 // (b) Convert logs to tables
28 // Every log file that is active is replayed. All sections of the file where the
29 // checksum does not match are skipped over. We intentionally give preference to
32 // (c) Extract metadata
34 // We scan every table to compute
35 // (1) smallest/largest for the table
36 // (2) largest sequence number in the table
37 // (3) oldest blob file referred to by the table (if applicable)
39 // If we are unable to scan the file, then we ignore the table.
41 // (d) Write Descriptor
43 // We generate descriptor contents:
44 // - log number is set to zero
45 // - next-file-number is set to 1 + largest file number we found
46 // - last-sequence-number is set to largest sequence# found across
47 // all tables (see item (2) of phase (c))
48 // - compaction pointers are cleared
49 // - every table file is added at level 0
51 // Possible optimization 1:
52 // (a) Compute total size and use to pick appropriate max-level M
53 // (b) Sort tables by largest sequence# in the table
54 // (c) For each table: if it overlaps earlier table, place in level-0,
55 // else place in level-M.
56 // (d) We can provide options for time consistent recovery and unsafe recovery
57 // (ignore checksum failure when applicable)
58 // Possible optimization 2:
59 // Store per-table metadata (smallest, largest, largest-seq#, ...)
60 // in the table's meta section to speed up ScanTable.
66 #include "db/builder.h"
67 #include "db/db_impl/db_impl.h"
68 #include "db/dbformat.h"
69 #include "db/log_reader.h"
70 #include "db/log_writer.h"
71 #include "db/memtable.h"
72 #include "db/table_cache.h"
73 #include "db/version_edit.h"
74 #include "db/write_batch_internal.h"
75 #include "file/filename.h"
76 #include "file/writable_file_writer.h"
77 #include "logging/logging.h"
78 #include "options/cf_options.h"
79 #include "rocksdb/comparator.h"
80 #include "rocksdb/db.h"
81 #include "rocksdb/env.h"
82 #include "rocksdb/options.h"
83 #include "rocksdb/write_buffer_manager.h"
84 #include "table/scoped_arena_iterator.h"
85 #include "table/unique_id_impl.h"
86 #include "util/string_util.h"
88 namespace ROCKSDB_NAMESPACE
{
// Constructs a Repairer for the database at `dbname`.
//
// What the visible initializers establish:
//  - db_session_id_ is DBImpl::GenerateDbSessionId(db_options.env)
//  - env_ is db_options.env
//  - db_options_ is SanitizeOptions(dbname_, db_options) and
//    immutable_db_options_ is built from db_options_
//  - icmp_ is constructed from default_cf_opts.comparator
//  - table_cache_ is a new TableCache over raw_table_cache_.get()
//  - wb_ / wc_ are sized from db_options_.db_write_buffer_size and
//    db_options_.delayed_write_rate, and vset_ is handed pointers to both
//  - next_file_number_ starts at 1
// The constructor body copies the options of every entry in `column_families`
// into cf_name_to_opts_, keyed by column family name.
//
// NOTE(review): the embedded line numbers skip (98, 101, 105, 107, 109, 112,
// 114, 127-128), so some initializers are presumably missing from this
// extract -- confirm against the full file.
94 Repairer(const std::string
& dbname
, const DBOptions
& db_options
,
95 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
96 const ColumnFamilyOptions
& default_cf_opts
,
97 const ColumnFamilyOptions
& unknown_cf_opts
, bool create_unknown_cfs
)
99 db_session_id_(DBImpl::GenerateDbSessionId(db_options
.env
)),
100 env_(db_options
.env
),
102 db_options_(SanitizeOptions(dbname_
, db_options
)),
103 immutable_db_options_(ImmutableDBOptions(db_options_
)),
104 icmp_(default_cf_opts
.comparator
),
106 SanitizeOptions(immutable_db_options_
, default_cf_opts
)),
108 ImmutableOptions(immutable_db_options_
, default_cf_opts_
)),
110 SanitizeOptions(immutable_db_options_
, unknown_cf_opts
)),
111 create_unknown_cfs_(create_unknown_cfs
),
113 // TableCache can be small since we expect each table to be opened
115 NewLRUCache(10, db_options_
.table_cache_numshardbits
)),
116 table_cache_(new TableCache(default_iopts_
, &file_options_
,
117 raw_table_cache_
.get(),
118 /*block_cache_tracer=*/nullptr,
119 /*io_tracer=*/nullptr, db_session_id_
)),
120 wb_(db_options_
.db_write_buffer_size
),
121 wc_(db_options_
.delayed_write_rate
),
122 vset_(dbname_
, &immutable_db_options_
, file_options_
,
123 raw_table_cache_
.get(), &wb_
, &wc_
,
124 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
125 /*db_id=*/"", db_session_id_
),
126 next_file_number_(1),
// Remember the caller-supplied options of each listed column family by name.
129 for (const auto& cfd
: column_families
) {
130 cf_name_to_opts_
[cfd
.name
] = cfd
.options
;
// Looks up the ColumnFamilyOptions to use for the column family `cf_name`.
//
// Returns a pointer to the cf_name_to_opts_ entry for `cf_name` when one
// exists. When there is no entry and create_unknown_cfs_ is set, returns
// &unknown_cf_opts_ instead.
//
// NOTE(review): the outcome for an unknown name with create_unknown_cfs_
// unset is not visible in this extract (embedded lines 139-141 are absent).
// AddColumnFamily() compares the result against nullptr, so presumably
// nullptr is returned -- confirm against the full file.
134 const ColumnFamilyOptions
* GetColumnFamilyOptions(
135 const std::string
& cf_name
) {
136 if (cf_name_to_opts_
.find(cf_name
) == cf_name_to_opts_
.end()) {
137 if (create_unknown_cfs_
) {
138 return &unknown_cf_opts_
;
142 return &cf_name_to_opts_
[cf_name
];
145 // Adds the column family `cf_name` with id `cf_id` to the VersionSet.
//
// The options come from GetColumnFamilyOptions(cf_name); when that yields
// nullptr the function returns Status::Corruption naming the column family
// and its id. Otherwise an edit is filled in (comparator name taken from the
// resolved options, log number 0, column family id, AddColumnFamily(cf_name))
// and applied through vset_.LogAndApply() together with mutex_ and the
// directory handle opened for dbname_.
//
// NOTE(review): the declaration of `edit`, any locking around LogAndApply,
// the remaining NewDirectory arguments and the return statement fall on
// embedded lines that are absent from this extract (152, 155-156, 161,
// 163-164, 167-168, 171+) -- confirm against the full file.
147 Status
AddColumnFamily(const std::string
& cf_name
, uint32_t cf_id
) {
148 const auto* cf_opts
= GetColumnFamilyOptions(cf_name
);
149 if (cf_opts
== nullptr) {
150 return Status::Corruption("Encountered unknown column family with name=" +
151 cf_name
+ ", id=" + std::to_string(cf_id
));
153 Options
opts(db_options_
, *cf_opts
);
154 MutableCFOptions
mut_cf_opts(opts
);
// Describe the new column family in the edit.
157 edit
.SetComparatorName(opts
.comparator
->Name());
158 edit
.SetLogNumber(0);
159 edit
.SetColumnFamily(cf_id
);
160 ColumnFamilyData
* cfd
;
162 edit
.AddColumnFamily(cf_name
);
165 std::unique_ptr
<FSDirectory
> db_dir
;
166 Status status
= env_
->GetFileSystem()->NewDirectory(dbname_
, IOOptions(),
169 status
= vset_
.LogAndApply(cfd
, mut_cf_opts
, &edit
, &mutex_
, db_dir
.get(),
170 false /* new_descriptor_log */, cf_opts
);
177 Status s
= Status::OK();
179 if (db_lock_
!= nullptr) {
180 s
= env_
->UnlockFile(db_lock_
);
// Calls Close() on destruction; the returned Status is explicitly marked as
// permitted to go unchecked.
188 ~Repairer() { Close().PermitUncheckedError(); }
191 Status status
= env_
->LockFile(LockFileName(dbname_
), &db_lock_
);
195 status
= FindFiles();
196 DBImpl
* db_impl
= nullptr;
198 // Discard older manifests and start a fresh one
199 for (size_t i
= 0; i
< manifests_
.size(); i
++) {
200 ArchiveFile(dbname_
+ "/" + manifests_
[i
]);
202 // Just create a DBImpl temporarily so we can reuse NewDB()
203 db_impl
= new DBImpl(db_options_
, dbname_
);
204 status
= db_impl
->NewDB(/*new_filenames=*/nullptr);
209 // Recover using the fresh manifest created by NewDB()
211 vset_
.Recover({{kDefaultColumnFamilyName
, default_cf_opts_
}}, false);
214 // Need to scan existing SST files first so the column families are
215 // created before we process WAL files
218 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
219 // extract -- we need to clear it here since metadata for existing SST
220 // files has been extracted already
222 ConvertLogFilesToTables();
224 status
= AddTables();
228 for (size_t i
= 0; i
< tables_
.size(); i
++) {
229 bytes
+= tables_
[i
].meta
.fd
.GetFileSize();
231 ROCKS_LOG_WARN(db_options_
.info_log
,
232 "**** Repaired rocksdb %s; "
233 "recovered %" ROCKSDB_PRIszt
" files; %" PRIu64
235 "Some data may have been lost. "
237 dbname_
.c_str(), tables_
.size(), bytes
);
245 uint32_t column_family_id
;
246 std::string column_family_name
;
249 std::string
const dbname_
;
250 std::string db_session_id_
;
252 const FileOptions file_options_
;
253 const DBOptions db_options_
;
254 const ImmutableDBOptions immutable_db_options_
;
255 const InternalKeyComparator icmp_
;
256 const ColumnFamilyOptions default_cf_opts_
;
257 const ImmutableOptions default_iopts_
; // table_cache_ holds reference
258 const ColumnFamilyOptions unknown_cf_opts_
;
259 const bool create_unknown_cfs_
;
260 std::shared_ptr
<Cache
> raw_table_cache_
;
261 std::unique_ptr
<TableCache
> table_cache_
;
262 WriteBufferManager wb_
;
265 std::unordered_map
<std::string
, ColumnFamilyOptions
> cf_name_to_opts_
;
266 InstrumentedMutex mutex_
;
268 std::vector
<std::string
> manifests_
;
269 std::vector
<FileDescriptor
> table_fds_
;
270 std::vector
<uint64_t> logs_
;
271 std::vector
<TableInfo
> tables_
;
272 uint64_t next_file_number_
;
273 // Lock over the persistent DB state. Non-nullptr iff successfully
279 std::vector
<std::string
> filenames
;
280 bool found_file
= false;
281 std::vector
<std::string
> to_search_paths
;
283 for (size_t path_id
= 0; path_id
< db_options_
.db_paths
.size(); path_id
++) {
284 to_search_paths
.push_back(db_options_
.db_paths
[path_id
].path
);
287 // Also search wal_dir if the user configured a custom wal_dir
288 bool same
= immutable_db_options_
.IsWalDirSameAsDBPath(dbname_
);
290 to_search_paths
.push_back(immutable_db_options_
.wal_dir
);
293 for (size_t path_id
= 0; path_id
< to_search_paths
.size(); path_id
++) {
294 ROCKS_LOG_INFO(db_options_
.info_log
, "Searching path %s\n",
295 to_search_paths
[path_id
].c_str());
296 Status status
= env_
->GetChildren(to_search_paths
[path_id
], &filenames
);
300 if (!filenames
.empty()) {
306 for (size_t i
= 0; i
< filenames
.size(); i
++) {
307 if (ParseFileName(filenames
[i
], &number
, &type
)) {
308 if (type
== kDescriptorFile
) {
309 manifests_
.push_back(filenames
[i
]);
311 if (number
+ 1 > next_file_number_
) {
312 next_file_number_
= number
+ 1;
314 if (type
== kWalFile
) {
315 logs_
.push_back(number
);
316 } else if (type
== kTableFile
) {
317 table_fds_
.emplace_back(number
, static_cast<uint32_t>(path_id
),
320 // Ignore other files
327 return Status::Corruption(dbname_
, "repair found no files");
// Converts every write-ahead log recorded in logs_ into table files.
//
// For each log number the file name is built under the configured WAL
// directory (immutable_db_options_.GetWalDir()), ConvertLogToTable() is
// called, a conversion error is logged as a warning rather than propagated,
// and the log file is handed to ArchiveFile().
//
// NOTE(review): the condition guarding the warning (embedded line 339) is not
// visible in this extract; presumably it tests !status.ok() -- confirm.
332 void ConvertLogFilesToTables() {
333 const auto& wal_dir
= immutable_db_options_
.GetWalDir();
334 for (size_t i
= 0; i
< logs_
.size(); i
++) {
335 // We should use LogFileName(wal_dir, logs_[i]) here: the user might use a separate wal_dir.
337 std::string logname
= LogFileName(wal_dir
, logs_
[i
]);
338 Status status
= ConvertLogToTable(wal_dir
, logs_
[i
]);
340 ROCKS_LOG_WARN(db_options_
.info_log
,
341 "Log #%" PRIu64
": ignoring conversion error: %s",
342 logs_
[i
], status
.ToString().c_str());
344 ArchiveFile(logname
);
// Replays the write-ahead log numbered `log` under `wal_dir` into
// per-column-family memtables and writes table files from them.
//
// Visible steps:
//  - open the log with SequentialFileReader::Create and wrap it in a
//    log::Reader with checksumming enabled; a local LogReporter logs the
//    number of dropped bytes for each reported corruption
//  - call CreateNewMemtable on every column family of vset_
//  - for every record read: report records shorter than
//    WriteBatchInternal::kHeader as corruption, otherwise load the record
//    into a batch, insert it through cf_mems, and either add the batch's
//    entry count to `counter` or log a warning that it is being ignored
//  - for every column family: take a file number from next_file_number_,
//    set up TableBuilderOptions (no compression, creation reason kRecovery)
//    and push meta.fd onto table_fds_ when the resulting file size is > 0
//
// NOTE(review): the embedded line numbers skip in many places. The
// declarations of record, scratch, batch, counter, meta, ro, arena and io_s,
// the name of the function receiving the arguments on embedded lines 458-465
// (line 457 is absent), what happens when mem->IsEmpty() is true, how cf_mems
// is released, and the return statements are all not visible in this
// extract -- confirm against the full file before relying on details.
348 Status
ConvertLogToTable(const std::string
& wal_dir
, uint64_t log
) {
// Reporter handed to log::Reader; it only logs and does not abort the repair.
349 struct LogReporter
: public log::Reader::Reporter
{
351 std::shared_ptr
<Logger
> info_log
;
353 void Corruption(size_t bytes
, const Status
& s
) override
{
354 // We print error messages for corruption, but continue repairing.
355 ROCKS_LOG_ERROR(info_log
, "Log #%" PRIu64
": dropping %d bytes; %s",
356 lognum
, static_cast<int>(bytes
), s
.ToString().c_str());
// Open the log file for sequential reading.
361 std::string logname
= LogFileName(wal_dir
, log
);
362 const auto& fs
= env_
->GetFileSystem();
363 std::unique_ptr
<SequentialFileReader
> lfile_reader
;
364 Status status
= SequentialFileReader::Create(
365 fs
, logname
, fs
->OptimizeForLogRead(file_options_
), &lfile_reader
,
366 nullptr /* dbg */, nullptr /* rate limiter */);
371 // Create the log reader.
372 LogReporter reporter
;
374 reporter
.info_log
= db_options_
.info_log
;
375 reporter
.lognum
= log
;
376 // We intentionally make log::Reader do checksumming so that
377 // corruptions cause entire commits to be skipped instead of
378 // propagating bad information (like overly large sequence numbers).
380 log::Reader
reader(db_options_
.info_log
, std::move(lfile_reader
), &reporter
,
381 true /*enable checksum*/, log
);
383 // Initialize per-column family memtables
384 for (auto* cfd
: *vset_
.GetColumnFamilySet()) {
385 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
388 auto cf_mems
= new ColumnFamilyMemTablesImpl(vset_
.GetColumnFamilySet());
390 // Read all the records and add to a memtable
395 while (reader
.ReadRecord(&record
, &scratch
)) {
396 if (record
.size() < WriteBatchInternal::kHeader
) {
397 reporter
.Corruption(record
.size(),
398 Status::Corruption("log record too small"));
401 Status record_status
= WriteBatchInternal::SetContents(&batch
, record
);
402 if (record_status
.ok()) {
404 WriteBatchInternal::InsertInto(&batch
, cf_mems
, nullptr, nullptr);
406 if (record_status
.ok()) {
407 counter
+= WriteBatchInternal::Count(&batch
);
409 ROCKS_LOG_WARN(db_options_
.info_log
, "Log #%" PRIu64
": ignoring %s",
410 log
, record_status
.ToString().c_str());
414 // Dump a table for each column family with entries in this log file.
415 for (auto* cfd
: *vset_
.GetColumnFamilySet()) {
416 // Do not record a version edit for this conversion to a Table
417 // since ExtractMetaData() will also generate edits.
418 MemTable
* mem
= cfd
->mem();
419 if (mem
->IsEmpty()) {
// Allocate the next file number for the table written from this memtable.
424 meta
.fd
= FileDescriptor(next_file_number_
++, 0, 0);
426 ro
.total_order_seek
= true;
428 ScopedArenaIterator
iter(mem
->NewIterator(ro
, &arena
));
429 int64_t _current_time
= 0;
430 immutable_db_options_
.clock
->GetCurrentTime(&_current_time
)
431 .PermitUncheckedError(); // ignore error
432 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
433 meta
.file_creation_time
= current_time
;
434 SnapshotChecker
* snapshot_checker
= DisableGCSnapshotChecker::Instance();
436 auto write_hint
= cfd
->CalculateSSTWriteHint(0);
437 std::vector
<std::unique_ptr
<FragmentedRangeTombstoneIterator
>>
// Collect the memtable's range tombstones, if it has any.
439 auto range_del_iter
= mem
->NewRangeTombstoneIterator(
440 ro
, kMaxSequenceNumber
, false /* immutable_memtable */);
441 if (range_del_iter
!= nullptr) {
442 range_del_iters
.emplace_back(range_del_iter
);
446 CompressionOptions default_compression
;
447 TableBuilderOptions
tboptions(
448 *cfd
->ioptions(), *cfd
->GetLatestMutableCFOptions(),
449 cfd
->internal_comparator(), cfd
->int_tbl_prop_collector_factories(),
450 kNoCompression
, default_compression
, cfd
->GetID(), cfd
->GetName(),
451 -1 /* level */, false /* is_bottommost */,
452 TableFileCreationReason::kRecovery
, 0 /* oldest_key_time */,
453 0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_
,
454 0 /*target_file_size*/, meta
.fd
.GetNumber());
456 SeqnoToTimeMapping empty_seqno_time_mapping
;
458 dbname_
, /* versions */ nullptr, immutable_db_options_
, tboptions
,
459 file_options_
, table_cache_
.get(), iter
.get(),
460 std::move(range_del_iters
), &meta
, nullptr /* blob_file_additions */,
461 {}, kMaxSequenceNumber
, kMaxSequenceNumber
, snapshot_checker
,
462 false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s
,
463 nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery
,
464 empty_seqno_time_mapping
, nullptr /* event_logger */, 0 /* job_id */,
465 Env::IO_HIGH
, nullptr /* table_properties */, write_hint
);
466 ROCKS_LOG_INFO(db_options_
.info_log
,
467 "Log #%" PRIu64
": %d ops saved to Table #%" PRIu64
" %s",
468 log
, counter
, meta
.fd
.GetNumber(),
469 status
.ToString().c_str());
// Only tables that ended up with data are queued for metadata extraction.
471 if (meta
.fd
.GetFileSize() > 0) {
472 table_fds_
.push_back(meta
.fd
);
// Scans every table file listed in table_fds_ and collects the results in
// tables_.
//
// For each descriptor a TableInfo `t` is given that descriptor and passed to
// ScanTable(). The visible code also formats the file number and logs a
// "Table #...: ignoring ..." warning carrying the scan status, and appends
// `t` to tables_.
//
// NOTE(review): the declaration of `t` and the branch deciding between the
// warning and the push_back (embedded lines 484, 487, 495-496) are not visible
// in this extract, nor is any use of `fname` -- presumably failed scans are
// warned about and successful ones are kept; confirm against the full file.
482 void ExtractMetaData() {
483 for (size_t i
= 0; i
< table_fds_
.size(); i
++) {
485 t
.meta
.fd
= table_fds_
[i
];
486 Status status
= ScanTable(&t
);
488 std::string fname
= TableFileName(
489 db_options_
.db_paths
, t
.meta
.fd
.GetNumber(), t
.meta
.fd
.GetPathId());
490 char file_num_buf
[kFormatFileNumberBufSize
];
491 FormatFileNumber(t
.meta
.fd
.GetNumber(), t
.meta
.fd
.GetPathId(),
492 file_num_buf
, sizeof(file_num_buf
));
493 ROCKS_LOG_WARN(db_options_
.info_log
, "Table #%s: ignoring %s",
494 file_num_buf
, status
.ToString().c_str());
497 tables_
.push_back(t
);
// Fills in `t` (file metadata and column family id) for the table file
// identified by t->meta.fd.
//
// Visible steps:
//  - resolve the file name with TableFileName() and query its size through
//    env_->GetFileSize()
//  - rebuild t->meta.fd from the same file number and path id
//  - load the table properties through table_cache_->GetTableProperties()
//  - derive t->meta.unique_id with GetSstInternalUniqueId(); a warning about
//    defaulting to Unknown is logged on one of the branches
//  - take the column family id from the properties, switching to id 0 when it
//    equals kUnknownColumnFamily
//  - call AddColumnFamily() with the name from the properties, guarded by a
//    GetColumnFamily() lookup on vset_
//  - copy props->creation_time into t->meta.oldest_ancester_time
//  - set status to Corruption when the column family's name differs from the
//    name stored in the properties
//  - iterate over all keys with total_order_seek, parse each internal key,
//    log unparsable keys, and feed keys into t->meta.UpdateBoundaries()
//  - iterate over the range tombstones and feed each into
//    t->meta.UpdateBoundariesForRange()
//
// NOTE(review): the embedded line numbers skip in many places. The
// declarations of file_size, ropts, counter and pik_status, the trailing
// arguments of several calls, the status checks between the steps, the
// release of `iter` and the return statement are not visible in this
// extract -- confirm against the full file before relying on details.
502 Status
ScanTable(TableInfo
* t
) {
503 std::string fname
= TableFileName(
504 db_options_
.db_paths
, t
->meta
.fd
.GetNumber(), t
->meta
.fd
.GetPathId());
507 Status status
= env_
->GetFileSize(fname
, &file_size
);
508 t
->meta
.fd
= FileDescriptor(t
->meta
.fd
.GetNumber(), t
->meta
.fd
.GetPathId(),
510 std::shared_ptr
<const TableProperties
> props
;
512 status
= table_cache_
->GetTableProperties(file_options_
, icmp_
, t
->meta
,
517 GetSstInternalUniqueId(props
->db_id
, props
->db_session_id
,
518 props
->orig_file_number
, &t
->meta
.unique_id
);
520 ROCKS_LOG_WARN(db_options_
.info_log
,
522 ": unable to get unique id, default to Unknown.",
523 t
->meta
.fd
.GetNumber());
525 t
->column_family_id
= static_cast<uint32_t>(props
->column_family_id
);
526 if (t
->column_family_id
==
527 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) {
529 db_options_
.info_log
,
531 ": column family unknown (probably due to legacy format); "
532 "adding to default column family id 0.",
533 t
->meta
.fd
.GetNumber());
534 t
->column_family_id
= 0;
537 if (vset_
.GetColumnFamilySet()->GetColumnFamily(t
->column_family_id
) ==
540 AddColumnFamily(props
->column_family_name
, t
->column_family_id
);
542 t
->meta
.oldest_ancester_time
= props
->creation_time
;
544 ColumnFamilyData
* cfd
= nullptr;
546 cfd
= vset_
.GetColumnFamilySet()->GetColumnFamily(t
->column_family_id
);
// The name stored in the table must match the name registered for this id.
547 if (cfd
->GetName() != props
->column_family_name
) {
549 db_options_
.info_log
,
551 ": inconsistent column family name '%s'; expected '%s' for column "
552 "family id %" PRIu32
".",
553 t
->meta
.fd
.GetNumber(), props
->column_family_name
.c_str(),
554 cfd
->GetName().c_str(), t
->column_family_id
);
555 status
= Status::Corruption(dbname_
, "inconsistent column family name");
560 ropts
.total_order_seek
= true;
561 InternalIterator
* iter
= table_cache_
->NewIterator(
562 ropts
, file_options_
, cfd
->internal_comparator(), t
->meta
,
563 nullptr /* range_del_agg */,
564 cfd
->GetLatestMutableCFOptions()->prefix_extractor
,
565 /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
566 TableReaderCaller::kRepair
, /*arena=*/nullptr, /*skip_filters=*/false,
567 /*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
568 /*smallest_compaction_key=*/nullptr,
569 /*largest_compaction_key=*/nullptr,
570 /*allow_unprepared_value=*/false);
571 ParsedInternalKey parsed
;
// Walk every entry of the table to update the key/sequence boundaries.
572 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
573 Slice key
= iter
->key();
575 ParseInternalKey(key
, &parsed
, db_options_
.allow_data_in_errors
);
576 if (!pik_status
.ok()) {
577 ROCKS_LOG_ERROR(db_options_
.info_log
,
578 "Table #%" PRIu64
": unparsable key - %s",
579 t
->meta
.fd
.GetNumber(), pik_status
.getState());
585 status
= t
->meta
.UpdateBoundaries(key
, iter
->value(), parsed
.sequence
,
// An iterator error only replaces the status when no earlier error is held.
591 if (status
.ok() && !iter
->status().ok()) {
592 status
= iter
->status();
596 ROCKS_LOG_INFO(db_options_
.info_log
, "Table #%" PRIu64
": %d entries %s",
597 t
->meta
.fd
.GetNumber(), counter
,
598 status
.ToString().c_str());
601 // XXX/FIXME: This is just basic, naive handling of range tombstones,
602 // like call to UpdateBoundariesForRange in builder.cc where we assume
603 // an SST file is a full sorted run. This probably needs the extra logic
604 // from compaction_job.cc around call to UpdateBoundariesForRange (to
605 // handle range tombstones extending beyond range of other entries).
607 std::unique_ptr
<FragmentedRangeTombstoneIterator
> r_iter
;
608 status
= table_cache_
->GetRangeTombstoneIterator(
609 ropts
, cfd
->internal_comparator(), t
->meta
, &r_iter
);
612 r_iter
->SeekToFirst();
614 while (r_iter
->Valid()) {
615 auto tombstone
= r_iter
->Tombstone();
616 auto kv
= tombstone
.Serialize();
617 t
->meta
.UpdateBoundariesForRange(
618 kv
.first
, tombstone
.SerializeEndKey(), tombstone
.seq_
,
619 cfd
->internal_comparator());
628 std::unordered_map
<uint32_t, std::vector
<const TableInfo
*>> cf_id_to_tables
;
629 SequenceNumber max_sequence
= 0;
630 for (size_t i
= 0; i
< tables_
.size(); i
++) {
631 cf_id_to_tables
[tables_
[i
].column_family_id
].push_back(&tables_
[i
]);
632 if (max_sequence
< tables_
[i
].meta
.fd
.largest_seqno
) {
633 max_sequence
= tables_
[i
].meta
.fd
.largest_seqno
;
636 vset_
.SetLastAllocatedSequence(max_sequence
);
637 vset_
.SetLastPublishedSequence(max_sequence
);
638 vset_
.SetLastSequence(max_sequence
);
640 for (const auto& cf_id_and_tables
: cf_id_to_tables
) {
642 vset_
.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables
.first
);
644 edit
.SetComparatorName(cfd
->user_comparator()->Name());
645 edit
.SetLogNumber(0);
646 edit
.SetNextFile(next_file_number_
);
647 edit
.SetColumnFamily(cfd
->GetID());
649 // TODO(opt): separate out into multiple levels
650 for (const auto* table
: cf_id_and_tables
.second
) {
652 0, table
->meta
.fd
.GetNumber(), table
->meta
.fd
.GetPathId(),
653 table
->meta
.fd
.GetFileSize(), table
->meta
.smallest
,
654 table
->meta
.largest
, table
->meta
.fd
.smallest_seqno
,
655 table
->meta
.fd
.largest_seqno
, table
->meta
.marked_for_compaction
,
656 table
->meta
.temperature
, table
->meta
.oldest_blob_file_number
,
657 table
->meta
.oldest_ancester_time
, table
->meta
.file_creation_time
,
658 table
->meta
.file_checksum
, table
->meta
.file_checksum_func_name
,
659 table
->meta
.unique_id
);
661 assert(next_file_number_
> 0);
662 vset_
.MarkFileNumberUsed(next_file_number_
- 1);
664 std::unique_ptr
<FSDirectory
> db_dir
;
665 Status status
= env_
->GetFileSystem()->NewDirectory(dbname_
, IOOptions(),
668 status
= vset_
.LogAndApply(cfd
, *cfd
->GetLatestMutableCFOptions(),
669 &edit
, &mutex_
, db_dir
.get(),
670 false /* new_descriptor_log */);
// Moves the file `fname` into a "lost" directory instead of deleting it.
//
// The target directory is the part of `fname` before its last '/' with
// "/lost" appended; it is created first (errors ignored), then the file is
// renamed into it under its original base name. The rename status is only
// logged, not returned.
//
// NOTE(review): the declaration of new_dir (embedded line 686) is not visible
// in this extract; presumably it is a std::string that starts empty, which
// would make the target "/lost" when `fname` contains no '/' -- confirm.
680 void ArchiveFile(const std::string
& fname
) {
681 // Move into a "lost" directory next to the file, e.g. dir/foo is renamed to dir/lost/foo.
685 const char* slash
= strrchr(fname
.c_str(), '/');
687 if (slash
!= nullptr) {
688 new_dir
.assign(fname
.data(), slash
- fname
.data());
690 new_dir
.append("/lost");
691 env_
->CreateDir(new_dir
).PermitUncheckedError(); // Ignore error
692 std::string new_file
= new_dir
;
693 new_file
.append("/");
// Keep the original base name (everything after the last '/').
694 new_file
.append((slash
== nullptr) ? fname
.c_str() : slash
+ 1);
695 Status s
= env_
->RenameFile(fname
, new_file
);
696 ROCKS_LOG_INFO(db_options_
.info_log
, "Archiving %s: %s\n", fname
.c_str(),
697 s
.ToString().c_str());
// Copies the options of the default column family out of `column_families`.
//
// Searches `column_families` for the descriptor whose name equals
// kDefaultColumnFamilyName. Returns Status::InvalidArgument when there is no
// such descriptor; otherwise stores that descriptor's options in `*res`.
// `res` must not be null (asserted).
//
// NOTE(review): the final return statement (embedded line 714) is not visible
// in this extract; presumably it returns an OK status -- confirm.
701 Status
GetDefaultCFOptions(
702 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
703 ColumnFamilyOptions
* res
) {
704 assert(res
!= nullptr);
705 auto iter
= std::find_if(column_families
.begin(), column_families
.end(),
706 [](const ColumnFamilyDescriptor
& cfd
) {
707 return cfd
.name
== kDefaultColumnFamilyName
;
709 if (iter
== column_families
.end()) {
710 return Status::InvalidArgument(
711 "column_families", "Must contain entry for default column family");
713 *res
= iter
->options
;
716 } // anonymous namespace
// Repairs the database at `dbname` using the given column family descriptors.
//
// The default column family's options are taken from `column_families` via
// GetDefaultCFOptions(). The Repairer is constructed with default-constructed
// options for unknown column families and create_unknown_cfs = false, then
// Run() and Close() are called on it.
//
// NOTE(review): the status checks between the steps and the return statement
// (embedded lines 722-725, 730, 732+) are not visible in this extract, so
// whether Close() runs after a failed Run() cannot be told from here.
718 Status
RepairDB(const std::string
& dbname
, const DBOptions
& db_options
,
719 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
720 ColumnFamilyOptions default_cf_opts
;
721 Status status
= GetDefaultCFOptions(column_families
, &default_cf_opts
);
726 Repairer
repairer(dbname
, db_options
, column_families
, default_cf_opts
,
727 ColumnFamilyOptions() /* unknown_cf_opts */,
728 false /* create_unknown_cfs */);
729 status
= repairer
.Run();
731 status
= repairer
.Close();
// Repairs the database at `dbname` using the given column family descriptors,
// with `unknown_cf_opts` supplied for column families that are not listed.
//
// The default column family's options are taken from `column_families` via
// GetDefaultCFOptions(). The Repairer is constructed with `unknown_cf_opts`
// and create_unknown_cfs = true, then Run() and Close() are called on it.
//
// NOTE(review): the status checks between the steps and the return statement
// (embedded lines 741-744, 748, 750+) are not visible in this extract, so
// whether Close() runs after a failed Run() cannot be told from here.
736 Status
RepairDB(const std::string
& dbname
, const DBOptions
& db_options
,
737 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
738 const ColumnFamilyOptions
& unknown_cf_opts
) {
739 ColumnFamilyOptions default_cf_opts
;
740 Status status
= GetDefaultCFOptions(column_families
, &default_cf_opts
);
745 Repairer
repairer(dbname
, db_options
, column_families
, default_cf_opts
,
746 unknown_cf_opts
, true /* create_unknown_cfs */);
747 status
= repairer
.Run();
749 status
= repairer
.Close();
// Repairs the database at `dbname` from a single combined Options object.
//
// `options` is copied and split into DBOptions and ColumnFamilyOptions. The
// Repairer receives an empty column family list, the same ColumnFamilyOptions
// for both the default and the unknown column families, and
// create_unknown_cfs = true; then Run() and Close() are called on it.
//
// NOTE(review): the status check after Run() and the return statement
// (embedded lines 763, 765+) are not visible in this extract, so whether
// Close() runs after a failed Run() cannot be told from here.
754 Status
RepairDB(const std::string
& dbname
, const Options
& options
) {
755 Options
opts(options
);
756 DBOptions
db_options(opts
);
757 ColumnFamilyOptions
cf_options(opts
);
759 Repairer
repairer(dbname
, db_options
, {}, cf_options
/* default_cf_opts */,
760 cf_options
/* unknown_cf_opts */,
761 true /* create_unknown_cfs */);
762 Status status
= repairer
.Run();
764 status
= repairer
.Close();
769 } // namespace ROCKSDB_NAMESPACE
771 #endif // ROCKSDB_LITE