]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl_open.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / db_impl_open.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl.h"
10
11 #ifndef __STDC_FORMAT_MACROS
12 #define __STDC_FORMAT_MACROS
13 #endif
14 #include <inttypes.h>
15
16 #include "db/builder.h"
17 #include "options/options_helper.h"
18 #include "rocksdb/wal_filter.h"
19 #include "util/sst_file_manager_impl.h"
20 #include "util/sync_point.h"
21
22 namespace rocksdb {
23 Options SanitizeOptions(const std::string& dbname,
24 const Options& src) {
25 auto db_options = SanitizeOptions(dbname, DBOptions(src));
26 ImmutableDBOptions immutable_db_options(db_options);
27 auto cf_options =
28 SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
29 return Options(db_options, cf_options);
30 }
31
// Fill in defaults and clamp out-of-range values in the user-supplied
// DBOptions so the rest of the open path can rely on them being internally
// consistent. Returns a sanitized copy; `src` itself is not modified.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  DBOptions result(src);

  // max_open_files == -1 means an "infinite" number of open files; any other
  // value is clamped into [20, platform limit].
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      // The platform cannot report a limit; fall back to a generous cap.
      max_max_open_files = 1000000;
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
  }

  if (result.info_log == nullptr) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  // Every DB needs a write buffer manager; default one is bounded by
  // db_write_buffer_size.
  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }
  // -1 means "derive from max_background_compactions"; the base must never
  // exceed the maximum.
  if (result.base_background_compactions == -1) {
    result.base_background_compactions = result.max_background_compactions;
  }
  if (result.base_background_compactions > result.max_background_compactions) {
    result.base_background_compactions = result.max_background_compactions;
  }
  // Grow the env's thread pools if needed so the configured amount of
  // background work can actually be scheduled.
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
                                           Env::Priority::HIGH);

  // When a rate limiter is in use, sync incrementally (1MB) by default to
  // smooth out write I/O.
  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  // WAL archiving (TTL / size limit) is incompatible with recycling log
  // files; archiving wins.
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = false;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // kPointInTimeRecovery is indistinguishable from
    // kTolerateCorruptedTailRecords in recycle mode since we define
    // the "end" of the log as the first corrupt record we encounter.
    // kAbsoluteConsistency doesn't make sense because even a clean
    // shutdown leaves old junk at the end of the log file.
    result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // Normalize away a trailing '/' so later path concatenation is consistent.
  if (result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  // Default to a single data path: the DB directory itself, unbounded.
  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  // Direct reads bypass OS readahead, so do our own (2MB) for compactions.
  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  // Readahead (or direct compaction I/O) requires dedicated table readers
  // for compaction inputs.
  if (result.compaction_readahead_size > 0 ||
      result.use_direct_io_for_flush_and_compaction) {
    result.new_table_reader_for_compaction_inputs = true;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence id, which
  // make recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

  return result;
}
118
119 namespace {
120
121 Status SanitizeOptionsByTable(
122 const DBOptions& db_opts,
123 const std::vector<ColumnFamilyDescriptor>& column_families) {
124 Status s;
125 for (auto cf : column_families) {
126 s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
127 if (!s.ok()) {
128 return s;
129 }
130 }
131 return Status::OK();
132 }
133
134 static Status ValidateOptions(
135 const DBOptions& db_options,
136 const std::vector<ColumnFamilyDescriptor>& column_families) {
137 Status s;
138
139 for (auto& cfd : column_families) {
140 s = CheckCompressionSupported(cfd.options);
141 if (s.ok() && db_options.allow_concurrent_memtable_write) {
142 s = CheckConcurrentWritesSupported(cfd.options);
143 }
144 if (!s.ok()) {
145 return s;
146 }
147 if (db_options.db_paths.size() > 1) {
148 if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
149 (cfd.options.compaction_style != kCompactionStyleLevel)) {
150 return Status::NotSupported(
151 "More than one DB paths are only supported in "
152 "universal and level compaction styles. ");
153 }
154 }
155 }
156
157 if (db_options.db_paths.size() > 4) {
158 return Status::NotSupported(
159 "More than four DB paths are not supported yet. ");
160 }
161
162 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
163 // Protect against assert in PosixMMapReadableFile constructor
164 return Status::NotSupported(
165 "If memory mapped reads (allow_mmap_reads) are enabled "
166 "then direct I/O reads (use_direct_reads) must be disabled. ");
167 }
168
169 if (db_options.allow_mmap_writes &&
170 db_options.use_direct_io_for_flush_and_compaction) {
171 return Status::NotSupported(
172 "If memory mapped writes (allow_mmap_writes) are enabled "
173 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
174 "be disabled. ");
175 }
176
177 if (db_options.keep_log_file_num == 0) {
178 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
179 }
180
181 return Status::OK();
182 }
183 } // namespace
184 Status DBImpl::NewDB() {
185 VersionEdit new_db;
186 new_db.SetLogNumber(0);
187 new_db.SetNextFile(2);
188 new_db.SetLastSequence(0);
189
190 Status s;
191
192 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
193 const std::string manifest = DescriptorFileName(dbname_, 1);
194 {
195 unique_ptr<WritableFile> file;
196 EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
197 s = NewWritableFile(env_, manifest, &file, env_options);
198 if (!s.ok()) {
199 return s;
200 }
201 file->SetPreallocationBlockSize(
202 immutable_db_options_.manifest_preallocation_size);
203 unique_ptr<WritableFileWriter> file_writer(
204 new WritableFileWriter(std::move(file), env_options));
205 log::Writer log(std::move(file_writer), 0, false);
206 std::string record;
207 new_db.EncodeTo(&record);
208 s = log.AddRecord(record);
209 if (s.ok()) {
210 s = SyncManifest(env_, &immutable_db_options_, log.file());
211 }
212 }
213 if (s.ok()) {
214 // Make "CURRENT" file that points to the new manifest file.
215 s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
216 } else {
217 env_->DeleteFile(manifest);
218 }
219 return s;
220 }
221
222 Status DBImpl::Directories::CreateAndNewDirectory(
223 Env* env, const std::string& dirname,
224 std::unique_ptr<Directory>* directory) const {
225 // We call CreateDirIfMissing() as the directory may already exist (if we
226 // are reopening a DB), when this happens we don't want creating the
227 // directory to cause an error. However, we need to check if creating the
228 // directory fails or else we may get an obscure message about the lock
229 // file not existing. One real-world example of this occurring is if
230 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
231 // when dbname_ is "dir/db" but when "dir" doesn't exist.
232 Status s = env->CreateDirIfMissing(dirname);
233 if (!s.ok()) {
234 return s;
235 }
236 return env->NewDirectory(dirname, directory);
237 }
238
239 Status DBImpl::Directories::SetDirectories(
240 Env* env, const std::string& dbname, const std::string& wal_dir,
241 const std::vector<DbPath>& data_paths) {
242 Status s = CreateAndNewDirectory(env, dbname, &db_dir_);
243 if (!s.ok()) {
244 return s;
245 }
246 if (!wal_dir.empty() && dbname != wal_dir) {
247 s = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
248 if (!s.ok()) {
249 return s;
250 }
251 }
252
253 data_dirs_.clear();
254 for (auto& p : data_paths) {
255 const std::string db_path = p.path;
256 if (db_path == dbname) {
257 data_dirs_.emplace_back(nullptr);
258 } else {
259 std::unique_ptr<Directory> path_directory;
260 s = CreateAndNewDirectory(env, db_path, &path_directory);
261 if (!s.ok()) {
262 return s;
263 }
264 data_dirs_.emplace_back(path_directory.release());
265 }
266 }
267 assert(data_dirs_.size() == data_paths.size());
268 return Status::OK();
269 }
270
// Recover the database state on open: take the DB file lock, handle
// create_if_missing / error_if_exists, create the IDENTITY file if absent,
// replay the MANIFEST via VersionSet::Recover(), and then replay any WAL
// files in wal_dir that are newer than what the manifest records.
// REQUIRES: mutex_ held (asserted below).
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  if (!read_only) {
    Status s = directories_.SetDirectories(env_, dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    // Take the single-process DB lock before touching any files.
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    // Existence of CURRENT decides whether this is a fresh DB.
    s = env_->FileExists(CurrentFileName(dbname_));
    if (s.IsNotFound()) {
      if (immutable_db_options_.create_if_missing) {
        s = NewDB();
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(
            dbname_, "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Check for the IDENTITY file and create it if not there
    s = env_->FileExists(IdentityFileName(dbname_));
    if (s.IsNotFound()) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    } else if (!s.ok()) {
      assert(s.IsIOError());
      return s;
    }
  }

  Status s = versions_->Recover(column_families, read_only);
  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok()) {
    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    std::vector<std::string> filenames;
    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
    if (!s.ok()) {
      return s;
    }

    // Collect the numbers of all WAL files found in wal_dir.
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
        if (is_new_db) {
          // A freshly created DB must not have leftover WALs.
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              filenames[i]);
        } else {
          logs.push_back(number);
        }
      }
    }

    if (logs.size() > 0) {
      if (error_if_log_file_exist) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_log_file_exist"
            "flag but a log file already exists");
      } else if (error_if_data_exists_in_logs) {
        // Empty (zero-byte) log files are tolerated; any data is an error.
        for (auto& log : logs) {
          std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
          uint64_t bytes;
          s = env_->GetFileSize(fname, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_logs is set but there are data "
                  " in log files.");
            }
          }
        }
      }
    }

    if (!logs.empty()) {
      // Recover in the order in which the logs were generated
      std::sort(logs.begin(), logs.end());
      s = RecoverLogFiles(logs, &next_sequence, read_only);
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  // Initial value
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                  mutable_cf_options->max_write_buffer_number;
  }

  return s;
}
412
413 // REQUIRES: log_numbers are sorted in ascending order
// Replay the given WAL files into the memtables, flushing to L0 whenever the
// flush scheduler demands it, and accumulate per-column-family VersionEdits
// that are applied to the MANIFEST at the end (unless read_only). On exit
// *next_sequence is one past the last recovered sequence number.
// REQUIRES: log_numbers are sorted in ascending order; mutex_ held.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* next_sequence, bool read_only) {
  // Reports WAL corruption into `status` (or only logs it when status is
  // nullptr, i.e. when errors are being ignored).
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                     (this->status == nullptr ? "(ignoring error) " : ""),
                     fname, static_cast<int>(bytes), s.ToString().c_str());
      // Keep only the first corruption status observed.
      if (this->status != nullptr && this->status->ok()) {
        *this->status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;
  // One VersionEdit per column family, keyed by CF id; filled in as L0
  // tables are written and applied to the manifest at the end.
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }
  int job_id = next_job_id_.fetch_add(1);
  {
    // Emit a structured "recovery_started" event listing the logs to replay.
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event"
           << "recovery_started";
    stream << "log_files";
    stream.StartArray();
    for (auto log_number : log_numbers) {
      stream << log_number;
    }
    stream.EndArray();
  }

#ifndef ROCKSDB_LITE
  // Let a user-supplied WAL filter know which log each CF was on.
  if (immutable_db_options_.wal_filter != nullptr) {
    std::map<std::string, uint32_t> cf_name_id_map;
    std::map<uint32_t, uint64_t> cf_lognumber_map;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      cf_name_id_map.insert(
          std::make_pair(cfd->GetName(), cfd->GetID()));
      cf_lognumber_map.insert(
          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
    }

    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
                                                               cf_name_id_map);
  }
#endif

  bool stop_replay_by_wal_filter = false;
  bool stop_replay_for_corruption = false;
  bool flushed = false;
  for (auto log_number : log_numbers) {
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number. So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsedDuringRecovery(log_number);
    // Open the log file
    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", log_number,
                   immutable_db_options_.wal_recovery_mode);
    // Logs (at WARN) that the remainder of this file is being skipped.
    auto logFileDropped = [this, &fname]() {
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = immutable_db_options_.info_log.get();
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                       static_cast<int>(bytes));
      }
    };
    if (stop_replay_by_wal_filter) {
      logFileDropped();
      continue;
    }

    unique_ptr<SequentialFileReader> file_reader;
    {
      unique_ptr<SequentialFile> file;
      status = env_->NewSequentialFile(fname, &file, env_options_);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Fail with one log file, but that's ok.
          // Try next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(std::move(file)));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = immutable_db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (!immutable_db_options_.paranoid_checks ||
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentially make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, 0 /*initial_offset*/,
                       log_number);

    // Determine if we should tolerate incomplete records at the tail end of
    // the log.
    // Read all the records and add to a memtable
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (!stop_replay_by_wal_filter &&
           reader.ReadRecord(&record, &scratch,
                             immutable_db_options_.wal_recovery_mode) &&
           status.ok()) {
      // Every record must at least contain a WriteBatch header.
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);
      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if sequence id of log files are
        // consecutive, we continue recovery despite corruption. This could
        // happen when we open and write to a corrupted DB, where sequence id
        // will start from the last sequence id we recovered.
        if (sequence == *next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          logFileDropped();
          break;
        }
      }

#ifndef ROCKSDB_LITE
      // Give a user-supplied WAL filter a chance to inspect/rewrite/skip
      // this record before it is inserted into the memtables.
      if (immutable_db_options_.wal_filter != nullptr) {
        WriteBatch new_batch;
        bool batch_changed = false;

        WalFilter::WalProcessingOption wal_processing_option =
            immutable_db_options_.wal_filter->LogRecordFound(
                log_number, fname, batch, &new_batch, &batch_changed);

        switch (wal_processing_option) {
          case WalFilter::WalProcessingOption::kContinueProcessing:
            // do nothing, proceeed normally
            break;
          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
            // skip current record
            continue;
          case WalFilter::WalProcessingOption::kStopReplay:
            // skip current record and stop replay
            stop_replay_by_wal_filter = true;
            continue;
          case WalFilter::WalProcessingOption::kCorruptedRecord: {
            status =
                Status::Corruption("Corruption reported by Wal Filter ",
                                   immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              reporter.Corruption(record.size(), status);
              continue;
            }
            break;
          }
          default: {
            assert(false);  // unhandled case
            status = Status::NotSupported(
                "Unknown WalProcessingOption returned"
                " by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              return status;
            } else {
              // Ignore the error with current record processing.
              continue;
            }
          }
        }

        if (batch_changed) {
          // Make sure that the count in the new batch is
          // within the orignal count.
          int new_count = WriteBatchInternal::Count(&new_batch);
          int original_count = WriteBatchInternal::Count(&batch);
          if (new_count > original_count) {
            ROCKS_LOG_FATAL(
                immutable_db_options_.info_log,
                "Recovering log #%" PRIu64
                " mode %d log filter %s returned "
                "more records (%d) than original (%d) which is not allowed. "
                "Aborting recovery.",
                log_number, immutable_db_options_.wal_recovery_mode,
                immutable_db_options_.wal_filter->Name(), new_count,
                original_count);
            status = Status::NotSupported(
                "More than original # of records "
                "returned by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            return status;
          }
          // Set the same sequence number in the new_batch
          // as the original batch.
          WriteBatchInternal::SetSequence(&new_batch,
                                          WriteBatchInternal::Sequence(&batch));
          batch = new_batch;
        }
      }
#endif  // ROCKSDB_LITE

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      bool has_valid_writes = false;
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), &flush_scheduler_, true,
          log_number, this, false /* concurrent_memtable_writes */,
          next_sequence, &has_valid_writes);
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      if (has_valid_writes && !read_only) {
        // we can do this because this is called before client has access to the
        // DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        // Flush every memtable the scheduler considers full, recording the
        // resulting L0 file in that CF's pending VersionEdit.
        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
          cfd->Unref();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *next_sequence);
        }
      }
    }

    // A per-file read error: how to react depends on the recovery mode.
    if (!status.ok()) {
      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (immutable_db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        // We should ignore the error but not continue replaying
        status = Status::OK();
        stop_replay_for_corruption = true;
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                       " seq #%" PRIu64,
                       log_number, *next_sequence);
      } else {
        assert(immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    // Advance the global last-sequence counter past what this log recovered.
    auto last_sequence = *next_sequence - 1;
    if ((*next_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() <= last_sequence)) {
      versions_->SetLastSequence(last_sequence);
    }
  }

  // True if there's any data in the WALs; if not, we can skip re-processing
  // them later
  bool data_seen = false;
  if (!read_only) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    auto max_log_number = log_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_log_number) {
        // Column family cfd has already flushed the data
        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // If flush happened in the middle of recovery (e.g. due to memtable
        // being full), we flush at the end. Otherwise we'll need to record
        // where we were on last flush, which make the logic complicated.
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed
            break;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 versions_->LastSequence());
        }
        data_seen = true;
      }

      // write MANIFEST with update
      // writing log_number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_log_number, we want all logs
      // with numbers `<= max_log_number` (includes this one) to be ignored
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_log_number + 1);
      }
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
      status = versions_->LogAndApply(
          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
      if (!status.ok()) {
        // Recovery failed
        break;
      }
    }
  }

  if (data_seen && !flushed) {
    // Mark these as alive so they'll be considered for deletion later by
    // FindObsoleteFiles()
    for (auto log_number : log_numbers) {
      alive_log_files_.push_back(LogFileNumberSize(log_number));
    }
  }

  event_logger_.Log() << "job" << job_id << "event"
                      << "recovery_finished";

  return status;
}
796
// Flush the contents of `mem` into a new L0 SST file (releasing mutex_
// around the actual table build) and record the resulting file in `edit`.
// Also updates compaction/flush statistics for the column family.
// REQUIRES: mutex_ held on entry and exit.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  // Protect the new file number from being collected as obsolete while the
  // mutex is released below.
  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
    {
      // Table building does file I/O; drop the DB mutex for its duration.
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);

      EnvOptions optimized_env_options =
          env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
      s = BuildTable(
          dbname_, env_, *cfd->ioptions(), mutable_cf_options,
          optimized_env_options, cfd->table_cache(), iter.get(),
          std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
          &meta, cfd->internal_comparator(),
          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
          snapshot_seqs, earliest_write_conflict_snapshot,
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          cfd->ioptions()->compression_opts, paranoid_file_checks,
          cfd->internal_stats(), TableFileCreationReason::kRecovery,
          &event_logger_, job_id);
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno,
                  meta.marked_for_compaction);
  }

  // Account this recovery flush in the CF's compaction/flush statistics.
  InternalStats::CompactionStats stats(1);
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.fd.GetFileSize();
  stats.num_output_files = 1;
  cfd->internal_stats()->AddCompactionStats(level, stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
873
874 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
875 DBOptions db_options(options);
876 ColumnFamilyOptions cf_options(options);
877 std::vector<ColumnFamilyDescriptor> column_families;
878 column_families.push_back(
879 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
880 std::vector<ColumnFamilyHandle*> handles;
881 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
882 if (s.ok()) {
883 assert(handles.size() == 1);
884 // i can delete the handle since DBImpl is always holding a reference to
885 // default column family
886 delete handles[0];
887 }
888 return s;
889 }
890
// Open (or create, per db_options.create_if_missing) the database at `dbname`
// with the given set of column families.  On success, *dbptr owns the new
// DBImpl and *handles receives one ColumnFamilyHandle per requested column
// family, in request order; the caller owns those handles.  On failure,
// *dbptr is nullptr and *handles is empty.
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  // Let table factories adjust/reject incompatible options before anything
  // touches disk.
  Status s = SanitizeOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  *dbptr = nullptr;
  handles->clear();

  // The largest write buffer across all column families drives the WAL
  // preallocation block size chosen below.
  size_t max_write_buffer_size = 0;
  for (auto cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname);
  // Ensure the WAL directory and every configured data path exist.
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
  if (s.ok()) {
    for (auto db_path : impl->immutable_db_options_.db_paths) {
      s = impl->env_->CreateDirIfMissing(db_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }

  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }
  // mutex_ is held from here until just before the SstFileManager scan; some
  // callees below release and re-acquire it themselves.
  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  s = impl->Recover(column_families);
  if (s.ok()) {
    // Recovery succeeded: create a fresh WAL file for subsequent writes.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    unique_ptr<WritableFile> lfile;
    EnvOptions soptions(db_options);
    EnvOptions opt_env_options =
        impl->immutable_db_options_.env->OptimizeForLogWrite(
            soptions, BuildDBOptions(impl->immutable_db_options_,
                                     impl->mutable_db_options_));
    s = NewWritableFile(
        impl->immutable_db_options_.env,
        LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
        &lfile, opt_env_options);
    if (s.ok()) {
      // Preallocate WAL space in blocks sized off the largest write buffer.
      lfile->SetPreallocationBlockSize(
          impl->GetWalPreallocateBlockSize(max_write_buffer_size));
      impl->logfile_number_ = new_log_number;
      unique_ptr<WritableFileWriter> file_writer(
          new WritableFileWriter(std::move(lfile), opt_env_options));
      impl->logs_.emplace_back(
          new_log_number,
          new log::Writer(
              std::move(file_writer), new_log_number,
              impl->immutable_db_options_.recycle_log_file_num > 0));

      // set column family handles
      for (auto cf : column_families) {
        auto cfd =
            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
        if (cfd != nullptr) {
          handles->push_back(
              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
          impl->NewThreadStatusCfInfo(cfd);
        } else {
          if (db_options.create_missing_column_families) {
            // missing column family, create it
            ColumnFamilyHandle* handle;
            // CreateColumnFamily() takes mutex_ itself, so drop it around
            // the call and re-acquire afterwards.
            impl->mutex_.Unlock();
            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
            impl->mutex_.Lock();
            if (s.ok()) {
              handles->push_back(handle);
            } else {
              break;
            }
          } else {
            s = Status::InvalidArgument("Column family not found: ", cf.name);
            break;
          }
        }
      }
    }
    if (s.ok()) {
      // Publish an initial SuperVersion for each column family; the value
      // returned (an obsolete SuperVersion, possibly nullptr) is deleted
      // immediately here.
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        delete impl->InstallSuperVersionAndScheduleWork(
            cfd, nullptr, *cfd->GetLatestMutableCFOptions());
      }
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      impl->DeleteObsoleteFiles();
      // Sync the DB directory so the files created during open are durable.
      s = impl->directories_.GetDbDir()->Fsync();
    }
  }

  if (s.ok()) {
    // Post-recovery sanity checks on each column family's configuration.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        // FIFO compaction requires all existing files to be at level 0.
        auto* vstorage = cfd->current()->storage_info();
        for (int i = 1; i < vstorage->num_levels(); ++i) {
          int num_files = vstorage->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument(
                "Not all files are at level 0. Cannot "
                "open with FIFO compaction style.");
            break;
          }
        }
      }
      if (!cfd->mem()->IsSnapshotSupported()) {
        impl->is_snapshot_supported_ = false;
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        s = Status::InvalidArgument(
            "The memtable of column family %s does not support merge operator "
            "its options.merge_operator is non-null", cfd->GetName().c_str());
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // The WriteOptionsFile() will release and lock the mutex internally.
    persist_options_status = impl->WriteOptionsFile();

    *dbptr = impl;
    impl->opened_successfully_ = true;
    impl->MaybeScheduleFlushOrCompaction();
  }
  impl->mutex_.Unlock();

#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] when the DB is opened.
    auto& db_path = impl->immutable_db_options_.db_paths[0];
    std::vector<std::string> existing_files;
    impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files);
    for (auto& file_name : existing_files) {
      uint64_t file_number;
      FileType file_type;
      std::string file_path = db_path.path + "/" + file_name;
      if (ParseFileName(file_name, &file_number, &file_type) &&
          file_type == kTableFile) {
        sfm->OnAddFile(file_path);
      }
    }
  }
#endif // !ROCKSDB_LITE

  if (s.ok()) {
    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
    LogFlush(impl->immutable_db_options_.info_log);
    if (!persist_options_status.ok()) {
      // A failed options-file write is fatal only when the user opted in via
      // fail_if_options_file_error; otherwise just log a warning.
      if (db_options.fail_if_options_file_error) {
        s = Status::IOError(
            "DB::Open() failed --- Unable to persist Options file",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
                     "Unable to persist options in DB::Open() -- %s",
                     persist_options_status.ToString().c_str());
    }
  }
  if (!s.ok()) {
    // Failure cleanup: delete the column family handles first, then the
    // partially-constructed DB.
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
    *dbptr = nullptr;
  }
  return s;
}
1086 } // namespace rocksdb