]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/repair.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / repair.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // Repairer does best effort recovery to recover as much data as possible after
11 // a disaster without compromising consistency. It does not guarantee bringing
12 // the database to a time consistent state.
13 //
14 // Repair process is broken into 4 phases:
15 // (a) Find files
16 // (b) Convert logs to tables
17 // (c) Extract metadata
18 // (d) Write Descriptor
19 //
20 // (a) Find files
21 //
22 // The repairer goes through all the files in the directory, and classifies them
23 // based on their file name. Any file that cannot be identified by name will be
24 // ignored.
25 //
26 // (b) Convert logs to table
27 //
28 // Every log file that is active is replayed. All sections of the file where the
29 // checksum does not match are skipped over. We intentionally give preference to
30 // data consistency.
31 //
32 // (c) Extract metadata
33 //
34 // We scan every table to compute
35 // (1) smallest/largest for the table
36 // (2) largest sequence number in the table
37 // (3) oldest blob file referred to by the table (if applicable)
38 //
39 // If we are unable to scan the file, then we ignore the table.
40 //
41 // (d) Write Descriptor
42 //
43 // We generate descriptor contents:
44 // - log number is set to zero
45 // - next-file-number is set to 1 + largest file number we found
46 // - last-sequence-number is set to largest sequence# found across
47 // all tables (see 2c)
48 // - compaction pointers are cleared
49 // - every table file is added at level 0
50 //
51 // Possible optimization 1:
52 // (a) Compute total size and use to pick appropriate max-level M
53 // (b) Sort tables by largest sequence# in the table
54 // (c) For each table: if it overlaps earlier table, place in level-0,
55 // else place in level-M.
56 // (d) We can provide options for time consistent recovery and unsafe recovery
57 // (ignore checksum failure when applicable)
58 // Possible optimization 2:
59 // Store per-table metadata (smallest, largest, largest-seq#, ...)
60 // in the table's meta section to speed up ScanTable.
61
62 #ifndef ROCKSDB_LITE
63
64 #include <cinttypes>
65 #include "db/builder.h"
66 #include "db/db_impl/db_impl.h"
67 #include "db/dbformat.h"
68 #include "db/log_reader.h"
69 #include "db/log_writer.h"
70 #include "db/memtable.h"
71 #include "db/table_cache.h"
72 #include "db/version_edit.h"
73 #include "db/write_batch_internal.h"
74 #include "env/composite_env_wrapper.h"
75 #include "file/filename.h"
76 #include "file/writable_file_writer.h"
77 #include "options/cf_options.h"
78 #include "rocksdb/comparator.h"
79 #include "rocksdb/db.h"
80 #include "rocksdb/env.h"
81 #include "rocksdb/options.h"
82 #include "rocksdb/write_buffer_manager.h"
83 #include "table/scoped_arena_iterator.h"
84 #include "util/string_util.h"
85
86 namespace ROCKSDB_NAMESPACE {
87
88 namespace {
89
90 class Repairer {
91 public:
92 Repairer(const std::string& dbname, const DBOptions& db_options,
93 const std::vector<ColumnFamilyDescriptor>& column_families,
94 const ColumnFamilyOptions& default_cf_opts,
95 const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
96 : dbname_(dbname),
97 env_(db_options.env),
98 env_options_(),
99 db_options_(SanitizeOptions(dbname_, db_options)),
100 immutable_db_options_(ImmutableDBOptions(db_options_)),
101 icmp_(default_cf_opts.comparator),
102 default_cf_opts_(
103 SanitizeOptions(immutable_db_options_, default_cf_opts)),
104 default_cf_iopts_(
105 ImmutableCFOptions(immutable_db_options_, default_cf_opts_)),
106 unknown_cf_opts_(
107 SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
108 create_unknown_cfs_(create_unknown_cfs),
109 raw_table_cache_(
110 // TableCache can be small since we expect each table to be opened
111 // once.
112 NewLRUCache(10, db_options_.table_cache_numshardbits)),
113 table_cache_(new TableCache(
114 default_cf_iopts_, env_options_, raw_table_cache_.get(),
115 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
116 wb_(db_options_.db_write_buffer_size),
117 wc_(db_options_.delayed_write_rate),
118 vset_(dbname_, &immutable_db_options_, env_options_,
119 raw_table_cache_.get(), &wb_, &wc_,
120 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr),
121 next_file_number_(1),
122 db_lock_(nullptr),
123 closed_(false) {
124 for (const auto& cfd : column_families) {
125 cf_name_to_opts_[cfd.name] = cfd.options;
126 }
127 }
128
129 const ColumnFamilyOptions* GetColumnFamilyOptions(
130 const std::string& cf_name) {
131 if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
132 if (create_unknown_cfs_) {
133 return &unknown_cf_opts_;
134 }
135 return nullptr;
136 }
137 return &cf_name_to_opts_[cf_name];
138 }
139
140 // Adds a column family to the VersionSet with cf_options_ and updates
141 // manifest.
142 Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
143 const auto* cf_opts = GetColumnFamilyOptions(cf_name);
144 if (cf_opts == nullptr) {
145 return Status::Corruption("Encountered unknown column family with name=" +
146 cf_name + ", id=" + ToString(cf_id));
147 }
148 Options opts(db_options_, *cf_opts);
149 MutableCFOptions mut_cf_opts(opts);
150
151 VersionEdit edit;
152 edit.SetComparatorName(opts.comparator->Name());
153 edit.SetLogNumber(0);
154 edit.SetColumnFamily(cf_id);
155 ColumnFamilyData* cfd;
156 cfd = nullptr;
157 edit.AddColumnFamily(cf_name);
158
159 mutex_.Lock();
160 Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_,
161 nullptr /* db_directory */,
162 false /* new_descriptor_log */, cf_opts);
163 mutex_.Unlock();
164 return status;
165 }
166
167 Status Close() {
168 Status s = Status::OK();
169 if (!closed_) {
170 if (db_lock_ != nullptr) {
171 s = env_->UnlockFile(db_lock_);
172 db_lock_ = nullptr;
173 }
174 closed_ = true;
175 }
176 return s;
177 }
178
179 ~Repairer() { Close().PermitUncheckedError(); }
180
181 Status Run() {
182 Status status = env_->LockFile(LockFileName(dbname_), &db_lock_);
183 if (!status.ok()) {
184 return status;
185 }
186 status = FindFiles();
187 DBImpl* db_impl = nullptr;
188 if (status.ok()) {
189 // Discard older manifests and start a fresh one
190 for (size_t i = 0; i < manifests_.size(); i++) {
191 ArchiveFile(dbname_ + "/" + manifests_[i]);
192 }
193 // Just create a DBImpl temporarily so we can reuse NewDB()
194 db_impl = new DBImpl(db_options_, dbname_);
195 // Also use this temp DBImpl to get a session id
196 status = db_impl->GetDbSessionId(db_session_id_);
197 }
198 if (status.ok()) {
199 status = db_impl->NewDB(/*new_filenames=*/nullptr);
200 }
201 delete db_impl;
202
203 if (status.ok()) {
204 // Recover using the fresh manifest created by NewDB()
205 status =
206 vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
207 }
208 if (status.ok()) {
209 // Need to scan existing SST files first so the column families are
210 // created before we process WAL files
211 ExtractMetaData();
212
213 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
214 // extract -- we need to clear it here since metadata for existing SST
215 // files has been extracted already
216 table_fds_.clear();
217 ConvertLogFilesToTables();
218 ExtractMetaData();
219 status = AddTables();
220 }
221 if (status.ok()) {
222 uint64_t bytes = 0;
223 for (size_t i = 0; i < tables_.size(); i++) {
224 bytes += tables_[i].meta.fd.GetFileSize();
225 }
226 ROCKS_LOG_WARN(db_options_.info_log,
227 "**** Repaired rocksdb %s; "
228 "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
229 " bytes. "
230 "Some data may have been lost. "
231 "****",
232 dbname_.c_str(), tables_.size(), bytes);
233 }
234 return status;
235 }
236
237 private:
238 struct TableInfo {
239 FileMetaData meta;
240 uint32_t column_family_id;
241 std::string column_family_name;
242 };
243
244 std::string const dbname_;
245 std::string db_session_id_;
246 Env* const env_;
247 const EnvOptions env_options_;
248 const DBOptions db_options_;
249 const ImmutableDBOptions immutable_db_options_;
250 const InternalKeyComparator icmp_;
251 const ColumnFamilyOptions default_cf_opts_;
252 const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference
253 const ColumnFamilyOptions unknown_cf_opts_;
254 const bool create_unknown_cfs_;
255 std::shared_ptr<Cache> raw_table_cache_;
256 std::unique_ptr<TableCache> table_cache_;
257 WriteBufferManager wb_;
258 WriteController wc_;
259 VersionSet vset_;
260 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
261 InstrumentedMutex mutex_;
262
263 std::vector<std::string> manifests_;
264 std::vector<FileDescriptor> table_fds_;
265 std::vector<uint64_t> logs_;
266 std::vector<TableInfo> tables_;
267 uint64_t next_file_number_;
268 // Lock over the persistent DB state. Non-nullptr iff successfully
269 // acquired.
270 FileLock* db_lock_;
271 bool closed_;
272
273 Status FindFiles() {
274 std::vector<std::string> filenames;
275 bool found_file = false;
276 std::vector<std::string> to_search_paths;
277
278 for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
279 to_search_paths.push_back(db_options_.db_paths[path_id].path);
280 }
281
282 // search wal_dir if user uses a customize wal_dir
283 bool same = false;
284 Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
285 if (status.IsNotSupported()) {
286 same = db_options_.wal_dir == dbname_;
287 status = Status::OK();
288 } else if (!status.ok()) {
289 return status;
290 }
291
292 if (!same) {
293 to_search_paths.push_back(db_options_.wal_dir);
294 }
295
296 for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
297 status = env_->GetChildren(to_search_paths[path_id], &filenames);
298 if (!status.ok()) {
299 return status;
300 }
301 if (!filenames.empty()) {
302 found_file = true;
303 }
304
305 uint64_t number;
306 FileType type;
307 for (size_t i = 0; i < filenames.size(); i++) {
308 if (ParseFileName(filenames[i], &number, &type)) {
309 if (type == kDescriptorFile) {
310 manifests_.push_back(filenames[i]);
311 } else {
312 if (number + 1 > next_file_number_) {
313 next_file_number_ = number + 1;
314 }
315 if (type == kWalFile) {
316 logs_.push_back(number);
317 } else if (type == kTableFile) {
318 table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
319 0);
320 } else {
321 // Ignore other files
322 }
323 }
324 }
325 }
326 }
327 if (!found_file) {
328 return Status::Corruption(dbname_, "repair found no files");
329 }
330 return Status::OK();
331 }
332
333 void ConvertLogFilesToTables() {
334 for (size_t i = 0; i < logs_.size(); i++) {
335 // we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
336 std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
337 Status status = ConvertLogToTable(logs_[i]);
338 if (!status.ok()) {
339 ROCKS_LOG_WARN(db_options_.info_log,
340 "Log #%" PRIu64 ": ignoring conversion error: %s",
341 logs_[i], status.ToString().c_str());
342 }
343 ArchiveFile(logname);
344 }
345 }
346
347 Status ConvertLogToTable(uint64_t log) {
348 struct LogReporter : public log::Reader::Reporter {
349 Env* env;
350 std::shared_ptr<Logger> info_log;
351 uint64_t lognum;
352 void Corruption(size_t bytes, const Status& s) override {
353 // We print error messages for corruption, but continue repairing.
354 ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
355 lognum, static_cast<int>(bytes), s.ToString().c_str());
356 }
357 };
358
359 // Open the log file
360 std::string logname = LogFileName(db_options_.wal_dir, log);
361 std::unique_ptr<SequentialFile> lfile;
362 Status status = env_->NewSequentialFile(
363 logname, &lfile, env_->OptimizeForLogRead(env_options_));
364 if (!status.ok()) {
365 return status;
366 }
367 std::unique_ptr<SequentialFileReader> lfile_reader(new SequentialFileReader(
368 NewLegacySequentialFileWrapper(lfile), logname));
369
370 // Create the log reader.
371 LogReporter reporter;
372 reporter.env = env_;
373 reporter.info_log = db_options_.info_log;
374 reporter.lognum = log;
375 // We intentionally make log::Reader do checksumming so that
376 // corruptions cause entire commits to be skipped instead of
377 // propagating bad information (like overly large sequence
378 // numbers).
379 log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
380 true /*enable checksum*/, log);
381
382 // Initialize per-column family memtables
383 for (auto* cfd : *vset_.GetColumnFamilySet()) {
384 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
385 kMaxSequenceNumber);
386 }
387 auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
388
389 // Read all the records and add to a memtable
390 std::string scratch;
391 Slice record;
392 WriteBatch batch;
393 int counter = 0;
394 while (reader.ReadRecord(&record, &scratch)) {
395 if (record.size() < WriteBatchInternal::kHeader) {
396 reporter.Corruption(
397 record.size(), Status::Corruption("log record too small"));
398 continue;
399 }
400 Status record_status = WriteBatchInternal::SetContents(&batch, record);
401 if (record_status.ok()) {
402 record_status =
403 WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
404 }
405 if (record_status.ok()) {
406 counter += WriteBatchInternal::Count(&batch);
407 } else {
408 ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
409 log, record_status.ToString().c_str());
410 }
411 }
412
413 // Dump a table for each column family with entries in this log file.
414 for (auto* cfd : *vset_.GetColumnFamilySet()) {
415 // Do not record a version edit for this conversion to a Table
416 // since ExtractMetaData() will also generate edits.
417 MemTable* mem = cfd->mem();
418 if (mem->IsEmpty()) {
419 continue;
420 }
421
422 FileMetaData meta;
423 meta.fd = FileDescriptor(next_file_number_++, 0, 0);
424 ReadOptions ro;
425 ro.total_order_seek = true;
426 Arena arena;
427 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
428 int64_t _current_time = 0;
429 status = env_->GetCurrentTime(&_current_time); // ignore error
430 const uint64_t current_time = static_cast<uint64_t>(_current_time);
431 SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
432
433 auto write_hint = cfd->CalculateSSTWriteHint(0);
434 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
435 range_del_iters;
436 auto range_del_iter =
437 mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
438 if (range_del_iter != nullptr) {
439 range_del_iters.emplace_back(range_del_iter);
440 }
441
442 LegacyFileSystemWrapper fs(env_);
443 IOStatus io_s;
444 status = BuildTable(
445 dbname_, /* versions */ nullptr, immutable_db_options_,
446 *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_,
447 table_cache_.get(), iter.get(), std::move(range_del_iters), &meta,
448 nullptr /* blob_file_additions */, cfd->internal_comparator(),
449 cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
450 {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
451 0 /* sample_for_compression */, CompressionOptions(), false,
452 nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
453 &io_s, nullptr /*IOTracer*/, nullptr /* event_logger */,
454 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
455 -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint,
456 0 /* file_creation_time */, "DB Repairer" /* db_id */,
457 db_session_id_);
458 ROCKS_LOG_INFO(db_options_.info_log,
459 "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
460 log, counter, meta.fd.GetNumber(),
461 status.ToString().c_str());
462 if (status.ok()) {
463 if (meta.fd.GetFileSize() > 0) {
464 table_fds_.push_back(meta.fd);
465 }
466 } else {
467 break;
468 }
469 }
470 delete cf_mems;
471 return status;
472 }
473
474 void ExtractMetaData() {
475 for (size_t i = 0; i < table_fds_.size(); i++) {
476 TableInfo t;
477 t.meta.fd = table_fds_[i];
478 Status status = ScanTable(&t);
479 if (!status.ok()) {
480 std::string fname = TableFileName(
481 db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
482 char file_num_buf[kFormatFileNumberBufSize];
483 FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
484 file_num_buf, sizeof(file_num_buf));
485 ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
486 file_num_buf, status.ToString().c_str());
487 ArchiveFile(fname);
488 } else {
489 tables_.push_back(t);
490 }
491 }
492 }
493
494 Status ScanTable(TableInfo* t) {
495 std::string fname = TableFileName(
496 db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
497 int counter = 0;
498 uint64_t file_size;
499 Status status = env_->GetFileSize(fname, &file_size);
500 t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
501 file_size);
502 std::shared_ptr<const TableProperties> props;
503 if (status.ok()) {
504 status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
505 &props);
506 }
507 if (status.ok()) {
508 t->column_family_id = static_cast<uint32_t>(props->column_family_id);
509 if (t->column_family_id ==
510 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
511 ROCKS_LOG_WARN(
512 db_options_.info_log,
513 "Table #%" PRIu64
514 ": column family unknown (probably due to legacy format); "
515 "adding to default column family id 0.",
516 t->meta.fd.GetNumber());
517 t->column_family_id = 0;
518 }
519
520 if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
521 nullptr) {
522 status =
523 AddColumnFamily(props->column_family_name, t->column_family_id);
524 }
525 t->meta.oldest_ancester_time = props->creation_time;
526 }
527 ColumnFamilyData* cfd = nullptr;
528 if (status.ok()) {
529 cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
530 if (cfd->GetName() != props->column_family_name) {
531 ROCKS_LOG_ERROR(
532 db_options_.info_log,
533 "Table #%" PRIu64
534 ": inconsistent column family name '%s'; expected '%s' for column "
535 "family id %" PRIu32 ".",
536 t->meta.fd.GetNumber(), props->column_family_name.c_str(),
537 cfd->GetName().c_str(), t->column_family_id);
538 status = Status::Corruption(dbname_, "inconsistent column family name");
539 }
540 }
541 if (status.ok()) {
542 ReadOptions ropts;
543 ropts.total_order_seek = true;
544 InternalIterator* iter = table_cache_->NewIterator(
545 ropts, env_options_, cfd->internal_comparator(), t->meta,
546 nullptr /* range_del_agg */,
547 cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
548 /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
549 TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
550 /*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
551 /*smallest_compaction_key=*/nullptr,
552 /*largest_compaction_key=*/nullptr,
553 /*allow_unprepared_value=*/false);
554 ParsedInternalKey parsed;
555 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
556 Slice key = iter->key();
557 Status pik_status =
558 ParseInternalKey(key, &parsed, db_options_.allow_data_in_errors);
559 if (!pik_status.ok()) {
560 ROCKS_LOG_ERROR(db_options_.info_log,
561 "Table #%" PRIu64 ": unparsable key - %s",
562 t->meta.fd.GetNumber(), pik_status.getState());
563 continue;
564 }
565
566 counter++;
567
568 t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
569 parsed.type);
570 }
571 if (!iter->status().ok()) {
572 status = iter->status();
573 }
574 delete iter;
575
576 ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
577 t->meta.fd.GetNumber(), counter,
578 status.ToString().c_str());
579 }
580 return status;
581 }
582
583 Status AddTables() {
584 std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
585 SequenceNumber max_sequence = 0;
586 for (size_t i = 0; i < tables_.size(); i++) {
587 cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
588 if (max_sequence < tables_[i].meta.fd.largest_seqno) {
589 max_sequence = tables_[i].meta.fd.largest_seqno;
590 }
591 }
592 vset_.SetLastAllocatedSequence(max_sequence);
593 vset_.SetLastPublishedSequence(max_sequence);
594 vset_.SetLastSequence(max_sequence);
595
596 for (const auto& cf_id_and_tables : cf_id_to_tables) {
597 auto* cfd =
598 vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
599 VersionEdit edit;
600 edit.SetComparatorName(cfd->user_comparator()->Name());
601 edit.SetLogNumber(0);
602 edit.SetNextFile(next_file_number_);
603 edit.SetColumnFamily(cfd->GetID());
604
605 // TODO(opt): separate out into multiple levels
606 for (const auto* table : cf_id_and_tables.second) {
607 edit.AddFile(
608 0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
609 table->meta.fd.GetFileSize(), table->meta.smallest,
610 table->meta.largest, table->meta.fd.smallest_seqno,
611 table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
612 table->meta.oldest_blob_file_number,
613 table->meta.oldest_ancester_time, table->meta.file_creation_time,
614 table->meta.file_checksum, table->meta.file_checksum_func_name);
615 }
616 assert(next_file_number_ > 0);
617 vset_.MarkFileNumberUsed(next_file_number_ - 1);
618 mutex_.Lock();
619 Status status = vset_.LogAndApply(
620 cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
621 nullptr /* db_directory */, false /* new_descriptor_log */);
622 mutex_.Unlock();
623 if (!status.ok()) {
624 return status;
625 }
626 }
627 return Status::OK();
628 }
629
630 void ArchiveFile(const std::string& fname) {
631 // Move into another directory. E.g., for
632 // dir/foo
633 // rename to
634 // dir/lost/foo
635 const char* slash = strrchr(fname.c_str(), '/');
636 std::string new_dir;
637 if (slash != nullptr) {
638 new_dir.assign(fname.data(), slash - fname.data());
639 }
640 new_dir.append("/lost");
641 env_->CreateDir(new_dir).PermitUncheckedError(); // Ignore error
642 std::string new_file = new_dir;
643 new_file.append("/");
644 new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
645 Status s = env_->RenameFile(fname, new_file);
646 ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
647 s.ToString().c_str());
648 }
649 };
650
651 Status GetDefaultCFOptions(
652 const std::vector<ColumnFamilyDescriptor>& column_families,
653 ColumnFamilyOptions* res) {
654 assert(res != nullptr);
655 auto iter = std::find_if(column_families.begin(), column_families.end(),
656 [](const ColumnFamilyDescriptor& cfd) {
657 return cfd.name == kDefaultColumnFamilyName;
658 });
659 if (iter == column_families.end()) {
660 return Status::InvalidArgument(
661 "column_families", "Must contain entry for default column family");
662 }
663 *res = iter->options;
664 return Status::OK();
665 }
666 } // anonymous namespace
667
668 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
669 const std::vector<ColumnFamilyDescriptor>& column_families
670 ) {
671 ColumnFamilyOptions default_cf_opts;
672 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
673 if (!status.ok()) {
674 return status;
675 }
676
677 Repairer repairer(dbname, db_options, column_families, default_cf_opts,
678 ColumnFamilyOptions() /* unknown_cf_opts */,
679 false /* create_unknown_cfs */);
680 status = repairer.Run();
681 if (status.ok()) {
682 status = repairer.Close();
683 }
684 return status;
685 }
686
687 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
688 const std::vector<ColumnFamilyDescriptor>& column_families,
689 const ColumnFamilyOptions& unknown_cf_opts) {
690 ColumnFamilyOptions default_cf_opts;
691 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
692 if (!status.ok()) {
693 return status;
694 }
695
696 Repairer repairer(dbname, db_options, column_families, default_cf_opts,
697 unknown_cf_opts, true /* create_unknown_cfs */);
698 status = repairer.Run();
699 if (status.ok()) {
700 status = repairer.Close();
701 }
702 return status;
703 }
704
705 Status RepairDB(const std::string& dbname, const Options& options) {
706 Options opts(options);
707 DBOptions db_options(opts);
708 ColumnFamilyOptions cf_options(opts);
709
710 Repairer repairer(dbname, db_options,
711 {}, cf_options /* default_cf_opts */,
712 cf_options /* unknown_cf_opts */,
713 true /* create_unknown_cfs */);
714 Status status = repairer.Run();
715 if (status.ok()) {
716 status = repairer.Close();
717 }
718 return status;
719 }
720
721 } // namespace ROCKSDB_NAMESPACE
722
723 #endif // ROCKSDB_LITE