1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Repairer does best effort recovery to recover as much data as possible after
11 // a disaster without compromising consistency. It does not guarantee bringing
12 // the database to a time consistent state.
14 // Repair process is broken into 4 phases:
16 // (b) Convert logs to tables
17 // (c) Extract metadata
18 // (d) Write Descriptor
22 // The repairer goes through all the files in the directory, and classifies them
23 // based on their file name. Any file that cannot be identified by name will be
26 // (b) Convert logs to table
28 // Every log file that is active is replayed. All sections of the file where the
29 // checksum does not match are skipped over. We intentionally give preference to
32 // (c) Extract metadata
34 // We scan every table to compute
35 // (1) smallest/largest for the table
36 // (2) largest sequence number in the table
38 // If we are unable to scan the file, then we ignore the table.
40 // (d) Write Descriptor
42 // We generate descriptor contents:
43 // - log number is set to zero
44 // - next-file-number is set to 1 + largest file number we found
45 // - last-sequence-number is set to largest sequence# found across
46 // all tables (see item (2) of phase (c))
47 // - compaction pointers are cleared
48 // - every table file is added at level 0
50 // Possible optimization 1:
51 // (a) Compute total size and use to pick appropriate max-level M
52 // (b) Sort tables by largest sequence# in the table
53 // (c) For each table: if it overlaps earlier table, place in level-0,
54 // else place in level-M.
55 // (d) We can provide options for time consistent recovery and unsafe recovery
56 // (ignore checksum failure when applicable)
57 // Possible optimization 2:
58 // Store per-table metadata (smallest, largest, largest-seq#, ...)
59 // in the table's meta section to speed up ScanTable.
63 #ifndef __STDC_FORMAT_MACROS
64 #define __STDC_FORMAT_MACROS
68 #include "db/builder.h"
69 #include "db/db_impl.h"
70 #include "db/dbformat.h"
71 #include "db/log_reader.h"
72 #include "db/log_writer.h"
73 #include "db/memtable.h"
74 #include "db/table_cache.h"
75 #include "db/version_edit.h"
76 #include "db/write_batch_internal.h"
77 #include "options/cf_options.h"
78 #include "rocksdb/comparator.h"
79 #include "rocksdb/db.h"
80 #include "rocksdb/env.h"
81 #include "rocksdb/options.h"
82 #include "rocksdb/write_buffer_manager.h"
83 #include "table/scoped_arena_iterator.h"
84 #include "util/file_reader_writer.h"
85 #include "util/filename.h"
86 #include "util/string_util.h"
// Constructs a Repairer.
//   db_options         - passed through SanitizeOptions() before being stored in
//                        db_options_.
//   column_families    - each descriptor's options are stored in cf_name_to_opts_,
//                        keyed by the descriptor's name.
//   default_cf_opts    - supplies the comparator for icmp_ and is sanitized against
//                        immutable_db_options_.
//   unknown_cf_opts    - sanitized against immutable_db_options_.
//   create_unknown_cfs - copied into create_unknown_cfs_.
// NOTE(review): several member-initializer lines are not visible in this excerpt,
// so which members receive the SanitizeOptions()/ImmutableCFOptions()/NewLRUCache()
// results is presumed from the member declarations -- confirm against the full file.
94 Repairer(const std::string
& dbname
, const DBOptions
& db_options
,
95 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
96 const ColumnFamilyOptions
& default_cf_opts
,
97 const ColumnFamilyOptions
& unknown_cf_opts
, bool create_unknown_cfs
)
101 db_options_(SanitizeOptions(dbname_
, db_options
)),
102 immutable_db_options_(ImmutableDBOptions(db_options_
)),
103 icmp_(default_cf_opts
.comparator
),
105 SanitizeOptions(immutable_db_options_
, default_cf_opts
)),
107 ImmutableCFOptions(immutable_db_options_
, default_cf_opts_
)),
109 SanitizeOptions(immutable_db_options_
, unknown_cf_opts
)),
110 create_unknown_cfs_(create_unknown_cfs
),
112 // TableCache can be small since we expect each table to be opened
114 NewLRUCache(10, db_options_
.table_cache_numshardbits
)),
115 table_cache_(new TableCache(default_cf_iopts_
, env_options_
,
116 raw_table_cache_
.get())),
117 wb_(db_options_
.db_write_buffer_size
),
118 wc_(db_options_
.delayed_write_rate
),
119 vset_(dbname_
, &immutable_db_options_
, env_options_
,
120 raw_table_cache_
.get(), &wb_
, &wc_
),
121 next_file_number_(1) {
// Remember the caller-supplied options of every listed column family by name.
122 for (const auto& cfd
: column_families
) {
123 cf_name_to_opts_
[cfd
.name
] = cfd
.options
;
// Returns a pointer to the options to use for the column family `cf_name`.
// When cf_name_to_opts_ has no entry for the name and create_unknown_cfs_ is
// set, the address of unknown_cf_opts_ is returned; otherwise the entry stored
// in cf_name_to_opts_ is returned.
// NOTE(review): the branch for an unknown name with create_unknown_cfs_ unset is
// not visible in this excerpt; AddColumnFamily() compares the result against
// nullptr, so presumably nullptr is returned there -- confirm.
127 const ColumnFamilyOptions
* GetColumnFamilyOptions(
128 const std::string
& cf_name
) {
129 if (cf_name_to_opts_
.find(cf_name
) == cf_name_to_opts_
.end()) {
// No explicit entry for this name: fall back to unknown_cf_opts_ if allowed.
130 if (create_unknown_cfs_
) {
131 return &unknown_cf_opts_
;
// Presumably reached only when the name has an entry -- TODO confirm.
135 return &cf_name_to_opts_
[cf_name
];
// Registers column family `cf_name` with id `cf_id` in vset_.
// The options come from GetColumnFamilyOptions(cf_name); when that yields
// nullptr, a Status::Corruption naming the column family and its id is returned.
// Otherwise a version edit carrying the comparator name, log number 0, the
// column family id and the column family name is applied through
// vset_.LogAndApply().
138 // Adds a column family to the VersionSet with cf_options_ and updates
140 Status
AddColumnFamily(const std::string
& cf_name
, uint32_t cf_id
) {
141 const auto* cf_opts
= GetColumnFamilyOptions(cf_name
);
142 if (cf_opts
== nullptr) {
143 return Status::Corruption("Encountered unknown column family with name=" +
144 cf_name
+ ", id=" + ToString(cf_id
));
// Combine the DB-wide options with the column family's options.
146 Options
opts(db_options_
, *cf_opts
);
147 MutableCFOptions
mut_cf_opts(opts
);
// Describe the new column family in the version edit.
150 edit
.SetComparatorName(opts
.comparator
->Name());
151 edit
.SetLogNumber(0);
152 edit
.SetColumnFamily(cf_id
);
153 ColumnFamilyData
* cfd
;
155 edit
.AddColumnFamily(cf_name
);
// NOTE(review): the statements that set `cfd` and take mutex_ are not visible
// in this excerpt -- confirm against the full file.
158 Status status
= vset_
.LogAndApply(cfd
, mut_cf_opts
, &edit
, &mutex_
,
159 nullptr /* db_directory */,
160 false /* new_descriptor_log */, cf_opts
);
170 Status status
= FindFiles();
172 // Discard older manifests and start a fresh one
173 for (size_t i
= 0; i
< manifests_
.size(); i
++) {
174 ArchiveFile(dbname_
+ "/" + manifests_
[i
]);
176 // Just create a DBImpl temporarily so we can reuse NewDB()
177 DBImpl
* db_impl
= new DBImpl(db_options_
, dbname_
);
178 status
= db_impl
->NewDB();
183 // Recover using the fresh manifest created by NewDB()
185 vset_
.Recover({{kDefaultColumnFamilyName
, default_cf_opts_
}}, false);
188 // Need to scan existing SST files first so the column families are
189 // created before we process WAL files
192 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
193 // extract -- we need to clear it here since metadata for existing SST
194 // files has been extracted already
196 ConvertLogFilesToTables();
198 status
= AddTables();
202 for (size_t i
= 0; i
< tables_
.size(); i
++) {
203 bytes
+= tables_
[i
].meta
.fd
.GetFileSize();
205 ROCKS_LOG_WARN(db_options_
.info_log
,
206 "**** Repaired rocksdb %s; "
207 "recovered %" ROCKSDB_PRIszt
" files; %" PRIu64
209 "Some data may have been lost. "
211 dbname_
.c_str(), tables_
.size(), bytes
);
219 uint32_t column_family_id
;
220 std::string column_family_name
;
221 SequenceNumber min_sequence
;
222 SequenceNumber max_sequence
;
225 std::string
const dbname_
;
227 const EnvOptions env_options_
;
228 const DBOptions db_options_
;
229 const ImmutableDBOptions immutable_db_options_
;
230 const InternalKeyComparator icmp_
;
231 const ColumnFamilyOptions default_cf_opts_
;
232 const ImmutableCFOptions default_cf_iopts_
; // table_cache_ holds reference
233 const ColumnFamilyOptions unknown_cf_opts_
;
234 const bool create_unknown_cfs_
;
235 std::shared_ptr
<Cache
> raw_table_cache_
;
236 TableCache
* table_cache_
;
237 WriteBufferManager wb_
;
240 std::unordered_map
<std::string
, ColumnFamilyOptions
> cf_name_to_opts_
;
241 InstrumentedMutex mutex_
;
243 std::vector
<std::string
> manifests_
;
244 std::vector
<FileDescriptor
> table_fds_
;
245 std::vector
<uint64_t> logs_
;
246 std::vector
<TableInfo
> tables_
;
247 uint64_t next_file_number_
;
250 std::vector
<std::string
> filenames
;
251 bool found_file
= false;
252 std::vector
<std::string
> to_search_paths
;
254 for (size_t path_id
= 0; path_id
< db_options_
.db_paths
.size(); path_id
++) {
255 to_search_paths
.push_back(db_options_
.db_paths
[path_id
].path
);
258 // search wal_dir if the user configured a custom wal_dir
260 Status status
= env_
->AreFilesSame(db_options_
.wal_dir
, dbname_
, &same
);
261 if (status
.IsNotSupported()) {
262 same
= db_options_
.wal_dir
== dbname_
;
263 status
= Status::OK();
264 } else if (!status
.ok()) {
269 to_search_paths
.push_back(db_options_
.wal_dir
);
272 for (size_t path_id
= 0; path_id
< to_search_paths
.size(); path_id
++) {
273 status
= env_
->GetChildren(to_search_paths
[path_id
], &filenames
);
277 if (!filenames
.empty()) {
283 for (size_t i
= 0; i
< filenames
.size(); i
++) {
284 if (ParseFileName(filenames
[i
], &number
, &type
)) {
285 if (type
== kDescriptorFile
) {
286 manifests_
.push_back(filenames
[i
]);
288 if (number
+ 1 > next_file_number_
) {
289 next_file_number_
= number
+ 1;
291 if (type
== kLogFile
) {
292 logs_
.push_back(number
);
293 } else if (type
== kTableFile
) {
294 table_fds_
.emplace_back(number
, static_cast<uint32_t>(path_id
),
297 // Ignore other files
304 return Status::Corruption(dbname_
, "repair found no files");
// Processes every log number in logs_: converts the log with
// ConvertLogToTable(), logs an "ignoring conversion error" warning carrying the
// resulting status, and archives the log file with ArchiveFile().
// NOTE(review): the condition guarding the warning is not visible in this
// excerpt; presumably it fires only on a non-OK status -- confirm.
309 void ConvertLogFilesToTables() {
310 for (size_t i
= 0; i
< logs_
.size(); i
++) {
311 // Build the path with LogFileName(wal_dir, logs_[i]) because the user may have set the wal_dir option.
312 std::string logname
= LogFileName(db_options_
.wal_dir
, logs_
[i
]);
313 Status status
= ConvertLogToTable(logs_
[i
]);
315 ROCKS_LOG_WARN(db_options_
.info_log
,
316 "Log #%" PRIu64
": ignoring conversion error: %s",
317 logs_
[i
], status
.ToString().c_str());
// Move the processed log file out of the way.
319 ArchiveFile(logname
);
// Replays the log file with number `log` (located under db_options_.wal_dir)
// and turns its contents into table files.
// Steps visible here:
//   1. Open the log sequentially and wrap it in a checksumming log::Reader whose
//      reporter logs dropped bytes.
//   2. Create a new memtable for every column family in vset_.
//   3. Insert every record read from the log into those memtables, counting the
//      operations; insertion problems are logged and the status is reset to OK.
//   4. For each column family, build a table from its memtable under a newly
//      allocated file number, and append the descriptor to table_fds_ when the
//      resulting file size is greater than zero.
// NOTE(review): a number of lines (early returns, local declarations, the name
// of the table-building call) are not visible in this excerpt.
323 Status
ConvertLogToTable(uint64_t log
) {
// Reporter handed to log::Reader; it logs each corruption it is told about.
324 struct LogReporter
: public log::Reader::Reporter
{
326 std::shared_ptr
<Logger
> info_log
;
328 virtual void Corruption(size_t bytes
, const Status
& s
) override
{
329 // We print error messages for corruption, but continue repairing.
330 ROCKS_LOG_ERROR(info_log
, "Log #%" PRIu64
": dropping %d bytes; %s",
331 lognum
, static_cast<int>(bytes
), s
.ToString().c_str());
// Open the log file for sequential reading.
336 std::string logname
= LogFileName(db_options_
.wal_dir
, log
);
337 unique_ptr
<SequentialFile
> lfile
;
338 Status status
= env_
->NewSequentialFile(
339 logname
, &lfile
, env_
->OptimizeForLogRead(env_options_
));
343 unique_ptr
<SequentialFileReader
> lfile_reader(
344 new SequentialFileReader(std::move(lfile
), logname
));
346 // Create the log reader.
347 LogReporter reporter
;
349 reporter
.info_log
= db_options_
.info_log
;
350 reporter
.lognum
= log
;
351 // We intentionally make log::Reader do checksumming so that
352 // corruptions cause entire commits to be skipped instead of
353 // propagating bad information (like overly large sequence
355 log::Reader
reader(db_options_
.info_log
, std::move(lfile_reader
), &reporter
,
356 true /*enable checksum*/, log
);
358 // Initialize per-column family memtables
359 for (auto* cfd
: *vset_
.GetColumnFamilySet()) {
360 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
// NOTE(review): cf_mems is allocated with a raw `new`; no matching delete is
// visible in this excerpt -- confirm who owns and frees it.
363 auto cf_mems
= new ColumnFamilyMemTablesImpl(vset_
.GetColumnFamilySet());
365 // Read all the records and add to a memtable
370 while (reader
.ReadRecord(&record
, &scratch
)) {
// Records shorter than a write-batch header are reported as corruption.
371 if (record
.size() < WriteBatchInternal::kHeader
) {
373 record
.size(), Status::Corruption("log record too small"));
376 WriteBatchInternal::SetContents(&batch
, record
);
377 status
= WriteBatchInternal::InsertInto(&batch
, cf_mems
, nullptr);
379 counter
+= WriteBatchInternal::Count(&batch
);
381 ROCKS_LOG_WARN(db_options_
.info_log
, "Log #%" PRIu64
": ignoring %s",
382 log
, status
.ToString().c_str());
383 status
= Status::OK(); // Keep going with rest of file
387 // Dump a table for each column family with entries in this log file.
388 for (auto* cfd
: *vset_
.GetColumnFamilySet()) {
389 // Do not record a version edit for this conversion to a Table
390 // since ExtractMetaData() will also generate edits.
391 MemTable
* mem
= cfd
->mem();
// Presumably column families with an empty memtable are skipped -- the body
// of this branch is not visible here.
392 if (mem
->IsEmpty()) {
// Allocate the next file number for the new table (path id 0, size 0).
397 meta
.fd
= FileDescriptor(next_file_number_
++, 0, 0);
399 ro
.total_order_seek
= true;
401 ScopedArenaIterator
iter(mem
->NewIterator(ro
, &arena
));
402 int64_t _current_time
= 0;
403 status
= env_
->GetCurrentTime(&_current_time
); // ignore error
404 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
405 SnapshotChecker
* snapshot_checker
= DisableGCSnapshotChecker::Instance();
407 auto write_hint
= cfd
->CalculateSSTWriteHint(0);
// Arguments of the table-building call (the callee's name is on a line not
// visible in this excerpt).
409 dbname_
, env_
, *cfd
->ioptions(), *cfd
->GetLatestMutableCFOptions(),
410 env_options_
, table_cache_
, iter
.get(),
411 std::unique_ptr
<InternalIterator
>(mem
->NewRangeTombstoneIterator(ro
)),
412 &meta
, cfd
->internal_comparator(),
413 cfd
->int_tbl_prop_collector_factories(), cfd
->GetID(), cfd
->GetName(),
414 {}, kMaxSequenceNumber
, snapshot_checker
, kNoCompression
,
415 CompressionOptions(), false, nullptr /* internal_stats */,
416 TableFileCreationReason::kRecovery
, nullptr /* event_logger */,
417 0 /* job_id */, Env::IO_HIGH
, nullptr /* table_properties */,
418 -1 /* level */, current_time
, write_hint
);
419 ROCKS_LOG_INFO(db_options_
.info_log
,
420 "Log #%" PRIu64
": %d ops saved to Table #%" PRIu64
" %s",
421 log
, counter
, meta
.fd
.GetNumber(),
422 status
.ToString().c_str());
// Only tables that actually received data are queued for metadata extraction.
424 if (meta
.fd
.GetFileSize() > 0) {
425 table_fds_
.push_back(meta
.fd
);
// Runs ScanTable() on every file descriptor in table_fds_. A
// "Table #...: ignoring ..." warning carrying the scan status is logged, and
// the populated TableInfo is appended to tables_.
// NOTE(review): the conditions that choose between the warning and the
// push_back are not visible in this excerpt; the file header states that a
// table which cannot be scanned is ignored -- confirm the exact branching.
435 void ExtractMetaData() {
436 for (size_t i
= 0; i
< table_fds_
.size(); i
++) {
438 t
.meta
.fd
= table_fds_
[i
];
439 Status status
= ScanTable(&t
);
441 std::string fname
= TableFileName(
442 db_options_
.db_paths
, t
.meta
.fd
.GetNumber(), t
.meta
.fd
.GetPathId());
// Format the file number (with its path id) for the log message.
443 char file_num_buf
[kFormatFileNumberBufSize
];
444 FormatFileNumber(t
.meta
.fd
.GetNumber(), t
.meta
.fd
.GetPathId(),
445 file_num_buf
, sizeof(file_num_buf
));
446 ROCKS_LOG_WARN(db_options_
.info_log
, "Table #%s: ignoring %s",
447 file_num_buf
, status
.ToString().c_str());
450 tables_
.push_back(t
);
// Fills in *t for the table file identified by t->meta.fd.
// Steps visible here:
//   1. Resolve the file name and query its size through env_->GetFileSize().
//   2. Load the table properties and take the column family id from them; an
//      id equal to kUnknownColumnFamily is replaced by 0 after logging.
//   3. Call AddColumnFamily() with the name and id from the properties
//      (presumably only when vset_ has no column family with that id yet --
//      the compared-against value is on a line not visible here).
//   4. Report Status::Corruption when the registered column family's name
//      differs from the name recorded in the table properties.
//   5. Iterate over all keys, recording the smallest/largest keys in t->meta
//      and the min/max sequence numbers in *t; unparsable keys are logged.
//   6. Log the entry count together with the final status.
// NOTE(review): several guards, declarations and the return statement are not
// visible in this excerpt.
455 Status
ScanTable(TableInfo
* t
) {
456 std::string fname
= TableFileName(
457 db_options_
.db_paths
, t
->meta
.fd
.GetNumber(), t
->meta
.fd
.GetPathId());
460 Status status
= env_
->GetFileSize(fname
, &file_size
);
// Rebuild the descriptor from the same number and path id (remaining
// arguments are on a line not visible here).
461 t
->meta
.fd
= FileDescriptor(t
->meta
.fd
.GetNumber(), t
->meta
.fd
.GetPathId(),
463 std::shared_ptr
<const TableProperties
> props
;
465 status
= table_cache_
->GetTableProperties(env_options_
, icmp_
, t
->meta
.fd
,
469 t
->column_family_id
= static_cast<uint32_t>(props
->column_family_id
);
470 if (t
->column_family_id
==
471 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
) {
473 db_options_
.info_log
,
475 ": column family unknown (probably due to legacy format); "
476 "adding to default column family id 0.",
477 t
->meta
.fd
.GetNumber());
478 t
->column_family_id
= 0;
481 if (vset_
.GetColumnFamilySet()->GetColumnFamily(t
->column_family_id
) ==
484 AddColumnFamily(props
->column_family_name
, t
->column_family_id
);
487 ColumnFamilyData
* cfd
= nullptr;
489 cfd
= vset_
.GetColumnFamilySet()->GetColumnFamily(t
->column_family_id
);
// The name recorded in the table must match the registered column family.
490 if (cfd
->GetName() != props
->column_family_name
) {
492 db_options_
.info_log
,
494 ": inconsistent column family name '%s'; expected '%s' for column "
495 "family id %" PRIu32
".",
496 t
->meta
.fd
.GetNumber(), props
->column_family_name
.c_str(),
497 cfd
->GetName().c_str(), t
->column_family_id
);
498 status
= Status::Corruption(dbname_
, "inconsistent column family name");
// Walk every key in the table to derive key bounds and sequence range.
502 InternalIterator
* iter
= table_cache_
->NewIterator(
503 ReadOptions(), env_options_
, cfd
->internal_comparator(), t
->meta
,
504 nullptr /* range_del_agg */,
505 cfd
->GetLatestMutableCFOptions()->prefix_extractor
.get());
507 ParsedInternalKey parsed
;
510 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
511 Slice key
= iter
->key();
512 if (!ParseInternalKey(key
, &parsed
)) {
513 ROCKS_LOG_ERROR(db_options_
.info_log
,
514 "Table #%" PRIu64
": unparsable key %s",
515 t
->meta
.fd
.GetNumber(), EscapeString(key
).c_str());
// Presumably executed for the first key only (guard not visible here).
522 t
->meta
.smallest
.DecodeFrom(key
);
523 t
->min_sequence
= parsed
.sequence
;
// Keys are visited in iteration order, so the latest key seen becomes `largest`.
525 t
->meta
.largest
.DecodeFrom(key
);
526 if (parsed
.sequence
< t
->min_sequence
) {
527 t
->min_sequence
= parsed
.sequence
;
529 if (parsed
.sequence
> t
->max_sequence
) {
530 t
->max_sequence
= parsed
.sequence
;
// Surface any error the iterator hit while scanning.
533 if (!iter
->status().ok()) {
534 status
= iter
->status();
538 ROCKS_LOG_INFO(db_options_
.info_log
, "Table #%" PRIu64
": %d entries %s",
539 t
->meta
.fd
.GetNumber(), counter
,
540 status
.ToString().c_str());
546 std::unordered_map
<uint32_t, std::vector
<const TableInfo
*>> cf_id_to_tables
;
547 SequenceNumber max_sequence
= 0;
548 for (size_t i
= 0; i
< tables_
.size(); i
++) {
549 cf_id_to_tables
[tables_
[i
].column_family_id
].push_back(&tables_
[i
]);
550 if (max_sequence
< tables_
[i
].max_sequence
) {
551 max_sequence
= tables_
[i
].max_sequence
;
554 vset_
.SetLastAllocatedSequence(max_sequence
);
555 vset_
.SetLastPublishedSequence(max_sequence
);
556 vset_
.SetLastSequence(max_sequence
);
558 for (const auto& cf_id_and_tables
: cf_id_to_tables
) {
560 vset_
.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables
.first
);
562 edit
.SetComparatorName(cfd
->user_comparator()->Name());
563 edit
.SetLogNumber(0);
564 edit
.SetNextFile(next_file_number_
);
565 edit
.SetColumnFamily(cfd
->GetID());
567 // TODO(opt): separate out into multiple levels
568 for (const auto* table
: cf_id_and_tables
.second
) {
569 edit
.AddFile(0, table
->meta
.fd
.GetNumber(), table
->meta
.fd
.GetPathId(),
570 table
->meta
.fd
.GetFileSize(), table
->meta
.smallest
,
571 table
->meta
.largest
, table
->min_sequence
,
572 table
->max_sequence
, table
->meta
.marked_for_compaction
);
574 assert(next_file_number_
> 0);
575 vset_
.MarkFileNumberUsed(next_file_number_
- 1);
577 Status status
= vset_
.LogAndApply(
578 cfd
, *cfd
->GetLatestMutableCFOptions(), &edit
, &mutex_
,
579 nullptr /* db_directory */, false /* new_descriptor_log */);
// Moves `fname` into a "lost" subdirectory next to it.
// The directory part of fname (everything before the last '/') gets "/lost"
// appended and is created through env_->CreateDir() (errors ignored). The file
// is then renamed to <that directory>/<base name of fname>, and the rename
// status is logged at info level. Nothing is returned to the caller.
// NOTE(review): the declaration of new_dir is not visible in this excerpt;
// presumably it starts empty, so a name without '/' is archived under "/lost"
// -- confirm.
588 void ArchiveFile(const std::string
& fname
) {
589 // Move into another directory. E.g., for
// Locate the last path separator to split directory and base name.
593 const char* slash
= strrchr(fname
.c_str(), '/');
595 if (slash
!= nullptr) {
596 new_dir
.assign(fname
.data(), slash
- fname
.data());
598 new_dir
.append("/lost");
599 env_
->CreateDir(new_dir
); // Ignore error
600 std::string new_file
= new_dir
;
601 new_file
.append("/");
// Append the base name: the whole fname when it contains no '/'.
602 new_file
.append((slash
== nullptr) ? fname
.c_str() : slash
+ 1);
603 Status s
= env_
->RenameFile(fname
, new_file
);
604 ROCKS_LOG_INFO(db_options_
.info_log
, "Archiving %s: %s\n", fname
.c_str(),
605 s
.ToString().c_str());
// Copies the options of the default column family out of `column_families`.
//   column_families - descriptors to search for the one whose name equals
//                     kDefaultColumnFamilyName.
//   res             - output; must not be null (asserted). Receives the options
//                     of the matching descriptor.
// Returns Status::InvalidArgument when no descriptor for the default column
// family is present.
// NOTE(review): the success return is on a line not visible in this excerpt.
609 Status
GetDefaultCFOptions(
610 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
611 ColumnFamilyOptions
* res
) {
612 assert(res
!= nullptr);
// Find the first descriptor named after the default column family.
613 auto iter
= std::find_if(column_families
.begin(), column_families
.end(),
614 [](const ColumnFamilyDescriptor
& cfd
) {
615 return cfd
.name
== kDefaultColumnFamilyName
;
617 if (iter
== column_families
.end()) {
618 return Status::InvalidArgument(
619 "column_families", "Must contain entry for default column family");
621 *res
= iter
->options
;
624 } // anonymous namespace
// Repairs the database `dbname` using the given column family descriptors.
// The default column family's options are taken from `column_families` via
// GetDefaultCFOptions(). The Repairer is built with default-constructed
// unknown-CF options and create_unknown_cfs = false, and the status of
// Repairer::Run() is stored in `status`.
// NOTE(review): the handling of a failed GetDefaultCFOptions() and the return
// statement are on lines not visible in this excerpt.
626 Status
RepairDB(const std::string
& dbname
, const DBOptions
& db_options
,
627 const std::vector
<ColumnFamilyDescriptor
>& column_families
629 ColumnFamilyOptions default_cf_opts
;
630 Status status
= GetDefaultCFOptions(column_families
, &default_cf_opts
);
632 Repairer
repairer(dbname
, db_options
, column_families
,
634 ColumnFamilyOptions() /* unknown_cf_opts */,
635 false /* create_unknown_cfs */);
636 status
= repairer
.Run();
// Repairs the database `dbname`, passing `unknown_cf_opts` to the Repairer for
// column families that are not listed in `column_families`
// (create_unknown_cfs = true).
// The default column family's options are taken from `column_families` via
// GetDefaultCFOptions(), and the status of Repairer::Run() is stored in
// `status`.
// NOTE(review): the handling of a failed GetDefaultCFOptions() and the return
// statement are on lines not visible in this excerpt.
641 Status
RepairDB(const std::string
& dbname
, const DBOptions
& db_options
,
642 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
643 const ColumnFamilyOptions
& unknown_cf_opts
) {
644 ColumnFamilyOptions default_cf_opts
;
645 Status status
= GetDefaultCFOptions(column_families
, &default_cf_opts
);
647 Repairer
repairer(dbname
, db_options
,
648 column_families
, default_cf_opts
,
649 unknown_cf_opts
, true /* create_unknown_cfs */);
650 status
= repairer
.Run();
// Repairs the database `dbname` from a single Options object.
// `options` is split into DBOptions and ColumnFamilyOptions; the same column
// family options serve as both the default and the unknown-CF options, no
// explicit column family descriptors are passed, and create_unknown_cfs is
// true. Returns the status of Repairer::Run().
655 Status
RepairDB(const std::string
& dbname
, const Options
& options
) {
656 DBOptions
db_options(options
);
657 ColumnFamilyOptions
cf_options(options
);
658 Repairer
repairer(dbname
, db_options
,
659 {}, cf_options
/* default_cf_opts */,
660 cf_options
/* unknown_cf_opts */,
661 true /* create_unknown_cfs */);
662 return repairer
.Run();
665 } // namespace rocksdb
667 #endif // ROCKSDB_LITE