]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl/db_impl_open.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_open.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/wal_filter.h"
22 #include "table/block_based/block_based_table_factory.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
25
26 namespace ROCKSDB_NAMESPACE {
27 Options SanitizeOptions(const std::string& dbname, const Options& src) {
28 auto db_options = SanitizeOptions(dbname, DBOptions(src));
29 ImmutableDBOptions immutable_db_options(db_options);
30 auto cf_options =
31 SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
32 return Options(db_options, cf_options);
33 }
34
35 DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
36 DBOptions result(src);
37
38 if (result.file_system == nullptr) {
39 if (result.env == Env::Default()) {
40 result.file_system = FileSystem::Default();
41 } else {
42 result.file_system.reset(new LegacyFileSystemWrapper(result.env));
43 }
44 } else {
45 if (result.env == nullptr) {
46 result.env = Env::Default();
47 }
48 }
49
50 // result.max_open_files means an "infinite" open files.
51 if (result.max_open_files != -1) {
52 int max_max_open_files = port::GetMaxOpenFiles();
53 if (max_max_open_files == -1) {
54 max_max_open_files = 0x400000;
55 }
56 ClipToRange(&result.max_open_files, 20, max_max_open_files);
57 TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
58 &result.max_open_files);
59 }
60
61 if (result.info_log == nullptr) {
62 Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
63 if (!s.ok()) {
64 // No place suitable for logging
65 result.info_log = nullptr;
66 }
67 }
68
69 if (!result.write_buffer_manager) {
70 result.write_buffer_manager.reset(
71 new WriteBufferManager(result.db_write_buffer_size));
72 }
73 auto bg_job_limits = DBImpl::GetBGJobLimits(
74 result.max_background_flushes, result.max_background_compactions,
75 result.max_background_jobs, true /* parallelize_compactions */);
76 result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
77 Env::Priority::LOW);
78 result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
79 Env::Priority::HIGH);
80
81 if (result.rate_limiter.get() != nullptr) {
82 if (result.bytes_per_sync == 0) {
83 result.bytes_per_sync = 1024 * 1024;
84 }
85 }
86
87 if (result.delayed_write_rate == 0) {
88 if (result.rate_limiter.get() != nullptr) {
89 result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
90 }
91 if (result.delayed_write_rate == 0) {
92 result.delayed_write_rate = 16 * 1024 * 1024;
93 }
94 }
95
96 if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
97 result.recycle_log_file_num = false;
98 }
99
100 if (result.recycle_log_file_num &&
101 (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
102 result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
103 // kPointInTimeRecovery is inconsistent with recycle log file feature since
104 // we define the "end" of the log as the first corrupt record we encounter.
105 // kAbsoluteConsistency doesn't make sense because even a clean
106 // shutdown leaves old junk at the end of the log file.
107 result.recycle_log_file_num = 0;
108 }
109
110 if (result.wal_dir.empty()) {
111 // Use dbname as default
112 result.wal_dir = dbname;
113 }
114 if (result.wal_dir.back() == '/') {
115 result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
116 }
117
118 if (result.db_paths.size() == 0) {
119 result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
120 }
121
122 if (result.use_direct_reads && result.compaction_readahead_size == 0) {
123 TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
124 result.compaction_readahead_size = 1024 * 1024 * 2;
125 }
126
127 if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
128 result.new_table_reader_for_compaction_inputs = true;
129 }
130
131 // Force flush on DB open if 2PC is enabled, since with 2PC we have no
132 // guarantee that consecutive log files have consecutive sequence id, which
133 // make recovery complicated.
134 if (result.allow_2pc) {
135 result.avoid_flush_during_recovery = false;
136 }
137
138 #ifndef ROCKSDB_LITE
139 ImmutableDBOptions immutable_db_options(result);
140 if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
141 // Either the WAL dir and db_paths[0]/db_name are not the same, or we
142 // cannot tell for sure. In either case, assume they're different and
143 // explicitly cleanup the trash log files (bypass DeleteScheduler)
144 // Do this first so even if we end up calling
145 // DeleteScheduler::CleanupDirectory on the same dir later, it will be
146 // safe
147 std::vector<std::string> filenames;
148 result.env->GetChildren(result.wal_dir, &filenames);
149 for (std::string& filename : filenames) {
150 if (filename.find(".log.trash", filename.length() -
151 std::string(".log.trash").length()) !=
152 std::string::npos) {
153 std::string trash_file = result.wal_dir + "/" + filename;
154 result.env->DeleteFile(trash_file);
155 }
156 }
157 }
158 // When the DB is stopped, it's possible that there are some .trash files that
159 // were not deleted yet, when we open the DB we will find these .trash files
160 // and schedule them to be deleted (or delete immediately if SstFileManager
161 // was not used)
162 auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
163 for (size_t i = 0; i < result.db_paths.size(); i++) {
164 DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
165 }
166
167 // Create a default SstFileManager for purposes of tracking compaction size
168 // and facilitating recovery from out of space errors.
169 if (result.sst_file_manager.get() == nullptr) {
170 std::shared_ptr<SstFileManager> sst_file_manager(
171 NewSstFileManager(result.env, result.info_log));
172 result.sst_file_manager = sst_file_manager;
173 }
174 #endif
175
176 if (!result.paranoid_checks) {
177 result.skip_checking_sst_file_sizes_on_db_open = true;
178 ROCKS_LOG_INFO(result.info_log,
179 "file size check will be skipped during open.");
180 }
181
182 return result;
183 }
184
185 namespace {
186 Status SanitizeOptionsByTable(
187 const DBOptions& db_opts,
188 const std::vector<ColumnFamilyDescriptor>& column_families) {
189 Status s;
190 for (auto cf : column_families) {
191 s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
192 if (!s.ok()) {
193 return s;
194 }
195 }
196 return Status::OK();
197 }
198 } // namespace
199
200 Status DBImpl::ValidateOptions(
201 const DBOptions& db_options,
202 const std::vector<ColumnFamilyDescriptor>& column_families) {
203 Status s;
204 for (auto& cfd : column_families) {
205 s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
206 if (!s.ok()) {
207 return s;
208 }
209 }
210 s = ValidateOptions(db_options);
211 return s;
212 }
213
214 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
215 if (db_options.db_paths.size() > 4) {
216 return Status::NotSupported(
217 "More than four DB paths are not supported yet. ");
218 }
219
220 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
221 // Protect against assert in PosixMMapReadableFile constructor
222 return Status::NotSupported(
223 "If memory mapped reads (allow_mmap_reads) are enabled "
224 "then direct I/O reads (use_direct_reads) must be disabled. ");
225 }
226
227 if (db_options.allow_mmap_writes &&
228 db_options.use_direct_io_for_flush_and_compaction) {
229 return Status::NotSupported(
230 "If memory mapped writes (allow_mmap_writes) are enabled "
231 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
232 "be disabled. ");
233 }
234
235 if (db_options.keep_log_file_num == 0) {
236 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
237 }
238
239 if (db_options.unordered_write &&
240 !db_options.allow_concurrent_memtable_write) {
241 return Status::InvalidArgument(
242 "unordered_write is incompatible with !allow_concurrent_memtable_write");
243 }
244
245 if (db_options.unordered_write && db_options.enable_pipelined_write) {
246 return Status::InvalidArgument(
247 "unordered_write is incompatible with enable_pipelined_write");
248 }
249
250 if (db_options.atomic_flush && db_options.enable_pipelined_write) {
251 return Status::InvalidArgument(
252 "atomic_flush is incompatible with enable_pipelined_write");
253 }
254
255 return Status::OK();
256 }
257
// Creates a brand-new database on disk: writes the IDENTITY file, an
// initial MANIFEST (descriptor) file numbered 1 containing an empty
// VersionEdit, and finally the CURRENT file pointing at that manifest.
// On any failure after the manifest file was created, the partial
// manifest is deleted so a retry starts clean.
Status DBImpl::NewDB() {
  VersionEdit new_db;
  Status s = SetIdentityFile(env_, dbname_);
  if (!s.ok()) {
    return s;
  }
  if (immutable_db_options_.write_dbid_to_manifest) {
    std::string temp_db_id;
    // NOTE(review): the status of this read is ignored; if it fails, an
    // empty DB id would be recorded in the manifest — confirm intended.
    GetDbIdentityFromIdentityFile(&temp_db_id);
    new_db.SetDBId(temp_db_id);
  }
  new_db.SetLogNumber(0);
  // File number 1 is consumed by the manifest created below, so the next
  // allocatable file number is 2.
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    std::unique_ptr<FSWritableFile> file;
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
    if (!s.ok()) {
      return s;
    }
    file->SetPreallocationBlockSize(
        immutable_db_options_.manifest_preallocation_size);
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(file), manifest, file_options, env_, nullptr /* stats */,
        immutable_db_options_.listeners));
    log::Writer log(std::move(file_writer), 0, false);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      // Durably persist the manifest before installing it via CURRENT.
      s = SyncManifest(env_, &immutable_db_options_, log.file());
    }
  }
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
  } else {
    fs_->DeleteFile(manifest, IOOptions(), nullptr);
  }
  return s;
}
303
304 Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
305 std::unique_ptr<Directory>* directory) {
306 // We call CreateDirIfMissing() as the directory may already exist (if we
307 // are reopening a DB), when this happens we don't want creating the
308 // directory to cause an error. However, we need to check if creating the
309 // directory fails or else we may get an obscure message about the lock
310 // file not existing. One real-world example of this occurring is if
311 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
312 // when dbname_ is "dir/db" but when "dir" doesn't exist.
313 Status s = env->CreateDirIfMissing(dirname);
314 if (!s.ok()) {
315 return s;
316 }
317 return env->NewDirectory(dirname, directory);
318 }
319
320 Status Directories::SetDirectories(Env* env, const std::string& dbname,
321 const std::string& wal_dir,
322 const std::vector<DbPath>& data_paths) {
323 Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
324 if (!s.ok()) {
325 return s;
326 }
327 if (!wal_dir.empty() && dbname != wal_dir) {
328 s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_);
329 if (!s.ok()) {
330 return s;
331 }
332 }
333
334 data_dirs_.clear();
335 for (auto& p : data_paths) {
336 const std::string db_path = p.path;
337 if (db_path == dbname) {
338 data_dirs_.emplace_back(nullptr);
339 } else {
340 std::unique_ptr<Directory> path_directory;
341 s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory);
342 if (!s.ok()) {
343 return s;
344 }
345 data_dirs_.emplace_back(path_directory.release());
346 }
347 }
348 assert(data_dirs_.size() == data_paths.size());
349 return Status::OK();
350 }
351
// Recovers DB state on open. With the DB mutex held:
//  1. (non-read-only) creates/locks directories, acquires the LOCK file,
//     and either creates a new DB or validates an existing CURRENT file,
//     including a direct-I/O compatibility probe of the filesystem.
//  2. Recovers the version state from the MANIFEST via versions_->Recover.
//  3. Ensures a DB id exists (IDENTITY file and/or MANIFEST).
//  4. Replays any live WAL files newer than the manifest's log number.
// `recovered_seq` (may be null) receives the next sequence number when a
// corrupted WAL tail was encountered, so the caller can re-log it.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist, bool error_if_data_exists_in_logs,
    uint64_t* recovered_seq) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  if (!read_only) {
    Status s = directories_.SetDirectories(env_, dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    // Single-process exclusivity: hold the LOCK file for the DB lifetime.
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    // Presence of CURRENT distinguishes an existing DB from a fresh one.
    std::string current_fname = CurrentFileName(dbname_);
    s = env_->FileExists(current_fname);
    if (s.IsNotFound()) {
      if (immutable_db_options_.create_if_missing) {
        s = NewDB();
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Verify compatibility of file_options_ and filesystem
    {
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                   nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause by retrying
        // the same open with direct reads disabled.
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(current_fname, customized_fs, &idfile,
                                     nullptr);
        if (s.ok()) {
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  }
  assert(db_id_.empty());
  Status s = versions_->Recover(column_families, read_only, &db_id_);
  if (!s.ok()) {
    return s;
  }
  // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
  // the very first time.
  if (db_id_.empty()) {
    // Check for the IDENTITY file and create it if not there.
    s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
    // Typically Identity file is created in NewDB() and for some reason if
    // it is no longer available then at this point DB ID is not in Identity
    // file or Manifest.
    if (s.IsNotFound()) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    } else if (!s.ok()) {
      assert(s.IsIOError());
      return s;
    }
    s = GetDbIdentityFromIdentityFile(&db_id_);
    if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
      // Persist the freshly read DB id into the MANIFEST so future opens
      // can recover it without the IDENTITY file.
      VersionEdit edit;
      edit.SetDBId(db_id_);
      Options options;
      MutableCFOptions mutable_cf_options(options);
      versions_->db_id_ = db_id_;
      s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
                                 mutable_cf_options, &edit, &mutex_, nullptr,
                                 false);
    }
  } else {
    // MANIFEST already knows the DB id; rewrite IDENTITY to match it.
    s = SetIdentityFile(env_, dbname_, db_id_);
  }

  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok() && !read_only) {
    // Pre-open directory handles for every column family's cf_paths.
    std::map<std::string, std::shared_ptr<Directory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }
  // DB mutex is already held
  if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
    s = InitPersistStatsColumnFamily();
  }

  if (s.ok()) {
    // Initial max_total_in_memory_state_ before recovery logs. Log recovery
    // may check this value to decide whether to flush.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    // TODO(Zhongyi): handle single_column_family_mode_ when
    // persistent_stats is enabled
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    std::vector<std::string> filenames;
    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found",
                                     immutable_db_options_.wal_dir);
    } else if (!s.ok()) {
      return s;
    }

    // Collect the numbers of all WAL files present in wal_dir. A fresh DB
    // must not have any — that would indicate leftovers from another DB.
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              filenames[i]);
        } else {
          logs.push_back(number);
        }
      }
    }

    if (logs.size() > 0) {
      if (error_if_log_file_exist) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_log_file_exist"
            "flag but a log file already exists");
      } else if (error_if_data_exists_in_logs) {
        // Empty (zero-byte) WALs are tolerated; only non-empty ones fail.
        for (auto& log : logs) {
          std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
          uint64_t bytes;
          s = env_->GetFileSize(fname, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_logs is set but there are data "
                  " in log files.");
            }
          }
        }
      }
    }

    if (!logs.empty()) {
      // Recover in the order in which the logs were generated
      std::sort(logs.begin(), logs.end());
      bool corrupted_log_found = false;
      s = RecoverLogFiles(logs, &next_sequence, read_only,
                          &corrupted_log_found);
      if (corrupted_log_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  if (read_only) {
    // If we are opening as read-only, we need to update options_file_number_
    // to reflect the most recent OPTIONS file. It does not matter for regular
    // read-write db instance because options_file_number_ will later be
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
    std::vector<std::string> file_names;
    if (s.ok()) {
      s = env_->GetChildren(GetName(), &file_names);
    }
    if (s.ok()) {
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      for (const auto& fname : file_names) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
    }
  }

  return s;
}
589
// Checks the on-disk persistent-stats column family's format version for
// compatibility with this binary. If the stored format is unreadable or
// newer than both the current format and compatible version, the stats CF
// is dropped and recreated. The current format/compatible version keys are
// then written whenever the CF is (re)created for the first time.
// Called with the DB mutex held; the mutex is released for the duration of
// the reads/writes (which go through the public Write path) and reacquired
// before returning.
Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // persist version when stats CF doesn't exist
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Check persistent stats format version compatibility. Drop and recreate
    // persistent stats CF if format version is incompatible
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // abort reading from existing stats CF if any of following is true:
    // 1. failed to read format version or compatible version from disk
    // 2. sst's format version is greater than current format version, meaning
    // this sst is encoded with a newer RocksDB release, and current compatible
    // version is below the sst's compatible version
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "Reading persistent stats version key failed. Format key: %s, "
            "compatible key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "Disable persistent stats due to corrupted or incompatible format "
            "version\n");
      }
      // NOTE(review): statuses of Drop/Destroy below are ignored —
      // presumably best-effort cleanup before recreating; confirm intended.
      DropColumnFamily(persist_stats_cf_handle_);
      DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      ColumnFamilyHandle* handle = nullptr;
      ColumnFamilyOptions cfo;
      OptimizeForPersistentStats(&cfo);
      s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
      persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
      // should also persist version here because old stats CF is discarded
      should_persist_format_version = true;
    }
  }
  if (s.ok() && should_persist_format_version) {
    // Persistent stats CF being created for the first time, need to write
    // format version key
    WriteBatch batch;
    batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
              ToString(kStatsCFCurrentFormatVersion));
    batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
              ToString(kStatsCFCompatibleFormatVersion));
    // Low-priority, non-blocking, unsynced write: losing the version keys
    // on crash only means this check re-runs on the next open.
    WriteOptions wo;
    wo.low_pri = true;
    wo.no_slowdown = true;
    wo.sync = false;
    s = Write(wo, &batch);
  }
  mutex_.Lock();
  return s;
}
654
// Ensures the persistent-stats column family handle exists: if the CF was
// already recovered from the MANIFEST, only a handle is created; otherwise
// the CF itself is created (dropping the DB mutex around the call, since
// CreateColumnFamily acquires it internally). Records whether the CF
// pre-existed in persistent_stats_cfd_exists_ for the later format-version
// check. Called with the DB mutex held; returns with it held.
Status DBImpl::InitPersistStatsColumnFamily() {
  mutex_.AssertHeld();
  assert(!persist_stats_cf_handle_);
  ColumnFamilyData* persistent_stats_cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(
          kPersistentStatsColumnFamilyName);
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;

  Status s;
  if (persistent_stats_cfd != nullptr) {
    // We are recovering from a DB which already contains persistent stats CF,
    // the CF is already created in VersionSet::ApplyOneVersionEdit, but
    // column family handle was not. Need to explicitly create handle here.
    persist_stats_cf_handle_ =
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  } else {
    mutex_.Unlock();
    ColumnFamilyHandle* handle = nullptr;
    ColumnFamilyOptions cfo;
    OptimizeForPersistentStats(&cfo);
    s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
    mutex_.Lock();
  }
  return s;
}
681
682 // REQUIRES: log_numbers are sorted in ascending order
683 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
684 SequenceNumber* next_sequence, bool read_only,
685 bool* corrupted_log_found) {
686 struct LogReporter : public log::Reader::Reporter {
687 Env* env;
688 Logger* info_log;
689 const char* fname;
690 Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
691 void Corruption(size_t bytes, const Status& s) override {
692 ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
693 (this->status == nullptr ? "(ignoring error) " : ""),
694 fname, static_cast<int>(bytes), s.ToString().c_str());
695 if (this->status != nullptr && this->status->ok()) {
696 *this->status = s;
697 }
698 }
699 };
700
701 mutex_.AssertHeld();
702 Status status;
703 std::unordered_map<int, VersionEdit> version_edits;
704 // no need to refcount because iteration is under mutex
705 for (auto cfd : *versions_->GetColumnFamilySet()) {
706 VersionEdit edit;
707 edit.SetColumnFamily(cfd->GetID());
708 version_edits.insert({cfd->GetID(), edit});
709 }
710 int job_id = next_job_id_.fetch_add(1);
711 {
712 auto stream = event_logger_.Log();
713 stream << "job" << job_id << "event"
714 << "recovery_started";
715 stream << "log_files";
716 stream.StartArray();
717 for (auto log_number : log_numbers) {
718 stream << log_number;
719 }
720 stream.EndArray();
721 }
722
723 #ifndef ROCKSDB_LITE
724 if (immutable_db_options_.wal_filter != nullptr) {
725 std::map<std::string, uint32_t> cf_name_id_map;
726 std::map<uint32_t, uint64_t> cf_lognumber_map;
727 for (auto cfd : *versions_->GetColumnFamilySet()) {
728 cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
729 cf_lognumber_map.insert(
730 std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
731 }
732
733 immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
734 cf_name_id_map);
735 }
736 #endif
737
738 bool stop_replay_by_wal_filter = false;
739 bool stop_replay_for_corruption = false;
740 bool flushed = false;
741 uint64_t corrupted_log_number = kMaxSequenceNumber;
742 uint64_t min_log_number = MinLogNumberToKeep();
743 for (auto log_number : log_numbers) {
744 if (log_number < min_log_number) {
745 ROCKS_LOG_INFO(immutable_db_options_.info_log,
746 "Skipping log #%" PRIu64
747 " since it is older than min log to keep #%" PRIu64,
748 log_number, min_log_number);
749 continue;
750 }
751 // The previous incarnation may not have written any MANIFEST
752 // records after allocating this log number. So we manually
753 // update the file number allocation counter in VersionSet.
754 versions_->MarkFileNumberUsed(log_number);
755 // Open the log file
756 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
757
758 ROCKS_LOG_INFO(immutable_db_options_.info_log,
759 "Recovering log #%" PRIu64 " mode %d", log_number,
760 static_cast<int>(immutable_db_options_.wal_recovery_mode));
761 auto logFileDropped = [this, &fname]() {
762 uint64_t bytes;
763 if (env_->GetFileSize(fname, &bytes).ok()) {
764 auto info_log = immutable_db_options_.info_log.get();
765 ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
766 static_cast<int>(bytes));
767 }
768 };
769 if (stop_replay_by_wal_filter) {
770 logFileDropped();
771 continue;
772 }
773
774 std::unique_ptr<SequentialFileReader> file_reader;
775 {
776 std::unique_ptr<FSSequentialFile> file;
777 status = fs_->NewSequentialFile(fname,
778 fs_->OptimizeForLogRead(file_options_),
779 &file, nullptr);
780 if (!status.ok()) {
781 MaybeIgnoreError(&status);
782 if (!status.ok()) {
783 return status;
784 } else {
785 // Fail with one log file, but that's ok.
786 // Try next one.
787 continue;
788 }
789 }
790 file_reader.reset(new SequentialFileReader(
791 std::move(file), fname, immutable_db_options_.log_readahead_size));
792 }
793
794 // Create the log reader.
795 LogReporter reporter;
796 reporter.env = env_;
797 reporter.info_log = immutable_db_options_.info_log.get();
798 reporter.fname = fname.c_str();
799 if (!immutable_db_options_.paranoid_checks ||
800 immutable_db_options_.wal_recovery_mode ==
801 WALRecoveryMode::kSkipAnyCorruptedRecords) {
802 reporter.status = nullptr;
803 } else {
804 reporter.status = &status;
805 }
806 // We intentially make log::Reader do checksumming even if
807 // paranoid_checks==false so that corruptions cause entire commits
808 // to be skipped instead of propagating bad information (like overly
809 // large sequence numbers).
810 log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
811 &reporter, true /*checksum*/, log_number);
812
813 // Determine if we should tolerate incomplete records at the tail end of the
814 // Read all the records and add to a memtable
815 std::string scratch;
816 Slice record;
817 WriteBatch batch;
818
819 while (!stop_replay_by_wal_filter &&
820 reader.ReadRecord(&record, &scratch,
821 immutable_db_options_.wal_recovery_mode) &&
822 status.ok()) {
823 if (record.size() < WriteBatchInternal::kHeader) {
824 reporter.Corruption(record.size(),
825 Status::Corruption("log record too small"));
826 continue;
827 }
828 WriteBatchInternal::SetContents(&batch, record);
829 SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
830
831 if (immutable_db_options_.wal_recovery_mode ==
832 WALRecoveryMode::kPointInTimeRecovery) {
833 // In point-in-time recovery mode, if sequence id of log files are
834 // consecutive, we continue recovery despite corruption. This could
835 // happen when we open and write to a corrupted DB, where sequence id
836 // will start from the last sequence id we recovered.
837 if (sequence == *next_sequence) {
838 stop_replay_for_corruption = false;
839 }
840 if (stop_replay_for_corruption) {
841 logFileDropped();
842 break;
843 }
844 }
845
846 #ifndef ROCKSDB_LITE
847 if (immutable_db_options_.wal_filter != nullptr) {
848 WriteBatch new_batch;
849 bool batch_changed = false;
850
851 WalFilter::WalProcessingOption wal_processing_option =
852 immutable_db_options_.wal_filter->LogRecordFound(
853 log_number, fname, batch, &new_batch, &batch_changed);
854
855 switch (wal_processing_option) {
856 case WalFilter::WalProcessingOption::kContinueProcessing:
857 // do nothing, proceeed normally
858 break;
859 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
860 // skip current record
861 continue;
862 case WalFilter::WalProcessingOption::kStopReplay:
863 // skip current record and stop replay
864 stop_replay_by_wal_filter = true;
865 continue;
866 case WalFilter::WalProcessingOption::kCorruptedRecord: {
867 status =
868 Status::Corruption("Corruption reported by Wal Filter ",
869 immutable_db_options_.wal_filter->Name());
870 MaybeIgnoreError(&status);
871 if (!status.ok()) {
872 reporter.Corruption(record.size(), status);
873 continue;
874 }
875 break;
876 }
877 default: {
878 assert(false); // unhandled case
879 status = Status::NotSupported(
880 "Unknown WalProcessingOption returned"
881 " by Wal Filter ",
882 immutable_db_options_.wal_filter->Name());
883 MaybeIgnoreError(&status);
884 if (!status.ok()) {
885 return status;
886 } else {
887 // Ignore the error with current record processing.
888 continue;
889 }
890 }
891 }
892
893 if (batch_changed) {
894 // Make sure that the count in the new batch is
895 // within the orignal count.
896 int new_count = WriteBatchInternal::Count(&new_batch);
897 int original_count = WriteBatchInternal::Count(&batch);
898 if (new_count > original_count) {
899 ROCKS_LOG_FATAL(
900 immutable_db_options_.info_log,
901 "Recovering log #%" PRIu64
902 " mode %d log filter %s returned "
903 "more records (%d) than original (%d) which is not allowed. "
904 "Aborting recovery.",
905 log_number,
906 static_cast<int>(immutable_db_options_.wal_recovery_mode),
907 immutable_db_options_.wal_filter->Name(), new_count,
908 original_count);
909 status = Status::NotSupported(
910 "More than original # of records "
911 "returned by Wal Filter ",
912 immutable_db_options_.wal_filter->Name());
913 return status;
914 }
915 // Set the same sequence number in the new_batch
916 // as the original batch.
917 WriteBatchInternal::SetSequence(&new_batch,
918 WriteBatchInternal::Sequence(&batch));
919 batch = new_batch;
920 }
921 }
922 #endif // ROCKSDB_LITE
923
924 // If column family was not found, it might mean that the WAL write
925 // batch references to the column family that was dropped after the
926 // insert. We don't want to fail the whole write batch in that case --
927 // we just ignore the update.
928 // That's why we set ignore missing column families to true
929 bool has_valid_writes = false;
930 status = WriteBatchInternal::InsertInto(
931 &batch, column_family_memtables_.get(), &flush_scheduler_,
932 &trim_history_scheduler_, true, log_number, this,
933 false /* concurrent_memtable_writes */, next_sequence,
934 &has_valid_writes, seq_per_batch_, batch_per_txn_);
935 MaybeIgnoreError(&status);
936 if (!status.ok()) {
937 // We are treating this as a failure while reading since we read valid
938 // blocks that do not form coherent data
939 reporter.Corruption(record.size(), status);
940 continue;
941 }
942
943 if (has_valid_writes && !read_only) {
944 // we can do this because this is called before client has access to the
945 // DB and there is only a single thread operating on DB
946 ColumnFamilyData* cfd;
947
948 while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
949 cfd->UnrefAndTryDelete();
950 // If this asserts, it means that InsertInto failed in
951 // filtering updates to already-flushed column families
952 assert(cfd->GetLogNumber() <= log_number);
953 auto iter = version_edits.find(cfd->GetID());
954 assert(iter != version_edits.end());
955 VersionEdit* edit = &iter->second;
956 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
957 if (!status.ok()) {
958 // Reflect errors immediately so that conditions like full
959 // file-systems cause the DB::Open() to fail.
960 return status;
961 }
962 flushed = true;
963
964 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
965 *next_sequence);
966 }
967 }
968 }
969
970 if (!status.ok()) {
971 if (status.IsNotSupported()) {
972 // We should not treat NotSupported as corruption. It is rather a clear
973 // sign that we are processing a WAL that is produced by an incompatible
974 // version of the code.
975 return status;
976 }
977 if (immutable_db_options_.wal_recovery_mode ==
978 WALRecoveryMode::kSkipAnyCorruptedRecords) {
979 // We should ignore all errors unconditionally
980 status = Status::OK();
981 } else if (immutable_db_options_.wal_recovery_mode ==
982 WALRecoveryMode::kPointInTimeRecovery) {
983 // We should ignore the error but not continue replaying
984 status = Status::OK();
985 stop_replay_for_corruption = true;
986 corrupted_log_number = log_number;
987 if (corrupted_log_found != nullptr) {
988 *corrupted_log_found = true;
989 }
990 ROCKS_LOG_INFO(immutable_db_options_.info_log,
991 "Point in time recovered to log #%" PRIu64
992 " seq #%" PRIu64,
993 log_number, *next_sequence);
994 } else {
995 assert(immutable_db_options_.wal_recovery_mode ==
996 WALRecoveryMode::kTolerateCorruptedTailRecords ||
997 immutable_db_options_.wal_recovery_mode ==
998 WALRecoveryMode::kAbsoluteConsistency);
999 return status;
1000 }
1001 }
1002
1003 flush_scheduler_.Clear();
1004 trim_history_scheduler_.Clear();
1005 auto last_sequence = *next_sequence - 1;
1006 if ((*next_sequence != kMaxSequenceNumber) &&
1007 (versions_->LastSequence() <= last_sequence)) {
1008 versions_->SetLastAllocatedSequence(last_sequence);
1009 versions_->SetLastPublishedSequence(last_sequence);
1010 versions_->SetLastSequence(last_sequence);
1011 }
1012 }
1013 // Compare the corrupted log number to all columnfamily's current log number.
1014 // Abort Open() if any column family's log number is greater than
1015 // the corrupted log number, which means CF contains data beyond the point of
1016 // corruption. This could during PIT recovery when the WAL is corrupted and
1017 // some (but not all) CFs are flushed
1018 // Exclude the PIT case where no log is dropped after the corruption point.
1019 // This is to cover the case for empty logs after corrupted log, in which we
1020 // don't reset stop_replay_for_corruption.
1021 if (stop_replay_for_corruption == true &&
1022 (immutable_db_options_.wal_recovery_mode ==
1023 WALRecoveryMode::kPointInTimeRecovery ||
1024 immutable_db_options_.wal_recovery_mode ==
1025 WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1026 for (auto cfd : *versions_->GetColumnFamilySet()) {
1027 if (cfd->GetLogNumber() > corrupted_log_number) {
1028 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1029 "Column family inconsistency: SST file contains data"
1030 " beyond the point of corruption.");
1031 return Status::Corruption("SST file is ahead of WALs");
1032 }
1033 }
1034 }
1035
1036 // True if there's any data in the WALs; if not, we can skip re-processing
1037 // them later
1038 bool data_seen = false;
1039 if (!read_only) {
1040 // no need to refcount since client still doesn't have access
1041 // to the DB and can not drop column families while we iterate
1042 auto max_log_number = log_numbers.back();
1043 for (auto cfd : *versions_->GetColumnFamilySet()) {
1044 auto iter = version_edits.find(cfd->GetID());
1045 assert(iter != version_edits.end());
1046 VersionEdit* edit = &iter->second;
1047
1048 if (cfd->GetLogNumber() > max_log_number) {
1049 // Column family cfd has already flushed the data
1050 // from all logs. Memtable has to be empty because
1051 // we filter the updates based on log_number
1052 // (in WriteBatch::InsertInto)
1053 assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1054 assert(edit->NumEntries() == 0);
1055 continue;
1056 }
1057
1058 TEST_SYNC_POINT_CALLBACK(
1059 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1060
1061 // flush the final memtable (if non-empty)
1062 if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1063 // If flush happened in the middle of recovery (e.g. due to memtable
1064 // being full), we flush at the end. Otherwise we'll need to record
1065 // where we were on last flush, which make the logic complicated.
1066 if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1067 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1068 if (!status.ok()) {
1069 // Recovery failed
1070 break;
1071 }
1072 flushed = true;
1073
1074 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1075 versions_->LastSequence());
1076 }
1077 data_seen = true;
1078 }
1079
1080 // Update the log number info in the version edit corresponding to this
1081 // column family. Note that the version edits will be written to MANIFEST
1082 // together later.
1083 // writing log_number in the manifest means that any log file
1084 // with number strongly less than (log_number + 1) is already
1085 // recovered and should be ignored on next reincarnation.
1086 // Since we already recovered max_log_number, we want all logs
1087 // with numbers `<= max_log_number` (includes this one) to be ignored
1088 if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1089 edit->SetLogNumber(max_log_number + 1);
1090 }
1091 }
1092 if (status.ok()) {
1093 // we must mark the next log number as used, even though it's
1094 // not actually used. that is because VersionSet assumes
1095 // VersionSet::next_file_number_ always to be strictly greater than any
1096 // log number
1097 versions_->MarkFileNumberUsed(max_log_number + 1);
1098
1099 autovector<ColumnFamilyData*> cfds;
1100 autovector<const MutableCFOptions*> cf_opts;
1101 autovector<autovector<VersionEdit*>> edit_lists;
1102 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1103 cfds.push_back(cfd);
1104 cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1105 auto iter = version_edits.find(cfd->GetID());
1106 assert(iter != version_edits.end());
1107 edit_lists.push_back({&iter->second});
1108 }
1109 // write MANIFEST with update
1110 status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1111 directories_.GetDbDir(),
1112 /*new_descriptor_log=*/true);
1113 }
1114 }
1115
1116 if (status.ok() && data_seen && !flushed) {
1117 status = RestoreAliveLogFiles(log_numbers);
1118 }
1119
1120 event_logger_.Log() << "job" << job_id << "event"
1121 << "recovery_finished";
1122
1123 return status;
1124 }
1125
1126 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
1127 if (log_numbers.empty()) {
1128 return Status::OK();
1129 }
1130 Status s;
1131 mutex_.AssertHeld();
1132 assert(immutable_db_options_.avoid_flush_during_recovery);
1133 if (two_write_queues_) {
1134 log_write_mutex_.Lock();
1135 }
1136 // Mark these as alive so they'll be considered for deletion later by
1137 // FindObsoleteFiles()
1138 total_log_size_ = 0;
1139 log_empty_ = false;
1140 for (auto log_number : log_numbers) {
1141 LogFileNumberSize log(log_number);
1142 std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
1143 // This gets the appear size of the logs, not including preallocated space.
1144 s = env_->GetFileSize(fname, &log.size);
1145 if (!s.ok()) {
1146 break;
1147 }
1148 total_log_size_ += log.size;
1149 alive_log_files_.push_back(log);
1150 // We preallocate space for logs, but then after a crash and restart, those
1151 // preallocated space are not needed anymore. It is likely only the last
1152 // log has such preallocated space, so we only truncate for the last log.
1153 if (log_number == log_numbers.back()) {
1154 std::unique_ptr<FSWritableFile> last_log;
1155 Status truncate_status = fs_->ReopenWritableFile(
1156 fname,
1157 fs_->OptimizeForLogWrite(
1158 file_options_,
1159 BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1160 &last_log, nullptr);
1161 if (truncate_status.ok()) {
1162 truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1163 }
1164 if (truncate_status.ok()) {
1165 truncate_status = last_log->Close(IOOptions(), nullptr);
1166 }
1167 // Not a critical error if fail to truncate.
1168 if (!truncate_status.ok()) {
1169 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1170 "Failed to truncate log #%" PRIu64 ": %s", log_number,
1171 truncate_status.ToString().c_str());
1172 }
1173 }
1174 }
1175 if (two_write_queues_) {
1176 log_write_mutex_.Unlock();
1177 }
1178 return s;
1179 }
1180
// Flushes the contents of `mem` into a brand-new L0 SST file during WAL
// recovery, recording the new file in `edit` (not yet applied to the
// MANIFEST) and updating flush statistics.
// Precondition: mutex_ held on entry; it is released around the actual table
// build and re-acquired before returning.
// Returns non-OK if building the table failed; a zero-sized output is
// treated as "nothing to add".
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  // Register the upcoming file number as a pending output so that
  // FindObsoleteFiles() won't delete the half-written table file.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  // The memtable must be scanned in full internal-key order for the flush.
  ro.total_order_seek = true;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

    int64_t _current_time = 0;
    env_->GetCurrentTime(&_current_time);  // ignore error
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;

    {
      auto write_hint = cfd->CalculateSSTWriteHint(0);
      // Drop the DB mutex for the duration of the (potentially slow) table
      // build; everything it needs was copied/snapshotted above.
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      // Collect any range tombstones in the memtable so they are written to
      // the output table as well.
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      s = BuildTable(
          dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
          file_options_for_compaction_, cfd->table_cache(), iter.get(),
          std::move(range_del_iters), &meta, cfd->internal_comparator(),
          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
          snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          mutable_cf_options.sample_for_compression,
          cfd->ioptions()->compression_opts, paranoid_file_checks,
          cfd->internal_stats(), TableFileCreationReason::kRecovery,
          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
          -1 /* level */, current_time, write_hint);
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.oldest_blob_file_number,
                  meta.oldest_ancester_time, meta.file_creation_time,
                  meta.file_checksum, meta.file_checksum_func_name);
  }

  // Account for this recovery flush in the per-CF compaction/flush stats.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.fd.GetFileSize();
  stats.num_output_files = 1;
  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                    meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
1276
1277 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1278 DBOptions db_options(options);
1279 ColumnFamilyOptions cf_options(options);
1280 std::vector<ColumnFamilyDescriptor> column_families;
1281 column_families.push_back(
1282 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1283 if (db_options.persist_stats_to_disk) {
1284 column_families.push_back(
1285 ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1286 }
1287 std::vector<ColumnFamilyHandle*> handles;
1288 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1289 if (s.ok()) {
1290 if (db_options.persist_stats_to_disk) {
1291 assert(handles.size() == 2);
1292 } else {
1293 assert(handles.size() == 1);
1294 }
1295 // i can delete the handle since DBImpl is always holding a reference to
1296 // default column family
1297 if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1298 delete handles[1];
1299 }
1300 delete handles[0];
1301 }
1302 return s;
1303 }
1304
1305 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1306 const std::vector<ColumnFamilyDescriptor>& column_families,
1307 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1308 const bool kSeqPerBatch = true;
1309 const bool kBatchPerTxn = true;
1310 return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1311 !kSeqPerBatch, kBatchPerTxn);
1312 }
1313
1314 Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1315 size_t preallocate_block_size, log::Writer** new_log) {
1316 Status s;
1317 std::unique_ptr<FSWritableFile> lfile;
1318
1319 DBOptions db_options =
1320 BuildDBOptions(immutable_db_options_, mutable_db_options_);
1321 FileOptions opt_file_options =
1322 fs_->OptimizeForLogWrite(file_options_, db_options);
1323 std::string log_fname =
1324 LogFileName(immutable_db_options_.wal_dir, log_file_num);
1325
1326 if (recycle_log_number) {
1327 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1328 "reusing log %" PRIu64 " from recycle list\n",
1329 recycle_log_number);
1330 std::string old_log_fname =
1331 LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
1332 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1333 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1334 s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1335 &lfile, /*dbg=*/nullptr);
1336 } else {
1337 s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1338 }
1339
1340 if (s.ok()) {
1341 lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1342 lfile->SetPreallocationBlockSize(preallocate_block_size);
1343
1344 const auto& listeners = immutable_db_options_.listeners;
1345 std::unique_ptr<WritableFileWriter> file_writer(
1346 new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
1347 env_, nullptr /* stats */, listeners));
1348 *new_log = new log::Writer(std::move(file_writer), log_file_num,
1349 immutable_db_options_.recycle_log_file_num > 0,
1350 immutable_db_options_.manual_wal_flush);
1351 }
1352 return s;
1353 }
1354
// Core open path: sanitizes/validates options, creates the directory layout,
// recovers from MANIFEST/WALs, creates a fresh WAL, materializes column
// family handles, persists the options file and schedules background work.
// `seq_per_batch`/`batch_per_txn` configure sequence-number allocation for
// transaction DB variants. On failure all allocated handles and the DBImpl
// are destroyed and *dbptr stays nullptr.
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn) {
  Status s = SanitizeOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  *dbptr = nullptr;
  handles->clear();

  // Largest write buffer across all CFs; used below for WAL preallocation
  // sizing and the SstFileManager disk-buffer reservation.
  size_t max_write_buffer_size = 0;
  for (auto cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
  if (s.ok()) {
    // Ensure every configured db_path and cf_path directory exists.
    std::vector<std::string> paths;
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
      paths.emplace_back(db_path.path);
    }
    for (auto& cf : column_families) {
      for (auto& cf_path : cf.options.cf_paths) {
        paths.emplace_back(cf_path.path);
      }
    }
    for (auto& path : paths) {
      s = impl->env_->CreateDirIfMissing(path);
      if (!s.ok()) {
        break;
      }
    }

    // For recovery from NoSpace() error, we can only handle
    // the case where the database is stored in a single path
    if (paths.size() <= 1) {
      impl->error_handler_.EnableAutoRecovery();
    }
  }

  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);

  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  uint64_t recovered_seq(kMaxSequenceNumber);
  s = impl->Recover(column_families, false, false, false, &recovered_seq);
  if (s.ok()) {
    // Create the WAL that will receive writes after recovery.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    log::Writer* new_log = nullptr;
    const size_t preallocate_block_size =
        impl->GetWalPreallocateBlockSize(max_write_buffer_size);
    s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
                        preallocate_block_size, &new_log);
    if (s.ok()) {
      InstrumentedMutexLock wl(&impl->log_write_mutex_);
      impl->logfile_number_ = new_log_number;
      assert(new_log != nullptr);
      impl->logs_.emplace_back(new_log_number, new_log);
    }

    if (s.ok()) {
      // set column family handles
      for (auto cf : column_families) {
        auto cfd =
            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
        if (cfd != nullptr) {
          handles->push_back(
              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
          impl->NewThreadStatusCfInfo(cfd);
        } else {
          if (db_options.create_missing_column_families) {
            // missing column family, create it
            ColumnFamilyHandle* handle;
            // CreateColumnFamily expects to take the mutex itself.
            impl->mutex_.Unlock();
            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
            impl->mutex_.Lock();
            if (s.ok()) {
              handles->push_back(handle);
            } else {
              break;
            }
          } else {
            s = Status::InvalidArgument("Column family not found: ", cf.name);
            break;
          }
        }
      }
    }
    if (s.ok()) {
      // Install an initial SuperVersion per CF and register the new WAL as
      // alive so it participates in obsolete-file tracking.
      SuperVersionContext sv_context(/* create_superversion */ true);
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        impl->InstallSuperVersionAndScheduleWork(
            cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
      }
      sv_context.Clean();
      if (impl->two_write_queues_) {
        impl->log_write_mutex_.Lock();
      }
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      if (impl->two_write_queues_) {
        impl->log_write_mutex_.Unlock();
      }

      impl->DeleteObsoleteFiles();
      s = impl->directories_.GetDbDir()->Fsync();
    }
    if (s.ok()) {
      // In WritePrepared there could be gap in sequence numbers. This breaks
      // the trick we use in kPointInTimeRecovery which assumes the first seq in
      // the log right after the corrupted log is one larger than the last seq
      // we read from the logs. To let this trick keep working, we add a dummy
      // entry with the expected sequence to the first log right after recovery.
      // In non-WritePrepared case also the new log after recovery could be
      // empty, and thus missing the consecutive seq hint to distinguish
      // middle-log corruption to corrupted-log-remained-after-recovery. This
      // case also will be addressed by a dummy write.
      if (recovered_seq != kMaxSequenceNumber) {
        WriteBatch empty_batch;
        WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
        WriteOptions write_options;
        uint64_t log_used, log_size;
        log::Writer* log_writer = impl->logs_.back().writer;
        s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
        if (s.ok()) {
          // Need to fsync, otherwise it might get lost after a power reset.
          s = impl->FlushWAL(false);
          if (s.ok()) {
            s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
          }
        }
      }
    }
  }
  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    // try to read format version but no need to fail Open() even if it fails
    s = impl->PersistentStatsProcessFormatVersion();
  }

  if (s.ok()) {
    // Post-recovery sanity checks per column family.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        // FIFO compaction requires every file to live on level 0.
        auto* vstorage = cfd->current()->storage_info();
        for (int i = 1; i < vstorage->num_levels(); ++i) {
          int num_files = vstorage->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument(
                "Not all files are at level 0. Cannot "
                "open with FIFO compaction style.");
            break;
          }
        }
      }
      if (!cfd->mem()->IsSnapshotSupported()) {
        impl->is_snapshot_supported_ = false;
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        s = Status::InvalidArgument(
            "The memtable of column family %s does not support merge operator "
            "its options.merge_operator is non-null",
            cfd->GetName().c_str());
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // The WriteOptionsFile() will release and lock the mutex internally.
    persist_options_status = impl->WriteOptionsFile(
        false /*need_mutex_lock*/, false /*need_enter_write_thread*/);

    *dbptr = impl;
    impl->opened_successfully_ = true;
    impl->MaybeScheduleFlushOrCompaction();
  }
  impl->mutex_.Unlock();

#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] and cf_paths[0] when the DB is opened.

    // SstFileManagerImpl needs to know sizes of the files. For files whose size
    // we already know (sst files that appear in manifest - typically that's the
    // vast majority of all files), we'll pass the size to SstFileManager.
    // For all other files SstFileManager will query the size from filesystem.

    std::vector<LiveFileMetaData> metadata;

    impl->mutex_.Lock();
    impl->versions_->GetLiveFilesMetaData(&metadata);
    impl->mutex_.Unlock();

    std::unordered_map<std::string, uint64_t> known_file_sizes;
    for (const auto& md : metadata) {
      // MANIFEST records file names with a leading '/'; strip it so the
      // names match GetChildren() output below.
      std::string name = md.name;
      if (!name.empty() && name[0] == '/') {
        name = name.substr(1);
      }
      known_file_sizes[name] = md.size;
    }

    std::vector<std::string> paths;
    paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
    for (auto& cf : column_families) {
      if (!cf.options.cf_paths.empty()) {
        paths.emplace_back(cf.options.cf_paths[0].path);
      }
    }
    // Remove duplicate paths.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
    for (auto& path : paths) {
      std::vector<std::string> existing_files;
      impl->immutable_db_options_.env->GetChildren(path, &existing_files);
      for (auto& file_name : existing_files) {
        uint64_t file_number;
        FileType file_type;
        std::string file_path = path + "/" + file_name;
        if (ParseFileName(file_name, &file_number, &file_type) &&
            file_type == kTableFile) {
          if (known_file_sizes.count(file_name)) {
            // We're assuming that each sst file name exists in at most one of
            // the paths.
            sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
                           /* compaction */ false);
          } else {
            sfm->OnAddFile(file_path);
          }
        }
      }
    }

    // Reserve some disk buffer space. This is a heuristic - when we run out
    // of disk space, this ensures that there is atleast write_buffer_size
    // amount of free space before we resume DB writes. In low disk space
    // conditions, we want to avoid a lot of small L0 files due to frequent
    // WAL write failures and resultant forced flushes
    sfm->ReserveDiskBuffer(max_write_buffer_size,
                           impl->immutable_db_options_.db_paths[0].path);
  }
#endif  // !ROCKSDB_LITE

  if (s.ok()) {
    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
                     impl);
    LogFlush(impl->immutable_db_options_.info_log);
    assert(impl->TEST_WALBufferIsEmpty());
    // If the assert above fails then we need to FlushWAL before returning
    // control back to the user.
    if (!persist_options_status.ok()) {
      s = Status::IOError(
          "DB::Open() failed --- Unable to persist Options file",
          persist_options_status.ToString());
    }
  }
  if (s.ok()) {
    impl->StartTimedTasks();
  }
  if (!s.ok()) {
    // Roll back everything allocated above: handles first, then the DBImpl.
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
    *dbptr = nullptr;
  }
  return s;
}
1651 } // namespace ROCKSDB_NAMESPACE