]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/repair.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / db / repair.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9//
10// Repairer does best effort recovery to recover as much data as possible after
11// a disaster without compromising consistency. It does not guarantee bringing
12// the database to a time consistent state.
13//
14// Repair process is broken into 4 phases:
15// (a) Find files
16// (b) Convert logs to tables
17// (c) Extract metadata
18// (d) Write Descriptor
19//
20// (a) Find files
21//
22// The repairer goes through all the files in the directory, and classifies them
23// based on their file name. Any file that cannot be identified by name will be
24// ignored.
25//
26// (b) Convert logs to tables
27//
28// Every log file that is active is replayed. All sections of the file where the
29// checksum does not match are skipped over. We intentionally give preference to
30// data consistency.
31//
32// (c) Extract metadata
33//
34// We scan every table to compute
35// (1) smallest/largest for the table
36// (2) largest sequence number in the table
37//
38// If we are unable to scan the file, then we ignore the table.
39//
40// (d) Write Descriptor
41//
42// We generate descriptor contents:
43// - log number is set to zero
44// - next-file-number is set to 1 + largest file number we found
45// - last-sequence-number is set to largest sequence# found across
46// all tables (see 2c)
47// - compaction pointers are cleared
48// - every table file is added at level 0
49//
50// Possible optimization 1:
51// (a) Compute total size and use to pick appropriate max-level M
52// (b) Sort tables by largest sequence# in the table
53// (c) For each table: if it overlaps earlier table, place in level-0,
54// else place in level-M.
55// (d) We can provide options for time consistent recovery and unsafe recovery
56// (ignore checksum failure when applicable)
57// Possible optimization 2:
58// Store per-table metadata (smallest, largest, largest-seq#, ...)
59// in the table's meta section to speed up ScanTable.
60
61#ifndef ROCKSDB_LITE
62
63#ifndef __STDC_FORMAT_MACROS
64#define __STDC_FORMAT_MACROS
65#endif
66
67#include <inttypes.h>
68#include "db/builder.h"
69#include "db/db_impl.h"
70#include "db/dbformat.h"
71#include "db/log_reader.h"
72#include "db/log_writer.h"
73#include "db/memtable.h"
74#include "db/table_cache.h"
75#include "db/version_edit.h"
76#include "db/write_batch_internal.h"
77#include "options/cf_options.h"
78#include "rocksdb/comparator.h"
79#include "rocksdb/db.h"
80#include "rocksdb/env.h"
81#include "rocksdb/options.h"
82#include "rocksdb/write_buffer_manager.h"
83#include "table/scoped_arena_iterator.h"
84#include "util/file_reader_writer.h"
85#include "util/filename.h"
86#include "util/string_util.h"
87
88namespace rocksdb {
89
90namespace {
91
92class Repairer {
93 public:
94 Repairer(const std::string& dbname, const DBOptions& db_options,
95 const std::vector<ColumnFamilyDescriptor>& column_families,
96 const ColumnFamilyOptions& default_cf_opts,
97 const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
98 : dbname_(dbname),
99 env_(db_options.env),
100 env_options_(),
101 db_options_(SanitizeOptions(dbname_, db_options)),
11fdf7f2 102 immutable_db_options_(ImmutableDBOptions(db_options_)),
7c673cae 103 icmp_(default_cf_opts.comparator),
11fdf7f2
TL
104 default_cf_opts_(
105 SanitizeOptions(immutable_db_options_, default_cf_opts)),
7c673cae 106 default_cf_iopts_(
11fdf7f2
TL
107 ImmutableCFOptions(immutable_db_options_, default_cf_opts_)),
108 unknown_cf_opts_(
109 SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
7c673cae
FG
110 create_unknown_cfs_(create_unknown_cfs),
111 raw_table_cache_(
112 // TableCache can be small since we expect each table to be opened
113 // once.
114 NewLRUCache(10, db_options_.table_cache_numshardbits)),
115 table_cache_(new TableCache(default_cf_iopts_, env_options_,
116 raw_table_cache_.get())),
117 wb_(db_options_.db_write_buffer_size),
118 wc_(db_options_.delayed_write_rate),
119 vset_(dbname_, &immutable_db_options_, env_options_,
120 raw_table_cache_.get(), &wb_, &wc_),
494da23a
TL
121 next_file_number_(1),
122 db_lock_(nullptr) {
7c673cae
FG
123 for (const auto& cfd : column_families) {
124 cf_name_to_opts_[cfd.name] = cfd.options;
125 }
126 }
127
128 const ColumnFamilyOptions* GetColumnFamilyOptions(
129 const std::string& cf_name) {
130 if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
131 if (create_unknown_cfs_) {
132 return &unknown_cf_opts_;
133 }
134 return nullptr;
135 }
136 return &cf_name_to_opts_[cf_name];
137 }
138
139 // Adds a column family to the VersionSet with cf_options_ and updates
140 // manifest.
141 Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
142 const auto* cf_opts = GetColumnFamilyOptions(cf_name);
143 if (cf_opts == nullptr) {
144 return Status::Corruption("Encountered unknown column family with name=" +
145 cf_name + ", id=" + ToString(cf_id));
146 }
147 Options opts(db_options_, *cf_opts);
148 MutableCFOptions mut_cf_opts(opts);
149
150 VersionEdit edit;
151 edit.SetComparatorName(opts.comparator->Name());
152 edit.SetLogNumber(0);
153 edit.SetColumnFamily(cf_id);
154 ColumnFamilyData* cfd;
155 cfd = nullptr;
156 edit.AddColumnFamily(cf_name);
157
158 mutex_.Lock();
159 Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_,
160 nullptr /* db_directory */,
161 false /* new_descriptor_log */, cf_opts);
162 mutex_.Unlock();
163 return status;
164 }
165
166 ~Repairer() {
494da23a
TL
167 if (db_lock_ != nullptr) {
168 env_->UnlockFile(db_lock_);
169 }
7c673cae
FG
170 delete table_cache_;
171 }
172
173 Status Run() {
494da23a
TL
174 Status status = env_->LockFile(LockFileName(dbname_), &db_lock_);
175 if (!status.ok()) {
176 return status;
177 }
178 status = FindFiles();
7c673cae
FG
179 if (status.ok()) {
180 // Discard older manifests and start a fresh one
181 for (size_t i = 0; i < manifests_.size(); i++) {
182 ArchiveFile(dbname_ + "/" + manifests_[i]);
183 }
184 // Just create a DBImpl temporarily so we can reuse NewDB()
185 DBImpl* db_impl = new DBImpl(db_options_, dbname_);
186 status = db_impl->NewDB();
187 delete db_impl;
188 }
11fdf7f2 189
7c673cae
FG
190 if (status.ok()) {
191 // Recover using the fresh manifest created by NewDB()
192 status =
193 vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
194 }
195 if (status.ok()) {
196 // Need to scan existing SST files first so the column families are
197 // created before we process WAL files
198 ExtractMetaData();
199
200 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
201 // extract -- we need to clear it here since metadata for existing SST
202 // files has been extracted already
203 table_fds_.clear();
204 ConvertLogFilesToTables();
205 ExtractMetaData();
206 status = AddTables();
207 }
208 if (status.ok()) {
209 uint64_t bytes = 0;
210 for (size_t i = 0; i < tables_.size(); i++) {
211 bytes += tables_[i].meta.fd.GetFileSize();
212 }
213 ROCKS_LOG_WARN(db_options_.info_log,
214 "**** Repaired rocksdb %s; "
215 "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
11fdf7f2 216 " bytes. "
7c673cae
FG
217 "Some data may have been lost. "
218 "****",
219 dbname_.c_str(), tables_.size(), bytes);
220 }
221 return status;
222 }
223
224 private:
225 struct TableInfo {
226 FileMetaData meta;
227 uint32_t column_family_id;
228 std::string column_family_name;
229 SequenceNumber min_sequence;
230 SequenceNumber max_sequence;
231 };
232
233 std::string const dbname_;
234 Env* const env_;
235 const EnvOptions env_options_;
236 const DBOptions db_options_;
237 const ImmutableDBOptions immutable_db_options_;
238 const InternalKeyComparator icmp_;
239 const ColumnFamilyOptions default_cf_opts_;
240 const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference
241 const ColumnFamilyOptions unknown_cf_opts_;
242 const bool create_unknown_cfs_;
243 std::shared_ptr<Cache> raw_table_cache_;
244 TableCache* table_cache_;
245 WriteBufferManager wb_;
246 WriteController wc_;
247 VersionSet vset_;
248 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
249 InstrumentedMutex mutex_;
250
251 std::vector<std::string> manifests_;
252 std::vector<FileDescriptor> table_fds_;
253 std::vector<uint64_t> logs_;
254 std::vector<TableInfo> tables_;
255 uint64_t next_file_number_;
494da23a
TL
256 // Lock over the persistent DB state. Non-nullptr iff successfully
257 // acquired.
258 FileLock* db_lock_;
7c673cae
FG
259
260 Status FindFiles() {
261 std::vector<std::string> filenames;
262 bool found_file = false;
11fdf7f2
TL
263 std::vector<std::string> to_search_paths;
264
7c673cae 265 for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
11fdf7f2
TL
266 to_search_paths.push_back(db_options_.db_paths[path_id].path);
267 }
268
269 // search wal_dir if user uses a customize wal_dir
270 bool same = false;
271 Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
272 if (status.IsNotSupported()) {
273 same = db_options_.wal_dir == dbname_;
274 status = Status::OK();
275 } else if (!status.ok()) {
276 return status;
277 }
278
279 if (!same) {
280 to_search_paths.push_back(db_options_.wal_dir);
281 }
282
283 for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
284 status = env_->GetChildren(to_search_paths[path_id], &filenames);
7c673cae
FG
285 if (!status.ok()) {
286 return status;
287 }
288 if (!filenames.empty()) {
289 found_file = true;
290 }
291
292 uint64_t number;
293 FileType type;
294 for (size_t i = 0; i < filenames.size(); i++) {
295 if (ParseFileName(filenames[i], &number, &type)) {
296 if (type == kDescriptorFile) {
7c673cae
FG
297 manifests_.push_back(filenames[i]);
298 } else {
299 if (number + 1 > next_file_number_) {
300 next_file_number_ = number + 1;
301 }
302 if (type == kLogFile) {
7c673cae
FG
303 logs_.push_back(number);
304 } else if (type == kTableFile) {
305 table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
306 0);
307 } else {
308 // Ignore other files
309 }
310 }
311 }
312 }
313 }
314 if (!found_file) {
315 return Status::Corruption(dbname_, "repair found no files");
316 }
317 return Status::OK();
318 }
319
320 void ConvertLogFilesToTables() {
321 for (size_t i = 0; i < logs_.size(); i++) {
11fdf7f2
TL
322 // we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
323 std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
7c673cae
FG
324 Status status = ConvertLogToTable(logs_[i]);
325 if (!status.ok()) {
326 ROCKS_LOG_WARN(db_options_.info_log,
327 "Log #%" PRIu64 ": ignoring conversion error: %s",
328 logs_[i], status.ToString().c_str());
329 }
330 ArchiveFile(logname);
331 }
332 }
333
334 Status ConvertLogToTable(uint64_t log) {
335 struct LogReporter : public log::Reader::Reporter {
336 Env* env;
337 std::shared_ptr<Logger> info_log;
338 uint64_t lognum;
494da23a 339 void Corruption(size_t bytes, const Status& s) override {
7c673cae
FG
340 // We print error messages for corruption, but continue repairing.
341 ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
342 lognum, static_cast<int>(bytes), s.ToString().c_str());
343 }
344 };
345
346 // Open the log file
11fdf7f2 347 std::string logname = LogFileName(db_options_.wal_dir, log);
494da23a 348 std::unique_ptr<SequentialFile> lfile;
11fdf7f2
TL
349 Status status = env_->NewSequentialFile(
350 logname, &lfile, env_->OptimizeForLogRead(env_options_));
7c673cae
FG
351 if (!status.ok()) {
352 return status;
353 }
494da23a 354 std::unique_ptr<SequentialFileReader> lfile_reader(
11fdf7f2 355 new SequentialFileReader(std::move(lfile), logname));
7c673cae
FG
356
357 // Create the log reader.
358 LogReporter reporter;
359 reporter.env = env_;
360 reporter.info_log = db_options_.info_log;
361 reporter.lognum = log;
362 // We intentionally make log::Reader do checksumming so that
363 // corruptions cause entire commits to be skipped instead of
364 // propagating bad information (like overly large sequence
365 // numbers).
366 log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
11fdf7f2 367 true /*enable checksum*/, log);
7c673cae
FG
368
369 // Initialize per-column family memtables
370 for (auto* cfd : *vset_.GetColumnFamilySet()) {
371 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
372 kMaxSequenceNumber);
373 }
374 auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
375
376 // Read all the records and add to a memtable
377 std::string scratch;
378 Slice record;
379 WriteBatch batch;
380 int counter = 0;
381 while (reader.ReadRecord(&record, &scratch)) {
382 if (record.size() < WriteBatchInternal::kHeader) {
383 reporter.Corruption(
384 record.size(), Status::Corruption("log record too small"));
385 continue;
386 }
387 WriteBatchInternal::SetContents(&batch, record);
388 status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
389 if (status.ok()) {
390 counter += WriteBatchInternal::Count(&batch);
391 } else {
392 ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
393 log, status.ToString().c_str());
394 status = Status::OK(); // Keep going with rest of file
395 }
396 }
397
398 // Dump a table for each column family with entries in this log file.
399 for (auto* cfd : *vset_.GetColumnFamilySet()) {
400 // Do not record a version edit for this conversion to a Table
401 // since ExtractMetaData() will also generate edits.
402 MemTable* mem = cfd->mem();
403 if (mem->IsEmpty()) {
404 continue;
405 }
406
407 FileMetaData meta;
408 meta.fd = FileDescriptor(next_file_number_++, 0, 0);
409 ReadOptions ro;
410 ro.total_order_seek = true;
411 Arena arena;
412 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
11fdf7f2
TL
413 int64_t _current_time = 0;
414 status = env_->GetCurrentTime(&_current_time); // ignore error
415 const uint64_t current_time = static_cast<uint64_t>(_current_time);
416 SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
417
418 auto write_hint = cfd->CalculateSSTWriteHint(0);
494da23a
TL
419 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
420 range_del_iters;
421 auto range_del_iter =
422 mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
423 if (range_del_iter != nullptr) {
424 range_del_iters.emplace_back(range_del_iter);
425 }
7c673cae
FG
426 status = BuildTable(
427 dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
494da23a 428 env_options_, table_cache_, iter.get(), std::move(range_del_iters),
7c673cae
FG
429 &meta, cfd->internal_comparator(),
430 cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
11fdf7f2 431 {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
494da23a
TL
432 0 /* sample_for_compression */, CompressionOptions(), false,
433 nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
434 nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
435 nullptr /* table_properties */, -1 /* level */, current_time,
436 write_hint);
7c673cae
FG
437 ROCKS_LOG_INFO(db_options_.info_log,
438 "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
439 log, counter, meta.fd.GetNumber(),
440 status.ToString().c_str());
441 if (status.ok()) {
442 if (meta.fd.GetFileSize() > 0) {
443 table_fds_.push_back(meta.fd);
444 }
445 } else {
446 break;
447 }
448 }
449 delete cf_mems;
450 return status;
451 }
452
453 void ExtractMetaData() {
454 for (size_t i = 0; i < table_fds_.size(); i++) {
455 TableInfo t;
456 t.meta.fd = table_fds_[i];
457 Status status = ScanTable(&t);
458 if (!status.ok()) {
459 std::string fname = TableFileName(
460 db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
461 char file_num_buf[kFormatFileNumberBufSize];
462 FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
463 file_num_buf, sizeof(file_num_buf));
464 ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
465 file_num_buf, status.ToString().c_str());
466 ArchiveFile(fname);
467 } else {
468 tables_.push_back(t);
469 }
470 }
471 }
472
473 Status ScanTable(TableInfo* t) {
474 std::string fname = TableFileName(
475 db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
476 int counter = 0;
477 uint64_t file_size;
478 Status status = env_->GetFileSize(fname, &file_size);
479 t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
480 file_size);
481 std::shared_ptr<const TableProperties> props;
482 if (status.ok()) {
483 status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
484 &props);
485 }
486 if (status.ok()) {
487 t->column_family_id = static_cast<uint32_t>(props->column_family_id);
488 if (t->column_family_id ==
489 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
490 ROCKS_LOG_WARN(
491 db_options_.info_log,
492 "Table #%" PRIu64
493 ": column family unknown (probably due to legacy format); "
494 "adding to default column family id 0.",
495 t->meta.fd.GetNumber());
496 t->column_family_id = 0;
497 }
498
499 if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
500 nullptr) {
501 status =
502 AddColumnFamily(props->column_family_name, t->column_family_id);
503 }
504 }
505 ColumnFamilyData* cfd = nullptr;
506 if (status.ok()) {
507 cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
508 if (cfd->GetName() != props->column_family_name) {
509 ROCKS_LOG_ERROR(
510 db_options_.info_log,
511 "Table #%" PRIu64
512 ": inconsistent column family name '%s'; expected '%s' for column "
513 "family id %" PRIu32 ".",
514 t->meta.fd.GetNumber(), props->column_family_name.c_str(),
515 cfd->GetName().c_str(), t->column_family_id);
516 status = Status::Corruption(dbname_, "inconsistent column family name");
517 }
518 }
519 if (status.ok()) {
520 InternalIterator* iter = table_cache_->NewIterator(
11fdf7f2
TL
521 ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
522 nullptr /* range_del_agg */,
523 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
7c673cae
FG
524 bool empty = true;
525 ParsedInternalKey parsed;
526 t->min_sequence = 0;
527 t->max_sequence = 0;
528 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
529 Slice key = iter->key();
530 if (!ParseInternalKey(key, &parsed)) {
531 ROCKS_LOG_ERROR(db_options_.info_log,
532 "Table #%" PRIu64 ": unparsable key %s",
533 t->meta.fd.GetNumber(), EscapeString(key).c_str());
534 continue;
535 }
536
537 counter++;
538 if (empty) {
539 empty = false;
540 t->meta.smallest.DecodeFrom(key);
541 t->min_sequence = parsed.sequence;
542 }
543 t->meta.largest.DecodeFrom(key);
544 if (parsed.sequence < t->min_sequence) {
545 t->min_sequence = parsed.sequence;
546 }
547 if (parsed.sequence > t->max_sequence) {
548 t->max_sequence = parsed.sequence;
549 }
550 }
551 if (!iter->status().ok()) {
552 status = iter->status();
553 }
554 delete iter;
555
556 ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
557 t->meta.fd.GetNumber(), counter,
558 status.ToString().c_str());
559 }
560 return status;
561 }
562
563 Status AddTables() {
564 std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
565 SequenceNumber max_sequence = 0;
566 for (size_t i = 0; i < tables_.size(); i++) {
567 cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
568 if (max_sequence < tables_[i].max_sequence) {
569 max_sequence = tables_[i].max_sequence;
570 }
571 }
11fdf7f2
TL
572 vset_.SetLastAllocatedSequence(max_sequence);
573 vset_.SetLastPublishedSequence(max_sequence);
7c673cae
FG
574 vset_.SetLastSequence(max_sequence);
575
576 for (const auto& cf_id_and_tables : cf_id_to_tables) {
577 auto* cfd =
578 vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
579 VersionEdit edit;
580 edit.SetComparatorName(cfd->user_comparator()->Name());
581 edit.SetLogNumber(0);
582 edit.SetNextFile(next_file_number_);
583 edit.SetColumnFamily(cfd->GetID());
584
585 // TODO(opt): separate out into multiple levels
586 for (const auto* table : cf_id_and_tables.second) {
587 edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
588 table->meta.fd.GetFileSize(), table->meta.smallest,
589 table->meta.largest, table->min_sequence,
590 table->max_sequence, table->meta.marked_for_compaction);
591 }
11fdf7f2
TL
592 assert(next_file_number_ > 0);
593 vset_.MarkFileNumberUsed(next_file_number_ - 1);
7c673cae
FG
594 mutex_.Lock();
595 Status status = vset_.LogAndApply(
596 cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
597 nullptr /* db_directory */, false /* new_descriptor_log */);
598 mutex_.Unlock();
599 if (!status.ok()) {
600 return status;
601 }
602 }
603 return Status::OK();
604 }
605
606 void ArchiveFile(const std::string& fname) {
607 // Move into another directory. E.g., for
608 // dir/foo
609 // rename to
610 // dir/lost/foo
611 const char* slash = strrchr(fname.c_str(), '/');
612 std::string new_dir;
613 if (slash != nullptr) {
614 new_dir.assign(fname.data(), slash - fname.data());
615 }
616 new_dir.append("/lost");
617 env_->CreateDir(new_dir); // Ignore error
618 std::string new_file = new_dir;
619 new_file.append("/");
620 new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
621 Status s = env_->RenameFile(fname, new_file);
622 ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
623 s.ToString().c_str());
624 }
625};
626
627Status GetDefaultCFOptions(
628 const std::vector<ColumnFamilyDescriptor>& column_families,
629 ColumnFamilyOptions* res) {
630 assert(res != nullptr);
631 auto iter = std::find_if(column_families.begin(), column_families.end(),
632 [](const ColumnFamilyDescriptor& cfd) {
633 return cfd.name == kDefaultColumnFamilyName;
634 });
635 if (iter == column_families.end()) {
636 return Status::InvalidArgument(
637 "column_families", "Must contain entry for default column family");
638 }
639 *res = iter->options;
640 return Status::OK();
641}
642} // anonymous namespace
643
644Status RepairDB(const std::string& dbname, const DBOptions& db_options,
11fdf7f2
TL
645 const std::vector<ColumnFamilyDescriptor>& column_families
646 ) {
7c673cae
FG
647 ColumnFamilyOptions default_cf_opts;
648 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
649 if (status.ok()) {
11fdf7f2
TL
650 Repairer repairer(dbname, db_options, column_families,
651 default_cf_opts,
7c673cae
FG
652 ColumnFamilyOptions() /* unknown_cf_opts */,
653 false /* create_unknown_cfs */);
654 status = repairer.Run();
655 }
656 return status;
657}
658
659Status RepairDB(const std::string& dbname, const DBOptions& db_options,
660 const std::vector<ColumnFamilyDescriptor>& column_families,
661 const ColumnFamilyOptions& unknown_cf_opts) {
662 ColumnFamilyOptions default_cf_opts;
663 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
664 if (status.ok()) {
11fdf7f2
TL
665 Repairer repairer(dbname, db_options,
666 column_families, default_cf_opts,
7c673cae
FG
667 unknown_cf_opts, true /* create_unknown_cfs */);
668 status = repairer.Run();
669 }
670 return status;
671}
672
673Status RepairDB(const std::string& dbname, const Options& options) {
674 DBOptions db_options(options);
675 ColumnFamilyOptions cf_options(options);
11fdf7f2
TL
676 Repairer repairer(dbname, db_options,
677 {}, cf_options /* default_cf_opts */,
7c673cae
FG
678 cf_options /* unknown_cf_opts */,
679 true /* create_unknown_cfs */);
680 return repairer.Run();
681}
682
683} // namespace rocksdb
684
685#endif // ROCKSDB_LITE