1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl.h"
10
11 #ifndef __STDC_FORMAT_MACROS
12 #define __STDC_FORMAT_MACROS
13 #endif
14 #include <inttypes.h>
15
16 #include "db/builder.h"
17 #include "db/error_handler.h"
18 #include "options/options_helper.h"
19 #include "rocksdb/wal_filter.h"
20 #include "table/block_based_table_factory.h"
21 #include "util/rate_limiter.h"
22 #include "util/sst_file_manager_impl.h"
23 #include "util/sync_point.h"
24
25 namespace rocksdb {
26 Options SanitizeOptions(const std::string& dbname, const Options& src) {
27 auto db_options = SanitizeOptions(dbname, DBOptions(src));
28 ImmutableDBOptions immutable_db_options(db_options);
29 auto cf_options =
30 SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
31 return Options(db_options, cf_options);
32 }
33
34 DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
35 DBOptions result(src);
36
37   // A max_open_files value of -1 means an unlimited number of open files.
38 if (result.max_open_files != -1) {
39 int max_max_open_files = port::GetMaxOpenFiles();
40 if (max_max_open_files == -1) {
41 max_max_open_files = 0x400000;
42 }
43 ClipToRange(&result.max_open_files, 20, max_max_open_files);
44 TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
45 &result.max_open_files);
46 }
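  // [Editor's note, illustrative example; not in the original source] The
  // clamping above means a hypothetical caller-supplied max_open_files of 5
  // is raised to 20, while a value above port::GetMaxOpenFiles() is lowered
  // to that limit; a value of -1 (unlimited) skips this branch entirely.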
47
48 if (result.info_log == nullptr) {
49 Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
50 if (!s.ok()) {
51 // No place suitable for logging
52 result.info_log = nullptr;
53 }
54 }
55
56 if (!result.write_buffer_manager) {
57 result.write_buffer_manager.reset(
58 new WriteBufferManager(result.db_write_buffer_size));
59 }
60 auto bg_job_limits = DBImpl::GetBGJobLimits(
61 result.max_background_flushes, result.max_background_compactions,
62 result.max_background_jobs, true /* parallelize_compactions */);
63 result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
64 Env::Priority::LOW);
65 result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
66 Env::Priority::HIGH);
67
68 if (result.rate_limiter.get() != nullptr) {
69 if (result.bytes_per_sync == 0) {
70 result.bytes_per_sync = 1024 * 1024;
71 }
72 }
73
74 if (result.delayed_write_rate == 0) {
75 if (result.rate_limiter.get() != nullptr) {
76 result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
77 }
78 if (result.delayed_write_rate == 0) {
79 result.delayed_write_rate = 16 * 1024 * 1024;
80 }
81 }
82
83 if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
84 result.recycle_log_file_num = false;
85 }
86
87 if (result.recycle_log_file_num &&
88 (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
89 result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
90 // kPointInTimeRecovery is indistinguishable from
91 // kTolerateCorruptedTailRecords in recycle mode since we define
92 // the "end" of the log as the first corrupt record we encounter.
93 // kAbsoluteConsistency doesn't make sense because even a clean
94 // shutdown leaves old junk at the end of the log file.
95 result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
96 }
97
98 if (result.wal_dir.empty()) {
99 // Use dbname as default
100 result.wal_dir = dbname;
101 }
102 if (result.wal_dir.back() == '/') {
103 result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
104 }
105
106 if (result.db_paths.size() == 0) {
107 result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
108 }
109
110 if (result.use_direct_reads && result.compaction_readahead_size == 0) {
111 TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
112 result.compaction_readahead_size = 1024 * 1024 * 2;
113 }
114
115 if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
116 result.new_table_reader_for_compaction_inputs = true;
117 }
118
119 // Force flush on DB open if 2PC is enabled, since with 2PC we have no
120 // guarantee that consecutive log files have consecutive sequence id, which
121   // makes recovery complicated.
122 if (result.allow_2pc) {
123 result.avoid_flush_during_recovery = false;
124 }
125
126 #ifndef ROCKSDB_LITE
127   // When the DB is stopped, it's possible that some .trash files were not
128   // deleted yet. When we open the DB we will find these .trash files and
129   // schedule them to be deleted (or delete them immediately if SstFileManager
130   // was not used).
131 auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
132 for (size_t i = 0; i < result.db_paths.size(); i++) {
133 DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
134 }
135
136 // Create a default SstFileManager for purposes of tracking compaction size
137 // and facilitating recovery from out of space errors.
138 if (result.sst_file_manager.get() == nullptr) {
139 std::shared_ptr<SstFileManager> sst_file_manager(
140 NewSstFileManager(result.env, result.info_log));
141 result.sst_file_manager = sst_file_manager;
142 }
143 #endif
144 return result;
145 }
146
147 namespace {
148
149 Status SanitizeOptionsByTable(
150 const DBOptions& db_opts,
151 const std::vector<ColumnFamilyDescriptor>& column_families) {
152 Status s;
153 for (auto cf : column_families) {
154 s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
155 if (!s.ok()) {
156 return s;
157 }
158 }
159 return Status::OK();
160 }
161
162 static Status ValidateOptions(
163 const DBOptions& db_options,
164 const std::vector<ColumnFamilyDescriptor>& column_families) {
165 Status s;
166
167 for (auto& cfd : column_families) {
168 s = CheckCompressionSupported(cfd.options);
169 if (s.ok() && db_options.allow_concurrent_memtable_write) {
170 s = CheckConcurrentWritesSupported(cfd.options);
171 }
172 if (s.ok()) {
173 s = CheckCFPathsSupported(db_options, cfd.options);
174 }
175 if (!s.ok()) {
176 return s;
177 }
178
179 if (cfd.options.ttl > 0) {
180 if (db_options.max_open_files != -1) {
181 return Status::NotSupported(
182 "TTL is only supported when files are always "
183 "kept open (set max_open_files = -1). ");
184 }
185 if (cfd.options.table_factory->Name() !=
186 BlockBasedTableFactory().Name()) {
187 return Status::NotSupported(
188 "TTL is only supported in Block-Based Table format. ");
189 }
190 }
191 }
192
193 if (db_options.db_paths.size() > 4) {
194 return Status::NotSupported(
195 "More than four DB paths are not supported yet. ");
196 }
197
198 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
199 // Protect against assert in PosixMMapReadableFile constructor
200 return Status::NotSupported(
201 "If memory mapped reads (allow_mmap_reads) are enabled "
202 "then direct I/O reads (use_direct_reads) must be disabled. ");
203 }
204
205 if (db_options.allow_mmap_writes &&
206 db_options.use_direct_io_for_flush_and_compaction) {
207 return Status::NotSupported(
208 "If memory mapped writes (allow_mmap_writes) are enabled "
209 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
210 "be disabled. ");
211 }
212
213 if (db_options.keep_log_file_num == 0) {
214 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
215 }
216
217 return Status::OK();
218 }
219 } // namespace
220 Status DBImpl::NewDB() {
221 VersionEdit new_db;
222 new_db.SetLogNumber(0);
223 new_db.SetNextFile(2);
224 new_db.SetLastSequence(0);
225
226 Status s;
227
228 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
229 const std::string manifest = DescriptorFileName(dbname_, 1);
230 {
231 std::unique_ptr<WritableFile> file;
232 EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
233 s = NewWritableFile(env_, manifest, &file, env_options);
234 if (!s.ok()) {
235 return s;
236 }
237 file->SetPreallocationBlockSize(
238 immutable_db_options_.manifest_preallocation_size);
239 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
240 std::move(file), manifest, env_options, env_, nullptr /* stats */,
241 immutable_db_options_.listeners));
242 log::Writer log(std::move(file_writer), 0, false);
243 std::string record;
244 new_db.EncodeTo(&record);
245 s = log.AddRecord(record);
246 if (s.ok()) {
247 s = SyncManifest(env_, &immutable_db_options_, log.file());
248 }
249 }
250 if (s.ok()) {
251 // Make "CURRENT" file that points to the new manifest file.
252 s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
253 } else {
254 env_->DeleteFile(manifest);
255 }
256 return s;
257 }
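// [Editor's note, not in the original source] NewDB() follows a
// write-then-publish order: it writes and syncs MANIFEST-000001 first, and
// only then installs the CURRENT file pointing at it; if writing the manifest
// fails, the partially written file is deleted so a later open cannot pick up
// a half-created DB.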
258
259 Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
260 std::unique_ptr<Directory>* directory) {
261   // We call CreateDirIfMissing() because the directory may already exist (if
262   // we are reopening a DB); when this happens we don't want creating the
263   // directory to cause an error. However, we need to check whether creating
264   // the directory fails, or else we may get an obscure message about the lock
265   // file not existing. One real-world example of this occurring is when
266   // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
267   // when dbname_ is "dir/db" but "dir" doesn't exist.
268 Status s = env->CreateDirIfMissing(dirname);
269 if (!s.ok()) {
270 return s;
271 }
272 return env->NewDirectory(dirname, directory);
273 }
274
275 Status DBImpl::Directories::SetDirectories(
276 Env* env, const std::string& dbname, const std::string& wal_dir,
277 const std::vector<DbPath>& data_paths) {
278 Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
279 if (!s.ok()) {
280 return s;
281 }
282 if (!wal_dir.empty() && dbname != wal_dir) {
283 s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_);
284 if (!s.ok()) {
285 return s;
286 }
287 }
288
289 data_dirs_.clear();
290 for (auto& p : data_paths) {
291 const std::string db_path = p.path;
292 if (db_path == dbname) {
293 data_dirs_.emplace_back(nullptr);
294 } else {
295 std::unique_ptr<Directory> path_directory;
296 s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory);
297 if (!s.ok()) {
298 return s;
299 }
300 data_dirs_.emplace_back(path_directory.release());
301 }
302 }
303 assert(data_dirs_.size() == data_paths.size());
304 return Status::OK();
305 }
306
307 Status DBImpl::Recover(
308 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
309 bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
310 mutex_.AssertHeld();
311
312 bool is_new_db = false;
313 assert(db_lock_ == nullptr);
314 if (!read_only) {
315 Status s = directories_.SetDirectories(env_, dbname_,
316 immutable_db_options_.wal_dir,
317 immutable_db_options_.db_paths);
318 if (!s.ok()) {
319 return s;
320 }
321
322 s = env_->LockFile(LockFileName(dbname_), &db_lock_);
323 if (!s.ok()) {
324 return s;
325 }
326
327 s = env_->FileExists(CurrentFileName(dbname_));
328 if (s.IsNotFound()) {
329 if (immutable_db_options_.create_if_missing) {
330 s = NewDB();
331 is_new_db = true;
332 if (!s.ok()) {
333 return s;
334 }
335 } else {
336 return Status::InvalidArgument(
337 dbname_, "does not exist (create_if_missing is false)");
338 }
339 } else if (s.ok()) {
340 if (immutable_db_options_.error_if_exists) {
341 return Status::InvalidArgument(dbname_,
342 "exists (error_if_exists is true)");
343 }
344 } else {
345 // Unexpected error reading file
346 assert(s.IsIOError());
347 return s;
348 }
349 // Check for the IDENTITY file and create it if not there
350 s = env_->FileExists(IdentityFileName(dbname_));
351 if (s.IsNotFound()) {
352 s = SetIdentityFile(env_, dbname_);
353 if (!s.ok()) {
354 return s;
355 }
356 } else if (!s.ok()) {
357 assert(s.IsIOError());
358 return s;
359 }
360 // Verify compatibility of env_options_ and filesystem
361 {
362 std::unique_ptr<RandomAccessFile> idfile;
363 EnvOptions customized_env(env_options_);
364 customized_env.use_direct_reads |=
365 immutable_db_options_.use_direct_io_for_flush_and_compaction;
366 s = env_->NewRandomAccessFile(IdentityFileName(dbname_), &idfile,
367 customized_env);
368 if (!s.ok()) {
369 std::string error_str = s.ToString();
370 // Check if unsupported Direct I/O is the root cause
371 customized_env.use_direct_reads = false;
372 s = env_->NewRandomAccessFile(IdentityFileName(dbname_), &idfile,
373 customized_env);
374 if (s.ok()) {
375 return Status::InvalidArgument(
376 "Direct I/O is not supported by the specified DB.");
377 } else {
378 return Status::InvalidArgument(
379 "Found options incompatible with filesystem", error_str.c_str());
380 }
381 }
382 }
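    // [Editor's note, not in the original source] The retry above acts as a
    // probe: reopening the IDENTITY file with use_direct_reads cleared
    // distinguishes "the filesystem rejects direct I/O" (second open succeeds)
    // from other env/option incompatibilities (second open fails too).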
383 }
384
385 Status s = versions_->Recover(column_families, read_only);
386 if (immutable_db_options_.paranoid_checks && s.ok()) {
387 s = CheckConsistency();
388 }
389 if (s.ok() && !read_only) {
390 for (auto cfd : *versions_->GetColumnFamilySet()) {
391 s = cfd->AddDirectories();
392 if (!s.ok()) {
393 return s;
394 }
395 }
396 }
397
398   // Initialize max_total_in_memory_state_ before recovering the logs. Log
399   // recovery may check this value to decide whether to flush.
400 max_total_in_memory_state_ = 0;
401 for (auto cfd : *versions_->GetColumnFamilySet()) {
402 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
403 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
404 mutable_cf_options->max_write_buffer_number;
405 }
406
407 if (s.ok()) {
408 SequenceNumber next_sequence(kMaxSequenceNumber);
409 default_cf_handle_ = new ColumnFamilyHandleImpl(
410 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
411 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
412 single_column_family_mode_ =
413 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
414
415     // Recover from all log files newer than the ones named in the
416 // descriptor (new log files may have been added by the previous
417 // incarnation without registering them in the descriptor).
418 //
419 // Note that prev_log_number() is no longer used, but we pay
420 // attention to it in case we are recovering a database
421 // produced by an older version of rocksdb.
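    // [Editor's illustration, not in the original source] For example, if the
    // MANIFEST only records log #12 but wal_dir still contains 000013.log and
    // 000014.log from the previous run, the directory scan below picks up #13
    // and #14 so their updates are replayed as well.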
422 std::vector<std::string> filenames;
423 s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
424 if (s.IsNotFound()) {
425 return Status::InvalidArgument("wal_dir not found",
426 immutable_db_options_.wal_dir);
427 } else if (!s.ok()) {
428 return s;
429 }
430
431 std::vector<uint64_t> logs;
432 for (size_t i = 0; i < filenames.size(); i++) {
433 uint64_t number;
434 FileType type;
435 if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
436 if (is_new_db) {
437 return Status::Corruption(
438 "While creating a new Db, wal_dir contains "
439 "existing log file: ",
440 filenames[i]);
441 } else {
442 logs.push_back(number);
443 }
444 }
445 }
446
447 if (logs.size() > 0) {
448 if (error_if_log_file_exist) {
449 return Status::Corruption(
450             "The db was opened in readonly mode with error_if_log_file_exist "
451 "flag but a log file already exists");
452 } else if (error_if_data_exists_in_logs) {
453 for (auto& log : logs) {
454 std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
455 uint64_t bytes;
456 s = env_->GetFileSize(fname, &bytes);
457 if (s.ok()) {
458 if (bytes > 0) {
459 return Status::Corruption(
460                   "error_if_data_exists_in_logs is set but there is data "
461                   "in log files.");
462 }
463 }
464 }
465 }
466 }
467
468 if (!logs.empty()) {
469 // Recover in the order in which the logs were generated
470 std::sort(logs.begin(), logs.end());
471 s = RecoverLogFiles(logs, &next_sequence, read_only);
472 if (!s.ok()) {
473 // Clear memtables if recovery failed
474 for (auto cfd : *versions_->GetColumnFamilySet()) {
475 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
476 kMaxSequenceNumber);
477 }
478 }
479 }
480 }
481
482 if (read_only) {
483 // If we are opening as read-only, we need to update options_file_number_
484 // to reflect the most recent OPTIONS file. It does not matter for regular
485 // read-write db instance because options_file_number_ will later be
486 // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
487 std::vector<std::string> file_names;
488 if (s.ok()) {
489 s = env_->GetChildren(GetName(), &file_names);
490 }
491 if (s.ok()) {
492 uint64_t number = 0;
493 uint64_t options_file_number = 0;
494 FileType type;
495 for (const auto& fname : file_names) {
496 if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
497 options_file_number = std::max(number, options_file_number);
498 }
499 }
500 versions_->options_file_number_ = options_file_number;
501 }
502 }
503
504 return s;
505 }
506
507 // REQUIRES: log_numbers are sorted in ascending order
508 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
509 SequenceNumber* next_sequence, bool read_only) {
510 struct LogReporter : public log::Reader::Reporter {
511 Env* env;
512 Logger* info_log;
513 const char* fname;
514 Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
515 void Corruption(size_t bytes, const Status& s) override {
516 ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
517 (this->status == nullptr ? "(ignoring error) " : ""),
518 fname, static_cast<int>(bytes), s.ToString().c_str());
519 if (this->status != nullptr && this->status->ok()) {
520 *this->status = s;
521 }
522 }
523 };
524
525 mutex_.AssertHeld();
526 Status status;
527 std::unordered_map<int, VersionEdit> version_edits;
528 // no need to refcount because iteration is under mutex
529 for (auto cfd : *versions_->GetColumnFamilySet()) {
530 VersionEdit edit;
531 edit.SetColumnFamily(cfd->GetID());
532 version_edits.insert({cfd->GetID(), edit});
533 }
534 int job_id = next_job_id_.fetch_add(1);
535 {
536 auto stream = event_logger_.Log();
537 stream << "job" << job_id << "event"
538 << "recovery_started";
539 stream << "log_files";
540 stream.StartArray();
541 for (auto log_number : log_numbers) {
542 stream << log_number;
543 }
544 stream.EndArray();
545 }
546
547 #ifndef ROCKSDB_LITE
548 if (immutable_db_options_.wal_filter != nullptr) {
549 std::map<std::string, uint32_t> cf_name_id_map;
550 std::map<uint32_t, uint64_t> cf_lognumber_map;
551 for (auto cfd : *versions_->GetColumnFamilySet()) {
552 cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
553 cf_lognumber_map.insert(
554 std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
555 }
556
557 immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
558 cf_name_id_map);
559 }
560 #endif
561
562 bool stop_replay_by_wal_filter = false;
563 bool stop_replay_for_corruption = false;
564 bool flushed = false;
565 uint64_t corrupted_log_number = kMaxSequenceNumber;
566 for (auto log_number : log_numbers) {
567 if (log_number < versions_->min_log_number_to_keep_2pc()) {
568 ROCKS_LOG_INFO(immutable_db_options_.info_log,
569 "Skipping log #%" PRIu64
570 " since it is older than min log to keep #%" PRIu64,
571 log_number, versions_->min_log_number_to_keep_2pc());
572 continue;
573 }
574 // The previous incarnation may not have written any MANIFEST
575 // records after allocating this log number. So we manually
576 // update the file number allocation counter in VersionSet.
577 versions_->MarkFileNumberUsed(log_number);
578 // Open the log file
579 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
580
581 ROCKS_LOG_INFO(immutable_db_options_.info_log,
582 "Recovering log #%" PRIu64 " mode %d", log_number,
583 static_cast<int>(immutable_db_options_.wal_recovery_mode));
584 auto logFileDropped = [this, &fname]() {
585 uint64_t bytes;
586 if (env_->GetFileSize(fname, &bytes).ok()) {
587 auto info_log = immutable_db_options_.info_log.get();
588 ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
589 static_cast<int>(bytes));
590 }
591 };
592 if (stop_replay_by_wal_filter) {
593 logFileDropped();
594 continue;
595 }
596
597 std::unique_ptr<SequentialFileReader> file_reader;
598 {
599 std::unique_ptr<SequentialFile> file;
600 status = env_->NewSequentialFile(fname, &file,
601 env_->OptimizeForLogRead(env_options_));
602 if (!status.ok()) {
603 MaybeIgnoreError(&status);
604 if (!status.ok()) {
605 return status;
606 } else {
607           // Failed to open this log file, but that's ok.
608           // Try the next one.
609 continue;
610 }
611 }
612 file_reader.reset(new SequentialFileReader(std::move(file), fname));
613 }
614
615 // Create the log reader.
616 LogReporter reporter;
617 reporter.env = env_;
618 reporter.info_log = immutable_db_options_.info_log.get();
619 reporter.fname = fname.c_str();
620 if (!immutable_db_options_.paranoid_checks ||
621 immutable_db_options_.wal_recovery_mode ==
622 WALRecoveryMode::kSkipAnyCorruptedRecords) {
623 reporter.status = nullptr;
624 } else {
625 reporter.status = &status;
626 }
627     // We intentionally make log::Reader do checksumming even if
628 // paranoid_checks==false so that corruptions cause entire commits
629 // to be skipped instead of propagating bad information (like overly
630 // large sequence numbers).
631 log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
632 &reporter, true /*checksum*/, log_number);
633
634     // Determine if we should tolerate incomplete records at the tail end of the log.
635 // Read all the records and add to a memtable
636 std::string scratch;
637 Slice record;
638 WriteBatch batch;
639
640 while (!stop_replay_by_wal_filter &&
641 reader.ReadRecord(&record, &scratch,
642 immutable_db_options_.wal_recovery_mode) &&
643 status.ok()) {
644 if (record.size() < WriteBatchInternal::kHeader) {
645 reporter.Corruption(record.size(),
646 Status::Corruption("log record too small"));
647 continue;
648 }
649 WriteBatchInternal::SetContents(&batch, record);
650 SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
651
652 if (immutable_db_options_.wal_recovery_mode ==
653 WALRecoveryMode::kPointInTimeRecovery) {
654         // In point-in-time recovery mode, if the sequence ids of the log files
655         // are consecutive, we continue recovery despite corruption. This could
656         // happen when we open and write to a corrupted DB, where the sequence id
657         // will start from the last sequence id we recovered.
658 if (sequence == *next_sequence) {
659 stop_replay_for_corruption = false;
660 }
661 if (stop_replay_for_corruption) {
662 logFileDropped();
663 break;
664 }
665 }
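      // [Editor's illustration, not in the original source] E.g. if replay
      // stopped at a corrupt record after recovering sequence 100, later
      // batches are dropped unless one carries the expected next sequence
      // (101), in which case stop_replay_for_corruption is cleared and replay
      // resumes from that point.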
666
667 #ifndef ROCKSDB_LITE
668 if (immutable_db_options_.wal_filter != nullptr) {
669 WriteBatch new_batch;
670 bool batch_changed = false;
671
672 WalFilter::WalProcessingOption wal_processing_option =
673 immutable_db_options_.wal_filter->LogRecordFound(
674 log_number, fname, batch, &new_batch, &batch_changed);
675
676 switch (wal_processing_option) {
677 case WalFilter::WalProcessingOption::kContinueProcessing:
678             // do nothing, proceed normally
679 break;
680 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
681 // skip current record
682 continue;
683 case WalFilter::WalProcessingOption::kStopReplay:
684 // skip current record and stop replay
685 stop_replay_by_wal_filter = true;
686 continue;
687 case WalFilter::WalProcessingOption::kCorruptedRecord: {
688 status =
689 Status::Corruption("Corruption reported by Wal Filter ",
690 immutable_db_options_.wal_filter->Name());
691 MaybeIgnoreError(&status);
692 if (!status.ok()) {
693 reporter.Corruption(record.size(), status);
694 continue;
695 }
696 break;
697 }
698 default: {
699 assert(false); // unhandled case
700 status = Status::NotSupported(
701 "Unknown WalProcessingOption returned"
702 " by Wal Filter ",
703 immutable_db_options_.wal_filter->Name());
704 MaybeIgnoreError(&status);
705 if (!status.ok()) {
706 return status;
707 } else {
708 // Ignore the error with current record processing.
709 continue;
710 }
711 }
712 }
713
714 if (batch_changed) {
715 // Make sure that the count in the new batch is
716           // within the original count.
717 int new_count = WriteBatchInternal::Count(&new_batch);
718 int original_count = WriteBatchInternal::Count(&batch);
719 if (new_count > original_count) {
720 ROCKS_LOG_FATAL(
721 immutable_db_options_.info_log,
722 "Recovering log #%" PRIu64
723 " mode %d log filter %s returned "
724 "more records (%d) than original (%d) which is not allowed. "
725 "Aborting recovery.",
726 log_number,
727 static_cast<int>(immutable_db_options_.wal_recovery_mode),
728 immutable_db_options_.wal_filter->Name(), new_count,
729 original_count);
730 status = Status::NotSupported(
731 "More than original # of records "
732 "returned by Wal Filter ",
733 immutable_db_options_.wal_filter->Name());
734 return status;
735 }
736 // Set the same sequence number in the new_batch
737 // as the original batch.
738 WriteBatchInternal::SetSequence(&new_batch,
739 WriteBatchInternal::Sequence(&batch));
740 batch = new_batch;
741 }
742 }
743 #endif // ROCKSDB_LITE
744
745       // If a column family was not found, it might mean that the WAL write
746       // batch references a column family that was dropped after the
747       // insert. We don't want to fail the whole write batch in that case --
748       // we just ignore the update.
749       // That's why we set ignore_missing_column_families to true.
750 bool has_valid_writes = false;
751 status = WriteBatchInternal::InsertInto(
752 &batch, column_family_memtables_.get(), &flush_scheduler_, true,
753 log_number, this, false /* concurrent_memtable_writes */,
754 next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
755 MaybeIgnoreError(&status);
756 if (!status.ok()) {
757 // We are treating this as a failure while reading since we read valid
758 // blocks that do not form coherent data
759 reporter.Corruption(record.size(), status);
760 continue;
761 }
762
763 if (has_valid_writes && !read_only) {
764         // We can do this because this is called before the client has access to
765         // the DB and there is only a single thread operating on the DB.
766 ColumnFamilyData* cfd;
767
768 while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
769 cfd->Unref();
770 // If this asserts, it means that InsertInto failed in
771 // filtering updates to already-flushed column families
772 assert(cfd->GetLogNumber() <= log_number);
773 auto iter = version_edits.find(cfd->GetID());
774 assert(iter != version_edits.end());
775 VersionEdit* edit = &iter->second;
776 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
777 if (!status.ok()) {
778 // Reflect errors immediately so that conditions like full
779 // file-systems cause the DB::Open() to fail.
780 return status;
781 }
782 flushed = true;
783
784 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
785 *next_sequence);
786 }
787 }
788 }
789
790 if (!status.ok()) {
791 if (status.IsNotSupported()) {
792 // We should not treat NotSupported as corruption. It is rather a clear
793 // sign that we are processing a WAL that is produced by an incompatible
794 // version of the code.
795 return status;
796 }
797 if (immutable_db_options_.wal_recovery_mode ==
798 WALRecoveryMode::kSkipAnyCorruptedRecords) {
799 // We should ignore all errors unconditionally
800 status = Status::OK();
801 } else if (immutable_db_options_.wal_recovery_mode ==
802 WALRecoveryMode::kPointInTimeRecovery) {
803 // We should ignore the error but not continue replaying
804 status = Status::OK();
805 stop_replay_for_corruption = true;
806 corrupted_log_number = log_number;
807 ROCKS_LOG_INFO(immutable_db_options_.info_log,
808 "Point in time recovered to log #%" PRIu64
809 " seq #%" PRIu64,
810 log_number, *next_sequence);
811 } else {
812 assert(immutable_db_options_.wal_recovery_mode ==
813 WALRecoveryMode::kTolerateCorruptedTailRecords ||
814 immutable_db_options_.wal_recovery_mode ==
815 WALRecoveryMode::kAbsoluteConsistency);
816 return status;
817 }
818 }
819
820 flush_scheduler_.Clear();
821 auto last_sequence = *next_sequence - 1;
822 if ((*next_sequence != kMaxSequenceNumber) &&
823 (versions_->LastSequence() <= last_sequence)) {
824 versions_->SetLastAllocatedSequence(last_sequence);
825 versions_->SetLastPublishedSequence(last_sequence);
826 versions_->SetLastSequence(last_sequence);
827 }
828 }
829   // Compare the corrupted log number to every column family's current log
830   // number. Abort Open() if any column family's log number is greater than
831   // the corrupted log number, which means the CF contains data beyond the
832   // point of corruption. This can happen during PIT recovery when the WAL is
833   // corrupted and some (but not all) CFs were flushed.
834 if (stop_replay_for_corruption == true &&
835 (immutable_db_options_.wal_recovery_mode ==
836 WALRecoveryMode::kPointInTimeRecovery ||
837 immutable_db_options_.wal_recovery_mode ==
838 WALRecoveryMode::kTolerateCorruptedTailRecords)) {
839 for (auto cfd : *versions_->GetColumnFamilySet()) {
840 if (cfd->GetLogNumber() > corrupted_log_number) {
841 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
842 "Column family inconsistency: SST file contains data"
843 " beyond the point of corruption.");
844 return Status::Corruption("SST file is ahead of WALs");
845 }
846 }
847 }
848
849 // True if there's any data in the WALs; if not, we can skip re-processing
850 // them later
851 bool data_seen = false;
852 if (!read_only) {
853 // no need to refcount since client still doesn't have access
854 // to the DB and can not drop column families while we iterate
855 auto max_log_number = log_numbers.back();
856 for (auto cfd : *versions_->GetColumnFamilySet()) {
857 auto iter = version_edits.find(cfd->GetID());
858 assert(iter != version_edits.end());
859 VersionEdit* edit = &iter->second;
860
861 if (cfd->GetLogNumber() > max_log_number) {
862 // Column family cfd has already flushed the data
863 // from all logs. Memtable has to be empty because
864 // we filter the updates based on log_number
865 // (in WriteBatch::InsertInto)
866 assert(cfd->mem()->GetFirstSequenceNumber() == 0);
867 assert(edit->NumEntries() == 0);
868 continue;
869 }
870
871 // flush the final memtable (if non-empty)
872 if (cfd->mem()->GetFirstSequenceNumber() != 0) {
873 // If flush happened in the middle of recovery (e.g. due to memtable
874 // being full), we flush at the end. Otherwise we'll need to record
875         // where we were on the last flush, which would make the logic complicated.
876 if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
877 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
878 if (!status.ok()) {
879 // Recovery failed
880 break;
881 }
882 flushed = true;
883
884 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
885 versions_->LastSequence());
886 }
887 data_seen = true;
888 }
889
890 // write MANIFEST with update
891 // writing log_number in the manifest means that any log file
892       // with a number strictly less than (log_number + 1) is already
893       // recovered and should be ignored on the next reincarnation.
894       // Since we already recovered max_log_number, we want all logs
895       // with numbers `<= max_log_number` (including this one) to be ignored.
896 if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
897 edit->SetLogNumber(max_log_number + 1);
898 }
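      // [Editor's illustration, not in the original source] E.g. if the newest
      // replayed WAL is 000014.log, SetLogNumber(15) is recorded so that the
      // next Recover() ignores logs numbered <= 14 for this column family.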
899       // We must mark the next log number as used, even though it's
900       // not actually used. That is because VersionSet assumes
901       // VersionSet::next_file_number_ is always strictly greater than any
902       // log number.
903 versions_->MarkFileNumberUsed(max_log_number + 1);
904 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
905 edit, &mutex_);
906 if (!status.ok()) {
907 // Recovery failed
908 break;
909 }
910 }
911 }
912
913 if (status.ok() && data_seen && !flushed) {
914 status = RestoreAliveLogFiles(log_numbers);
915 }
916
917 event_logger_.Log() << "job" << job_id << "event"
918 << "recovery_finished";
919
920 return status;
921 }
922
923 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
924 if (log_numbers.empty()) {
925 return Status::OK();
926 }
927 Status s;
928 mutex_.AssertHeld();
929 assert(immutable_db_options_.avoid_flush_during_recovery);
930 if (two_write_queues_) {
931 log_write_mutex_.Lock();
932 }
933 // Mark these as alive so they'll be considered for deletion later by
934 // FindObsoleteFiles()
935 total_log_size_ = 0;
936 log_empty_ = false;
937 for (auto log_number : log_numbers) {
938 LogFileNumberSize log(log_number);
939 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
940     // This gets the apparent size of the logs, not including preallocated space.
941 s = env_->GetFileSize(fname, &log.size);
942 if (!s.ok()) {
943 break;
944 }
945 total_log_size_ += log.size;
946 alive_log_files_.push_back(log);
947     // We preallocate space for logs, but after a crash and restart that
948     // preallocated space is not needed anymore. It is likely that only the last
949     // log has such preallocated space, so we only truncate the last log.
950 if (log_number == log_numbers.back()) {
951 std::unique_ptr<WritableFile> last_log;
952 Status truncate_status = env_->ReopenWritableFile(
953 fname, &last_log,
954 env_->OptimizeForLogWrite(
955 env_options_,
956 BuildDBOptions(immutable_db_options_, mutable_db_options_)));
957 if (truncate_status.ok()) {
958 truncate_status = last_log->Truncate(log.size);
959 }
960 if (truncate_status.ok()) {
961 truncate_status = last_log->Close();
962 }
963 // Not a critical error if fail to truncate.
964 if (!truncate_status.ok()) {
965 ROCKS_LOG_WARN(immutable_db_options_.info_log,
966 "Failed to truncate log #%" PRIu64 ": %s", log_number,
967 truncate_status.ToString().c_str());
968 }
969 }
970 }
971 if (two_write_queues_) {
972 log_write_mutex_.Unlock();
973 }
974 return s;
975 }
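// [Editor's illustration, not in the original source] For example, if the last
// WAL was preallocated to several megabytes but only 1 MB of records had been
// written before the crash, the reopen-and-Truncate step above shrinks the
// file back to its recorded size, releasing the unused preallocated tail.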
976
977 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
978 MemTable* mem, VersionEdit* edit) {
979 mutex_.AssertHeld();
980 const uint64_t start_micros = env_->NowMicros();
981 FileMetaData meta;
982 auto pending_outputs_inserted_elem =
983 CaptureCurrentFileNumberInPendingOutputs();
984 meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
985 ReadOptions ro;
986 ro.total_order_seek = true;
987 Arena arena;
988 Status s;
989 TableProperties table_properties;
990 {
991 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
992 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
993 "[%s] [WriteLevel0TableForRecovery]"
994 " Level-0 table #%" PRIu64 ": started",
995 cfd->GetName().c_str(), meta.fd.GetNumber());
996
997 // Get the latest mutable cf options while the mutex is still locked
998 const MutableCFOptions mutable_cf_options =
999 *cfd->GetLatestMutableCFOptions();
1000 bool paranoid_file_checks =
1001 cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
1002
1003 int64_t _current_time = 0;
1004 env_->GetCurrentTime(&_current_time); // ignore error
1005 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1006
1007 {
1008 auto write_hint = cfd->CalculateSSTWriteHint(0);
1009 mutex_.Unlock();
1010
1011 SequenceNumber earliest_write_conflict_snapshot;
1012 std::vector<SequenceNumber> snapshot_seqs =
1013 snapshots_.GetAll(&earliest_write_conflict_snapshot);
1014 auto snapshot_checker = snapshot_checker_.get();
1015 if (use_custom_gc_ && snapshot_checker == nullptr) {
1016 snapshot_checker = DisableGCSnapshotChecker::Instance();
1017 }
1018 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
1019 range_del_iters;
1020 auto range_del_iter =
1021 mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
1022 if (range_del_iter != nullptr) {
1023 range_del_iters.emplace_back(range_del_iter);
1024 }
1025 s = BuildTable(
1026 dbname_, env_, *cfd->ioptions(), mutable_cf_options,
1027 env_options_for_compaction_, cfd->table_cache(), iter.get(),
1028 std::move(range_del_iters), &meta, cfd->internal_comparator(),
1029 cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
1030 snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1031 GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
1032 mutable_cf_options.sample_for_compression,
1033 cfd->ioptions()->compression_opts, paranoid_file_checks,
1034 cfd->internal_stats(), TableFileCreationReason::kRecovery,
1035 &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
1036 -1 /* level */, current_time, write_hint);
1037 LogFlush(immutable_db_options_.info_log);
1038 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1039 "[%s] [WriteLevel0TableForRecovery]"
1040 " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
1041 cfd->GetName().c_str(), meta.fd.GetNumber(),
1042 meta.fd.GetFileSize(), s.ToString().c_str());
1043 mutex_.Lock();
1044 }
1045 }
1046 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1047
1048 // Note that if file_size is zero, the file has been deleted and
1049 // should not be added to the manifest.
1050 int level = 0;
1051 if (s.ok() && meta.fd.GetFileSize() > 0) {
1052 edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
1053 meta.fd.GetFileSize(), meta.smallest, meta.largest,
1054 meta.fd.smallest_seqno, meta.fd.largest_seqno,
1055 meta.marked_for_compaction);
1056 }
1057
1058 InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
1059 stats.micros = env_->NowMicros() - start_micros;
1060 stats.bytes_written = meta.fd.GetFileSize();
1061 stats.num_output_files = 1;
1062 cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
1063 cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
1064 meta.fd.GetFileSize());
1065 RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
1066 return s;
1067 }
1068
1069 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1070 DBOptions db_options(options);
1071 ColumnFamilyOptions cf_options(options);
1072 std::vector<ColumnFamilyDescriptor> column_families;
1073 column_families.push_back(
1074 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1075 std::vector<ColumnFamilyHandle*> handles;
1076 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1077 if (s.ok()) {
1078 assert(handles.size() == 1);
1079     // We can delete the handle since DBImpl always holds a reference to the
1080     // default column family.
1081 delete handles[0];
1082 }
1083 return s;
1084 }
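// [Editor's note] Minimal usage sketch of this convenience overload; it is
// illustrative only and not part of the original file (the path below is a
// placeholder):
//
//   rocksdb::Options options;
//   options.create_if_missing = true;
//   rocksdb::DB* db = nullptr;
//   rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
//   if (s.ok()) {
//     s = db->Put(rocksdb::WriteOptions(), "key", "value");
//     delete db;  // closing the DB releases the default column family handle
//   }
//
// Internally this overload forwards to the column-family-aware Open() below
// with only the default column family and then deletes the returned default
// handle, since DBImpl keeps its own reference to it.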
1085
1086 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1087 const std::vector<ColumnFamilyDescriptor>& column_families,
1088 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1089 const bool kSeqPerBatch = true;
1090 const bool kBatchPerTxn = true;
1091 return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1092 !kSeqPerBatch, kBatchPerTxn);
1093 }
1094
1095 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1096 const std::vector<ColumnFamilyDescriptor>& column_families,
1097 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1098 const bool seq_per_batch, const bool batch_per_txn) {
1099 Status s = SanitizeOptionsByTable(db_options, column_families);
1100 if (!s.ok()) {
1101 return s;
1102 }
1103
1104 s = ValidateOptions(db_options, column_families);
1105 if (!s.ok()) {
1106 return s;
1107 }
1108
1109 *dbptr = nullptr;
1110 handles->clear();
1111
1112 size_t max_write_buffer_size = 0;
1113 for (auto cf : column_families) {
1114 max_write_buffer_size =
1115 std::max(max_write_buffer_size, cf.options.write_buffer_size);
1116 }
1117
1118 DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1119 s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
1120 if (s.ok()) {
1121 std::vector<std::string> paths;
1122 for (auto& db_path : impl->immutable_db_options_.db_paths) {
1123 paths.emplace_back(db_path.path);
1124 }
1125 for (auto& cf : column_families) {
1126 for (auto& cf_path : cf.options.cf_paths) {
1127 paths.emplace_back(cf_path.path);
1128 }
1129 }
1130 for (auto& path : paths) {
1131 s = impl->env_->CreateDirIfMissing(path);
1132 if (!s.ok()) {
1133 break;
1134 }
1135 }
1136
1137 // For recovery from NoSpace() error, we can only handle
1138 // the case where the database is stored in a single path
1139 if (paths.size() <= 1) {
1140 impl->error_handler_.EnableAutoRecovery();
1141 }
1142 }
1143
1144 if (!s.ok()) {
1145 delete impl;
1146 return s;
1147 }
1148
1149 s = impl->CreateArchivalDirectory();
1150 if (!s.ok()) {
1151 delete impl;
1152 return s;
1153 }
1154 impl->mutex_.Lock();
1155 auto write_hint = impl->CalculateWALWriteHint();
1156 // Handles create_if_missing, error_if_exists
1157 s = impl->Recover(column_families);
1158 if (s.ok()) {
1159 uint64_t new_log_number = impl->versions_->NewFileNumber();
1160 std::unique_ptr<WritableFile> lfile;
1161 EnvOptions soptions(db_options);
1162 EnvOptions opt_env_options =
1163 impl->immutable_db_options_.env->OptimizeForLogWrite(
1164 soptions, BuildDBOptions(impl->immutable_db_options_,
1165 impl->mutable_db_options_));
1166 std::string log_fname =
1167 LogFileName(impl->immutable_db_options_.wal_dir, new_log_number);
1168 s = NewWritableFile(impl->immutable_db_options_.env, log_fname, &lfile,
1169 opt_env_options);
1170 if (s.ok()) {
1171 lfile->SetWriteLifeTimeHint(write_hint);
1172 lfile->SetPreallocationBlockSize(
1173 impl->GetWalPreallocateBlockSize(max_write_buffer_size));
1174 {
1175 InstrumentedMutexLock wl(&impl->log_write_mutex_);
1176 impl->logfile_number_ = new_log_number;
1177 const auto& listeners = impl->immutable_db_options_.listeners;
1178 std::unique_ptr<WritableFileWriter> file_writer(
1179 new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
1180 impl->env_, nullptr /* stats */, listeners));
1181 impl->logs_.emplace_back(
1182 new_log_number,
1183 new log::Writer(
1184 std::move(file_writer), new_log_number,
1185 impl->immutable_db_options_.recycle_log_file_num > 0,
1186 impl->immutable_db_options_.manual_wal_flush));
1187 }
1188
1189 // set column family handles
1190 for (auto cf : column_families) {
1191 auto cfd =
1192 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1193 if (cfd != nullptr) {
1194 handles->push_back(
1195 new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1196 impl->NewThreadStatusCfInfo(cfd);
1197 } else {
1198 if (db_options.create_missing_column_families) {
1199 // missing column family, create it
1200 ColumnFamilyHandle* handle;
1201 impl->mutex_.Unlock();
1202 s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1203 impl->mutex_.Lock();
1204 if (s.ok()) {
1205 handles->push_back(handle);
1206 } else {
1207 break;
1208 }
1209 } else {
1210 s = Status::InvalidArgument("Column family not found: ", cf.name);
1211 break;
1212 }
1213 }
1214 }
1215 }
1216 if (s.ok()) {
1217 SuperVersionContext sv_context(/* create_superversion */ true);
1218 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1219 impl->InstallSuperVersionAndScheduleWork(
1220 cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1221 }
1222 sv_context.Clean();
1223 if (impl->two_write_queues_) {
1224 impl->log_write_mutex_.Lock();
1225 }
1226 impl->alive_log_files_.push_back(
1227 DBImpl::LogFileNumberSize(impl->logfile_number_));
1228 if (impl->two_write_queues_) {
1229 impl->log_write_mutex_.Unlock();
1230 }
1231 impl->DeleteObsoleteFiles();
1232 s = impl->directories_.GetDbDir()->Fsync();
1233 }
1234 }
1235
1236 if (s.ok()) {
1237 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1238 if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1239 auto* vstorage = cfd->current()->storage_info();
1240 for (int i = 1; i < vstorage->num_levels(); ++i) {
1241 int num_files = vstorage->NumLevelFiles(i);
1242 if (num_files > 0) {
1243 s = Status::InvalidArgument(
1244 "Not all files are at level 0. Cannot "
1245 "open with FIFO compaction style.");
1246 break;
1247 }
1248 }
1249 }
1250 if (!cfd->mem()->IsSnapshotSupported()) {
1251 impl->is_snapshot_supported_ = false;
1252 }
1253 if (cfd->ioptions()->merge_operator != nullptr &&
1254 !cfd->mem()->IsMergeOperatorSupported()) {
1255 s = Status::InvalidArgument(
1256 "The memtable of column family %s does not support merge operator "
1257             "but its options.merge_operator is non-null",
1258 cfd->GetName().c_str());
1259 }
1260 if (!s.ok()) {
1261 break;
1262 }
1263 }
1264 }
1265 TEST_SYNC_POINT("DBImpl::Open:Opened");
1266 Status persist_options_status;
1267 if (s.ok()) {
1268 // Persist RocksDB Options before scheduling the compaction.
1269 // The WriteOptionsFile() will release and lock the mutex internally.
1270 persist_options_status = impl->WriteOptionsFile(
1271 false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1272
1273 *dbptr = impl;
1274 impl->opened_successfully_ = true;
1275 impl->MaybeScheduleFlushOrCompaction();
1276 }
1277 impl->mutex_.Unlock();
1278
1279 #ifndef ROCKSDB_LITE
1280 auto sfm = static_cast<SstFileManagerImpl*>(
1281 impl->immutable_db_options_.sst_file_manager.get());
1282 if (s.ok() && sfm) {
1283 // Notify SstFileManager about all sst files that already exist in
1284 // db_paths[0] and cf_paths[0] when the DB is opened.
1285 std::vector<std::string> paths;
1286 paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1287 for (auto& cf : column_families) {
1288 if (!cf.options.cf_paths.empty()) {
1289 paths.emplace_back(cf.options.cf_paths[0].path);
1290 }
1291 }
1292 // Remove duplicate paths.
1293 std::sort(paths.begin(), paths.end());
1294 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1295 for (auto& path : paths) {
1296 std::vector<std::string> existing_files;
1297 impl->immutable_db_options_.env->GetChildren(path, &existing_files);
1298 for (auto& file_name : existing_files) {
1299 uint64_t file_number;
1300 FileType file_type;
1301 std::string file_path = path + "/" + file_name;
1302 if (ParseFileName(file_name, &file_number, &file_type) &&
1303 file_type == kTableFile) {
1304 sfm->OnAddFile(file_path);
1305 }
1306 }
1307 }
1308
1309     // Reserve some disk buffer space. This is a heuristic - when we run out
1310     // of disk space, this ensures that there is at least write_buffer_size
1311     // amount of free space before we resume DB writes. In low disk space
1312     // conditions, we want to avoid a lot of small L0 files due to frequent
1313     // WAL write failures and the resultant forced flushes.
1314 sfm->ReserveDiskBuffer(max_write_buffer_size,
1315 impl->immutable_db_options_.db_paths[0].path);
1316 }
1317 #endif // !ROCKSDB_LITE
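  // [Editor's illustration, not in the original source] Concretely, if the
  // largest write_buffer_size across the column families is 64 MB, the
  // SstFileManager above reserves roughly 64 MB of free space in db_paths[0]
  // before DB writes are allowed to resume after an out-of-space error.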
1318
1319 if (s.ok()) {
1320 ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1321 impl);
1322 LogFlush(impl->immutable_db_options_.info_log);
1323 assert(impl->TEST_WALBufferIsEmpty());
1324 // If the assert above fails then we need to FlushWAL before returning
1325 // control back to the user.
1326 if (!persist_options_status.ok()) {
1327 s = Status::IOError(
1328 "DB::Open() failed --- Unable to persist Options file",
1329 persist_options_status.ToString());
1330 }
1331 }
1332 if (s.ok()) {
1333 impl->StartTimedTasks();
1334 }
1335 if (!s.ok()) {
1336 for (auto* h : *handles) {
1337 delete h;
1338 }
1339 handles->clear();
1340 delete impl;
1341 *dbptr = nullptr;
1342 }
1343 return s;
1344 }
1345 } // namespace rocksdb