]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/repair.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rocksdb / db / repair.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 // Repairer does best effort recovery to recover as much data as possible after
11 // a disaster without compromising consistency. It does not guarantee bringing
12 // the database to a time consistent state.
13 //
14 // Repair process is broken into 4 phases:
15 // (a) Find files
16 // (b) Convert logs to tables
17 // (c) Extract metadata
18 // (d) Write Descriptor
19 //
20 // (a) Find files
21 //
22 // The repairer goes through all the files in the directory, and classifies them
23 // based on their file name. Any file that cannot be identified by name will be
24 // ignored.
25 //
26 // (b) Convert logs to table
27 //
28 // Every log file that is active is replayed. All sections of the file where the
29 // checksum does not match are skipped over. We intentionally give preference to
30 // data consistency.
31 //
32 // (c) Extract metadata
33 //
34 // We scan every table to compute
35 // (1) smallest/largest for the table
36 // (2) largest sequence number in the table
37 // (3) oldest blob file referred to by the table (if applicable)
38 //
39 // If we are unable to scan the file, then we ignore the table.
40 //
41 // (d) Write Descriptor
42 //
43 // We generate descriptor contents:
44 // - log number is set to zero
45 // - next-file-number is set to 1 + largest file number we found
46 // - last-sequence-number is set to largest sequence# found across
47 // all tables (see phase (c), item (2))
48 // - compaction pointers are cleared
49 // - every table file is added at level 0
50 //
51 // Possible optimization 1:
52 // (a) Compute total size and use to pick appropriate max-level M
53 // (b) Sort tables by largest sequence# in the table
54 // (c) For each table: if it overlaps earlier table, place in level-0,
55 // else place in level-M.
56 // (d) We can provide options for time consistent recovery and unsafe recovery
57 // (ignore checksum failure when applicable)
58 // Possible optimization 2:
59 // Store per-table metadata (smallest, largest, largest-seq#, ...)
60 // in the table's meta section to speed up ScanTable.
61
62 #ifndef ROCKSDB_LITE
63
64 #include <cinttypes>
65
66 #include "db/builder.h"
67 #include "db/db_impl/db_impl.h"
68 #include "db/dbformat.h"
69 #include "db/log_reader.h"
70 #include "db/log_writer.h"
71 #include "db/memtable.h"
72 #include "db/table_cache.h"
73 #include "db/version_edit.h"
74 #include "db/write_batch_internal.h"
75 #include "file/filename.h"
76 #include "file/writable_file_writer.h"
77 #include "logging/logging.h"
78 #include "options/cf_options.h"
79 #include "rocksdb/comparator.h"
80 #include "rocksdb/db.h"
81 #include "rocksdb/env.h"
82 #include "rocksdb/options.h"
83 #include "rocksdb/write_buffer_manager.h"
84 #include "table/scoped_arena_iterator.h"
85 #include "table/unique_id_impl.h"
86 #include "util/string_util.h"
87
88 namespace ROCKSDB_NAMESPACE {
89
90 namespace {
91
92 class Repairer {
93 public:
94 Repairer(const std::string& dbname, const DBOptions& db_options,
95 const std::vector<ColumnFamilyDescriptor>& column_families,
96 const ColumnFamilyOptions& default_cf_opts,
97 const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
98 : dbname_(dbname),
99 db_session_id_(DBImpl::GenerateDbSessionId(db_options.env)),
100 env_(db_options.env),
101 file_options_(),
102 db_options_(SanitizeOptions(dbname_, db_options)),
103 immutable_db_options_(ImmutableDBOptions(db_options_)),
104 icmp_(default_cf_opts.comparator),
105 default_cf_opts_(
106 SanitizeOptions(immutable_db_options_, default_cf_opts)),
107 default_iopts_(
108 ImmutableOptions(immutable_db_options_, default_cf_opts_)),
109 unknown_cf_opts_(
110 SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
111 create_unknown_cfs_(create_unknown_cfs),
112 raw_table_cache_(
113 // TableCache can be small since we expect each table to be opened
114 // once.
115 NewLRUCache(10, db_options_.table_cache_numshardbits)),
116 table_cache_(new TableCache(default_iopts_, &file_options_,
117 raw_table_cache_.get(),
118 /*block_cache_tracer=*/nullptr,
119 /*io_tracer=*/nullptr, db_session_id_)),
120 wb_(db_options_.db_write_buffer_size),
121 wc_(db_options_.delayed_write_rate),
122 vset_(dbname_, &immutable_db_options_, file_options_,
123 raw_table_cache_.get(), &wb_, &wc_,
124 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
125 /*db_id=*/"", db_session_id_),
126 next_file_number_(1),
127 db_lock_(nullptr),
128 closed_(false) {
129 for (const auto& cfd : column_families) {
130 cf_name_to_opts_[cfd.name] = cfd.options;
131 }
132 }
133
134 const ColumnFamilyOptions* GetColumnFamilyOptions(
135 const std::string& cf_name) {
136 if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
137 if (create_unknown_cfs_) {
138 return &unknown_cf_opts_;
139 }
140 return nullptr;
141 }
142 return &cf_name_to_opts_[cf_name];
143 }
144
145 // Adds a column family to the VersionSet with cf_options_ and updates
146 // manifest.
147 Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
148 const auto* cf_opts = GetColumnFamilyOptions(cf_name);
149 if (cf_opts == nullptr) {
150 return Status::Corruption("Encountered unknown column family with name=" +
151 cf_name + ", id=" + std::to_string(cf_id));
152 }
153 Options opts(db_options_, *cf_opts);
154 MutableCFOptions mut_cf_opts(opts);
155
156 VersionEdit edit;
157 edit.SetComparatorName(opts.comparator->Name());
158 edit.SetLogNumber(0);
159 edit.SetColumnFamily(cf_id);
160 ColumnFamilyData* cfd;
161 cfd = nullptr;
162 edit.AddColumnFamily(cf_name);
163
164 mutex_.Lock();
165 std::unique_ptr<FSDirectory> db_dir;
166 Status status = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(),
167 &db_dir, nullptr);
168 if (status.ok()) {
169 status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, db_dir.get(),
170 false /* new_descriptor_log */, cf_opts);
171 }
172 mutex_.Unlock();
173 return status;
174 }
175
176 Status Close() {
177 Status s = Status::OK();
178 if (!closed_) {
179 if (db_lock_ != nullptr) {
180 s = env_->UnlockFile(db_lock_);
181 db_lock_ = nullptr;
182 }
183 closed_ = true;
184 }
185 return s;
186 }
187
188 ~Repairer() { Close().PermitUncheckedError(); }
189
190 Status Run() {
191 Status status = env_->LockFile(LockFileName(dbname_), &db_lock_);
192 if (!status.ok()) {
193 return status;
194 }
195 status = FindFiles();
196 DBImpl* db_impl = nullptr;
197 if (status.ok()) {
198 // Discard older manifests and start a fresh one
199 for (size_t i = 0; i < manifests_.size(); i++) {
200 ArchiveFile(dbname_ + "/" + manifests_[i]);
201 }
202 // Just create a DBImpl temporarily so we can reuse NewDB()
203 db_impl = new DBImpl(db_options_, dbname_);
204 status = db_impl->NewDB(/*new_filenames=*/nullptr);
205 }
206 delete db_impl;
207
208 if (status.ok()) {
209 // Recover using the fresh manifest created by NewDB()
210 status =
211 vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
212 }
213 if (status.ok()) {
214 // Need to scan existing SST files first so the column families are
215 // created before we process WAL files
216 ExtractMetaData();
217
218 // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
219 // extract -- we need to clear it here since metadata for existing SST
220 // files has been extracted already
221 table_fds_.clear();
222 ConvertLogFilesToTables();
223 ExtractMetaData();
224 status = AddTables();
225 }
226 if (status.ok()) {
227 uint64_t bytes = 0;
228 for (size_t i = 0; i < tables_.size(); i++) {
229 bytes += tables_[i].meta.fd.GetFileSize();
230 }
231 ROCKS_LOG_WARN(db_options_.info_log,
232 "**** Repaired rocksdb %s; "
233 "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
234 " bytes. "
235 "Some data may have been lost. "
236 "****",
237 dbname_.c_str(), tables_.size(), bytes);
238 }
239 return status;
240 }
241
242 private:
243 struct TableInfo {
244 FileMetaData meta;
245 uint32_t column_family_id;
246 std::string column_family_name;
247 };
248
249 std::string const dbname_;
250 std::string db_session_id_;
251 Env* const env_;
252 const FileOptions file_options_;
253 const DBOptions db_options_;
254 const ImmutableDBOptions immutable_db_options_;
255 const InternalKeyComparator icmp_;
256 const ColumnFamilyOptions default_cf_opts_;
257 const ImmutableOptions default_iopts_; // table_cache_ holds reference
258 const ColumnFamilyOptions unknown_cf_opts_;
259 const bool create_unknown_cfs_;
260 std::shared_ptr<Cache> raw_table_cache_;
261 std::unique_ptr<TableCache> table_cache_;
262 WriteBufferManager wb_;
263 WriteController wc_;
264 VersionSet vset_;
265 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
266 InstrumentedMutex mutex_;
267
268 std::vector<std::string> manifests_;
269 std::vector<FileDescriptor> table_fds_;
270 std::vector<uint64_t> logs_;
271 std::vector<TableInfo> tables_;
272 uint64_t next_file_number_;
273 // Lock over the persistent DB state. Non-nullptr iff successfully
274 // acquired.
275 FileLock* db_lock_;
276 bool closed_;
277
278 Status FindFiles() {
279 std::vector<std::string> filenames;
280 bool found_file = false;
281 std::vector<std::string> to_search_paths;
282
283 for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
284 to_search_paths.push_back(db_options_.db_paths[path_id].path);
285 }
286
287 // search wal_dir if user uses a customize wal_dir
288 bool same = immutable_db_options_.IsWalDirSameAsDBPath(dbname_);
289 if (!same) {
290 to_search_paths.push_back(immutable_db_options_.wal_dir);
291 }
292
293 for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
294 ROCKS_LOG_INFO(db_options_.info_log, "Searching path %s\n",
295 to_search_paths[path_id].c_str());
296 Status status = env_->GetChildren(to_search_paths[path_id], &filenames);
297 if (!status.ok()) {
298 return status;
299 }
300 if (!filenames.empty()) {
301 found_file = true;
302 }
303
304 uint64_t number;
305 FileType type;
306 for (size_t i = 0; i < filenames.size(); i++) {
307 if (ParseFileName(filenames[i], &number, &type)) {
308 if (type == kDescriptorFile) {
309 manifests_.push_back(filenames[i]);
310 } else {
311 if (number + 1 > next_file_number_) {
312 next_file_number_ = number + 1;
313 }
314 if (type == kWalFile) {
315 logs_.push_back(number);
316 } else if (type == kTableFile) {
317 table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
318 0);
319 } else {
320 // Ignore other files
321 }
322 }
323 }
324 }
325 }
326 if (!found_file) {
327 return Status::Corruption(dbname_, "repair found no files");
328 }
329 return Status::OK();
330 }
331
332 void ConvertLogFilesToTables() {
333 const auto& wal_dir = immutable_db_options_.GetWalDir();
334 for (size_t i = 0; i < logs_.size(); i++) {
335 // we should use LogFileName(wal_dir, logs_[i]) here. user might uses
336 // wal_dir option.
337 std::string logname = LogFileName(wal_dir, logs_[i]);
338 Status status = ConvertLogToTable(wal_dir, logs_[i]);
339 if (!status.ok()) {
340 ROCKS_LOG_WARN(db_options_.info_log,
341 "Log #%" PRIu64 ": ignoring conversion error: %s",
342 logs_[i], status.ToString().c_str());
343 }
344 ArchiveFile(logname);
345 }
346 }
347
348 Status ConvertLogToTable(const std::string& wal_dir, uint64_t log) {
349 struct LogReporter : public log::Reader::Reporter {
350 Env* env;
351 std::shared_ptr<Logger> info_log;
352 uint64_t lognum;
353 void Corruption(size_t bytes, const Status& s) override {
354 // We print error messages for corruption, but continue repairing.
355 ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
356 lognum, static_cast<int>(bytes), s.ToString().c_str());
357 }
358 };
359
360 // Open the log file
361 std::string logname = LogFileName(wal_dir, log);
362 const auto& fs = env_->GetFileSystem();
363 std::unique_ptr<SequentialFileReader> lfile_reader;
364 Status status = SequentialFileReader::Create(
365 fs, logname, fs->OptimizeForLogRead(file_options_), &lfile_reader,
366 nullptr /* dbg */, nullptr /* rate limiter */);
367 if (!status.ok()) {
368 return status;
369 }
370
371 // Create the log reader.
372 LogReporter reporter;
373 reporter.env = env_;
374 reporter.info_log = db_options_.info_log;
375 reporter.lognum = log;
376 // We intentionally make log::Reader do checksumming so that
377 // corruptions cause entire commits to be skipped instead of
378 // propagating bad information (like overly large sequence
379 // numbers).
380 log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
381 true /*enable checksum*/, log);
382
383 // Initialize per-column family memtables
384 for (auto* cfd : *vset_.GetColumnFamilySet()) {
385 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
386 kMaxSequenceNumber);
387 }
388 auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
389
390 // Read all the records and add to a memtable
391 std::string scratch;
392 Slice record;
393 WriteBatch batch;
394 int counter = 0;
395 while (reader.ReadRecord(&record, &scratch)) {
396 if (record.size() < WriteBatchInternal::kHeader) {
397 reporter.Corruption(record.size(),
398 Status::Corruption("log record too small"));
399 continue;
400 }
401 Status record_status = WriteBatchInternal::SetContents(&batch, record);
402 if (record_status.ok()) {
403 record_status =
404 WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
405 }
406 if (record_status.ok()) {
407 counter += WriteBatchInternal::Count(&batch);
408 } else {
409 ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
410 log, record_status.ToString().c_str());
411 }
412 }
413
414 // Dump a table for each column family with entries in this log file.
415 for (auto* cfd : *vset_.GetColumnFamilySet()) {
416 // Do not record a version edit for this conversion to a Table
417 // since ExtractMetaData() will also generate edits.
418 MemTable* mem = cfd->mem();
419 if (mem->IsEmpty()) {
420 continue;
421 }
422
423 FileMetaData meta;
424 meta.fd = FileDescriptor(next_file_number_++, 0, 0);
425 ReadOptions ro;
426 ro.total_order_seek = true;
427 Arena arena;
428 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
429 int64_t _current_time = 0;
430 immutable_db_options_.clock->GetCurrentTime(&_current_time)
431 .PermitUncheckedError(); // ignore error
432 const uint64_t current_time = static_cast<uint64_t>(_current_time);
433 meta.file_creation_time = current_time;
434 SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
435
436 auto write_hint = cfd->CalculateSSTWriteHint(0);
437 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
438 range_del_iters;
439 auto range_del_iter = mem->NewRangeTombstoneIterator(
440 ro, kMaxSequenceNumber, false /* immutable_memtable */);
441 if (range_del_iter != nullptr) {
442 range_del_iters.emplace_back(range_del_iter);
443 }
444
445 IOStatus io_s;
446 CompressionOptions default_compression;
447 TableBuilderOptions tboptions(
448 *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
449 cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
450 kNoCompression, default_compression, cfd->GetID(), cfd->GetName(),
451 -1 /* level */, false /* is_bottommost */,
452 TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
453 0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_,
454 0 /*target_file_size*/, meta.fd.GetNumber());
455
456 SeqnoToTimeMapping empty_seqno_time_mapping;
457 status = BuildTable(
458 dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
459 file_options_, table_cache_.get(), iter.get(),
460 std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
461 {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker,
462 false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
463 nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
464 empty_seqno_time_mapping, nullptr /* event_logger */, 0 /* job_id */,
465 Env::IO_HIGH, nullptr /* table_properties */, write_hint);
466 ROCKS_LOG_INFO(db_options_.info_log,
467 "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
468 log, counter, meta.fd.GetNumber(),
469 status.ToString().c_str());
470 if (status.ok()) {
471 if (meta.fd.GetFileSize() > 0) {
472 table_fds_.push_back(meta.fd);
473 }
474 } else {
475 break;
476 }
477 }
478 delete cf_mems;
479 return status;
480 }
481
482 void ExtractMetaData() {
483 for (size_t i = 0; i < table_fds_.size(); i++) {
484 TableInfo t;
485 t.meta.fd = table_fds_[i];
486 Status status = ScanTable(&t);
487 if (!status.ok()) {
488 std::string fname = TableFileName(
489 db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
490 char file_num_buf[kFormatFileNumberBufSize];
491 FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
492 file_num_buf, sizeof(file_num_buf));
493 ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
494 file_num_buf, status.ToString().c_str());
495 ArchiveFile(fname);
496 } else {
497 tables_.push_back(t);
498 }
499 }
500 }
501
502 Status ScanTable(TableInfo* t) {
503 std::string fname = TableFileName(
504 db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
505 int counter = 0;
506 uint64_t file_size;
507 Status status = env_->GetFileSize(fname, &file_size);
508 t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
509 file_size);
510 std::shared_ptr<const TableProperties> props;
511 if (status.ok()) {
512 status = table_cache_->GetTableProperties(file_options_, icmp_, t->meta,
513 &props);
514 }
515 if (status.ok()) {
516 auto s =
517 GetSstInternalUniqueId(props->db_id, props->db_session_id,
518 props->orig_file_number, &t->meta.unique_id);
519 if (!s.ok()) {
520 ROCKS_LOG_WARN(db_options_.info_log,
521 "Table #%" PRIu64
522 ": unable to get unique id, default to Unknown.",
523 t->meta.fd.GetNumber());
524 }
525 t->column_family_id = static_cast<uint32_t>(props->column_family_id);
526 if (t->column_family_id ==
527 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
528 ROCKS_LOG_WARN(
529 db_options_.info_log,
530 "Table #%" PRIu64
531 ": column family unknown (probably due to legacy format); "
532 "adding to default column family id 0.",
533 t->meta.fd.GetNumber());
534 t->column_family_id = 0;
535 }
536
537 if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
538 nullptr) {
539 status =
540 AddColumnFamily(props->column_family_name, t->column_family_id);
541 }
542 t->meta.oldest_ancester_time = props->creation_time;
543 }
544 ColumnFamilyData* cfd = nullptr;
545 if (status.ok()) {
546 cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
547 if (cfd->GetName() != props->column_family_name) {
548 ROCKS_LOG_ERROR(
549 db_options_.info_log,
550 "Table #%" PRIu64
551 ": inconsistent column family name '%s'; expected '%s' for column "
552 "family id %" PRIu32 ".",
553 t->meta.fd.GetNumber(), props->column_family_name.c_str(),
554 cfd->GetName().c_str(), t->column_family_id);
555 status = Status::Corruption(dbname_, "inconsistent column family name");
556 }
557 }
558 if (status.ok()) {
559 ReadOptions ropts;
560 ropts.total_order_seek = true;
561 InternalIterator* iter = table_cache_->NewIterator(
562 ropts, file_options_, cfd->internal_comparator(), t->meta,
563 nullptr /* range_del_agg */,
564 cfd->GetLatestMutableCFOptions()->prefix_extractor,
565 /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
566 TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
567 /*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
568 /*smallest_compaction_key=*/nullptr,
569 /*largest_compaction_key=*/nullptr,
570 /*allow_unprepared_value=*/false);
571 ParsedInternalKey parsed;
572 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
573 Slice key = iter->key();
574 Status pik_status =
575 ParseInternalKey(key, &parsed, db_options_.allow_data_in_errors);
576 if (!pik_status.ok()) {
577 ROCKS_LOG_ERROR(db_options_.info_log,
578 "Table #%" PRIu64 ": unparsable key - %s",
579 t->meta.fd.GetNumber(), pik_status.getState());
580 continue;
581 }
582
583 counter++;
584
585 status = t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
586 parsed.type);
587 if (!status.ok()) {
588 break;
589 }
590 }
591 if (status.ok() && !iter->status().ok()) {
592 status = iter->status();
593 }
594 delete iter;
595
596 ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
597 t->meta.fd.GetNumber(), counter,
598 status.ToString().c_str());
599 }
600 if (status.ok()) {
601 // XXX/FIXME: This is just basic, naive handling of range tombstones,
602 // like call to UpdateBoundariesForRange in builder.cc where we assume
603 // an SST file is a full sorted run. This probably needs the extra logic
604 // from compaction_job.cc around call to UpdateBoundariesForRange (to
605 // handle range tombstones extendingg beyond range of other entries).
606 ReadOptions ropts;
607 std::unique_ptr<FragmentedRangeTombstoneIterator> r_iter;
608 status = table_cache_->GetRangeTombstoneIterator(
609 ropts, cfd->internal_comparator(), t->meta, &r_iter);
610
611 if (r_iter) {
612 r_iter->SeekToFirst();
613
614 while (r_iter->Valid()) {
615 auto tombstone = r_iter->Tombstone();
616 auto kv = tombstone.Serialize();
617 t->meta.UpdateBoundariesForRange(
618 kv.first, tombstone.SerializeEndKey(), tombstone.seq_,
619 cfd->internal_comparator());
620 r_iter->Next();
621 }
622 }
623 }
624 return status;
625 }
626
627 Status AddTables() {
628 std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
629 SequenceNumber max_sequence = 0;
630 for (size_t i = 0; i < tables_.size(); i++) {
631 cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
632 if (max_sequence < tables_[i].meta.fd.largest_seqno) {
633 max_sequence = tables_[i].meta.fd.largest_seqno;
634 }
635 }
636 vset_.SetLastAllocatedSequence(max_sequence);
637 vset_.SetLastPublishedSequence(max_sequence);
638 vset_.SetLastSequence(max_sequence);
639
640 for (const auto& cf_id_and_tables : cf_id_to_tables) {
641 auto* cfd =
642 vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
643 VersionEdit edit;
644 edit.SetComparatorName(cfd->user_comparator()->Name());
645 edit.SetLogNumber(0);
646 edit.SetNextFile(next_file_number_);
647 edit.SetColumnFamily(cfd->GetID());
648
649 // TODO(opt): separate out into multiple levels
650 for (const auto* table : cf_id_and_tables.second) {
651 edit.AddFile(
652 0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
653 table->meta.fd.GetFileSize(), table->meta.smallest,
654 table->meta.largest, table->meta.fd.smallest_seqno,
655 table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
656 table->meta.temperature, table->meta.oldest_blob_file_number,
657 table->meta.oldest_ancester_time, table->meta.file_creation_time,
658 table->meta.file_checksum, table->meta.file_checksum_func_name,
659 table->meta.unique_id);
660 }
661 assert(next_file_number_ > 0);
662 vset_.MarkFileNumberUsed(next_file_number_ - 1);
663 mutex_.Lock();
664 std::unique_ptr<FSDirectory> db_dir;
665 Status status = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(),
666 &db_dir, nullptr);
667 if (status.ok()) {
668 status = vset_.LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
669 &edit, &mutex_, db_dir.get(),
670 false /* new_descriptor_log */);
671 }
672 mutex_.Unlock();
673 if (!status.ok()) {
674 return status;
675 }
676 }
677 return Status::OK();
678 }
679
680 void ArchiveFile(const std::string& fname) {
681 // Move into another directory. E.g., for
682 // dir/foo
683 // rename to
684 // dir/lost/foo
685 const char* slash = strrchr(fname.c_str(), '/');
686 std::string new_dir;
687 if (slash != nullptr) {
688 new_dir.assign(fname.data(), slash - fname.data());
689 }
690 new_dir.append("/lost");
691 env_->CreateDir(new_dir).PermitUncheckedError(); // Ignore error
692 std::string new_file = new_dir;
693 new_file.append("/");
694 new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
695 Status s = env_->RenameFile(fname, new_file);
696 ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
697 s.ToString().c_str());
698 }
699 };
700
701 Status GetDefaultCFOptions(
702 const std::vector<ColumnFamilyDescriptor>& column_families,
703 ColumnFamilyOptions* res) {
704 assert(res != nullptr);
705 auto iter = std::find_if(column_families.begin(), column_families.end(),
706 [](const ColumnFamilyDescriptor& cfd) {
707 return cfd.name == kDefaultColumnFamilyName;
708 });
709 if (iter == column_families.end()) {
710 return Status::InvalidArgument(
711 "column_families", "Must contain entry for default column family");
712 }
713 *res = iter->options;
714 return Status::OK();
715 }
716 } // anonymous namespace
717
718 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
719 const std::vector<ColumnFamilyDescriptor>& column_families) {
720 ColumnFamilyOptions default_cf_opts;
721 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
722 if (!status.ok()) {
723 return status;
724 }
725
726 Repairer repairer(dbname, db_options, column_families, default_cf_opts,
727 ColumnFamilyOptions() /* unknown_cf_opts */,
728 false /* create_unknown_cfs */);
729 status = repairer.Run();
730 if (status.ok()) {
731 status = repairer.Close();
732 }
733 return status;
734 }
735
736 Status RepairDB(const std::string& dbname, const DBOptions& db_options,
737 const std::vector<ColumnFamilyDescriptor>& column_families,
738 const ColumnFamilyOptions& unknown_cf_opts) {
739 ColumnFamilyOptions default_cf_opts;
740 Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
741 if (!status.ok()) {
742 return status;
743 }
744
745 Repairer repairer(dbname, db_options, column_families, default_cf_opts,
746 unknown_cf_opts, true /* create_unknown_cfs */);
747 status = repairer.Run();
748 if (status.ok()) {
749 status = repairer.Close();
750 }
751 return status;
752 }
753
754 Status RepairDB(const std::string& dbname, const Options& options) {
755 Options opts(options);
756 DBOptions db_options(opts);
757 ColumnFamilyOptions cf_options(opts);
758
759 Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */,
760 cf_options /* unknown_cf_opts */,
761 true /* create_unknown_cfs */);
762 Status status = repairer.Run();
763 if (status.ok()) {
764 status = repairer.Close();
765 }
766 return status;
767 }
768
769 } // namespace ROCKSDB_NAMESPACE
770
771 #endif // ROCKSDB_LITE