]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under the BSD-style license found in the | |
3 | // LICENSE file in the root directory of this source tree. An additional grant | |
4 | // of patent rights can be found in the PATENTS file in the same directory. | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | #include "db/db_impl.h" | |
10 | ||
11 | #ifndef __STDC_FORMAT_MACROS | |
12 | #define __STDC_FORMAT_MACROS | |
13 | #endif | |
14 | #include <inttypes.h> | |
15 | ||
16 | #include "db/builder.h" | |
17 | #include "options/options_helper.h" | |
18 | #include "rocksdb/wal_filter.h" | |
19 | #include "util/sst_file_manager_impl.h" | |
20 | #include "util/sync_point.h" | |
21 | ||
22 | namespace rocksdb { | |
23 | Options SanitizeOptions(const std::string& dbname, | |
24 | const Options& src) { | |
25 | auto db_options = SanitizeOptions(dbname, DBOptions(src)); | |
26 | ImmutableDBOptions immutable_db_options(db_options); | |
27 | auto cf_options = | |
28 | SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src)); | |
29 | return Options(db_options, cf_options); | |
30 | } | |
31 | ||
32 | DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { | |
33 | DBOptions result(src); | |
34 | ||
35 | // result.max_open_files means an "infinite" open files. | |
36 | if (result.max_open_files != -1) { | |
37 | int max_max_open_files = port::GetMaxOpenFiles(); | |
38 | if (max_max_open_files == -1) { | |
39 | max_max_open_files = 1000000; | |
40 | } | |
41 | ClipToRange(&result.max_open_files, 20, max_max_open_files); | |
42 | } | |
43 | ||
44 | if (result.info_log == nullptr) { | |
45 | Status s = CreateLoggerFromOptions(dbname, result, &result.info_log); | |
46 | if (!s.ok()) { | |
47 | // No place suitable for logging | |
48 | result.info_log = nullptr; | |
49 | } | |
50 | } | |
51 | ||
52 | if (!result.write_buffer_manager) { | |
53 | result.write_buffer_manager.reset( | |
54 | new WriteBufferManager(result.db_write_buffer_size)); | |
55 | } | |
56 | if (result.base_background_compactions == -1) { | |
57 | result.base_background_compactions = result.max_background_compactions; | |
58 | } | |
59 | if (result.base_background_compactions > result.max_background_compactions) { | |
60 | result.base_background_compactions = result.max_background_compactions; | |
61 | } | |
62 | result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, | |
63 | Env::Priority::LOW); | |
64 | result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, | |
65 | Env::Priority::HIGH); | |
66 | ||
67 | if (result.rate_limiter.get() != nullptr) { | |
68 | if (result.bytes_per_sync == 0) { | |
69 | result.bytes_per_sync = 1024 * 1024; | |
70 | } | |
71 | } | |
72 | ||
73 | if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) { | |
74 | result.recycle_log_file_num = false; | |
75 | } | |
76 | ||
77 | if (result.recycle_log_file_num && | |
78 | (result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery || | |
79 | result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) { | |
80 | // kPointInTimeRecovery is indistinguishable from | |
81 | // kTolerateCorruptedTailRecords in recycle mode since we define | |
82 | // the "end" of the log as the first corrupt record we encounter. | |
83 | // kAbsoluteConsistency doesn't make sense because even a clean | |
84 | // shutdown leaves old junk at the end of the log file. | |
85 | result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; | |
86 | } | |
87 | ||
88 | if (result.wal_dir.empty()) { | |
89 | // Use dbname as default | |
90 | result.wal_dir = dbname; | |
91 | } | |
92 | if (result.wal_dir.back() == '/') { | |
93 | result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1); | |
94 | } | |
95 | ||
96 | if (result.db_paths.size() == 0) { | |
97 | result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max()); | |
98 | } | |
99 | ||
100 | if (result.use_direct_reads && result.compaction_readahead_size == 0) { | |
101 | result.compaction_readahead_size = 1024 * 1024 * 2; | |
102 | } | |
103 | ||
104 | if (result.compaction_readahead_size > 0 || | |
105 | result.use_direct_io_for_flush_and_compaction) { | |
106 | result.new_table_reader_for_compaction_inputs = true; | |
107 | } | |
108 | ||
109 | // Force flush on DB open if 2PC is enabled, since with 2PC we have no | |
110 | // guarantee that consecutive log files have consecutive sequence id, which | |
111 | // make recovery complicated. | |
112 | if (result.allow_2pc) { | |
113 | result.avoid_flush_during_recovery = false; | |
114 | } | |
115 | ||
116 | return result; | |
117 | } | |
118 | ||
119 | namespace { | |
120 | ||
121 | Status SanitizeOptionsByTable( | |
122 | const DBOptions& db_opts, | |
123 | const std::vector<ColumnFamilyDescriptor>& column_families) { | |
124 | Status s; | |
125 | for (auto cf : column_families) { | |
126 | s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options); | |
127 | if (!s.ok()) { | |
128 | return s; | |
129 | } | |
130 | } | |
131 | return Status::OK(); | |
132 | } | |
133 | ||
134 | static Status ValidateOptions( | |
135 | const DBOptions& db_options, | |
136 | const std::vector<ColumnFamilyDescriptor>& column_families) { | |
137 | Status s; | |
138 | ||
139 | for (auto& cfd : column_families) { | |
140 | s = CheckCompressionSupported(cfd.options); | |
141 | if (s.ok() && db_options.allow_concurrent_memtable_write) { | |
142 | s = CheckConcurrentWritesSupported(cfd.options); | |
143 | } | |
144 | if (!s.ok()) { | |
145 | return s; | |
146 | } | |
147 | if (db_options.db_paths.size() > 1) { | |
148 | if ((cfd.options.compaction_style != kCompactionStyleUniversal) && | |
149 | (cfd.options.compaction_style != kCompactionStyleLevel)) { | |
150 | return Status::NotSupported( | |
151 | "More than one DB paths are only supported in " | |
152 | "universal and level compaction styles. "); | |
153 | } | |
154 | } | |
155 | } | |
156 | ||
157 | if (db_options.db_paths.size() > 4) { | |
158 | return Status::NotSupported( | |
159 | "More than four DB paths are not supported yet. "); | |
160 | } | |
161 | ||
162 | if (db_options.allow_mmap_reads && db_options.use_direct_reads) { | |
163 | // Protect against assert in PosixMMapReadableFile constructor | |
164 | return Status::NotSupported( | |
165 | "If memory mapped reads (allow_mmap_reads) are enabled " | |
166 | "then direct I/O reads (use_direct_reads) must be disabled. "); | |
167 | } | |
168 | ||
169 | if (db_options.allow_mmap_writes && | |
170 | db_options.use_direct_io_for_flush_and_compaction) { | |
171 | return Status::NotSupported( | |
172 | "If memory mapped writes (allow_mmap_writes) are enabled " | |
173 | "then direct I/O writes (use_direct_io_for_flush_and_compaction) must " | |
174 | "be disabled. "); | |
175 | } | |
176 | ||
177 | if (db_options.keep_log_file_num == 0) { | |
178 | return Status::InvalidArgument("keep_log_file_num must be greater than 0"); | |
179 | } | |
180 | ||
181 | return Status::OK(); | |
182 | } | |
183 | } // namespace | |
// Creates the initial descriptor (MANIFEST) file for a brand-new database
// (file number 1) and points the CURRENT file at it. On failure after the
// manifest was created, the partial manifest is deleted best-effort.
Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);  // file number 1 is consumed by the manifest below
  new_db.SetLastSequence(0);

  Status s;

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    // Scope ensures the writer/log are destroyed (and the file closed)
    // before we install CURRENT or delete the manifest below.
    unique_ptr<WritableFile> file;
    EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
    s = NewWritableFile(env_, manifest, &file, env_options);
    if (!s.ok()) {
      return s;
    }
    file->SetPreallocationBlockSize(
        immutable_db_options_.manifest_preallocation_size);
    unique_ptr<WritableFileWriter> file_writer(
        new WritableFileWriter(std::move(file), env_options));
    log::Writer log(std::move(file_writer), 0, false);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      // Sync so the manifest is durable before CURRENT references it.
      s = SyncManifest(env_, &immutable_db_options_, log.file());
    }
  }
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
  } else {
    // Best-effort cleanup; the delete status is intentionally ignored.
    env_->DeleteFile(manifest);
  }
  return s;
}
221 | ||
222 | Status DBImpl::Directories::CreateAndNewDirectory( | |
223 | Env* env, const std::string& dirname, | |
224 | std::unique_ptr<Directory>* directory) const { | |
225 | // We call CreateDirIfMissing() as the directory may already exist (if we | |
226 | // are reopening a DB), when this happens we don't want creating the | |
227 | // directory to cause an error. However, we need to check if creating the | |
228 | // directory fails or else we may get an obscure message about the lock | |
229 | // file not existing. One real-world example of this occurring is if | |
230 | // env->CreateDirIfMissing() doesn't create intermediate directories, e.g. | |
231 | // when dbname_ is "dir/db" but when "dir" doesn't exist. | |
232 | Status s = env->CreateDirIfMissing(dirname); | |
233 | if (!s.ok()) { | |
234 | return s; | |
235 | } | |
236 | return env->NewDirectory(dirname, directory); | |
237 | } | |
238 | ||
239 | Status DBImpl::Directories::SetDirectories( | |
240 | Env* env, const std::string& dbname, const std::string& wal_dir, | |
241 | const std::vector<DbPath>& data_paths) { | |
242 | Status s = CreateAndNewDirectory(env, dbname, &db_dir_); | |
243 | if (!s.ok()) { | |
244 | return s; | |
245 | } | |
246 | if (!wal_dir.empty() && dbname != wal_dir) { | |
247 | s = CreateAndNewDirectory(env, wal_dir, &wal_dir_); | |
248 | if (!s.ok()) { | |
249 | return s; | |
250 | } | |
251 | } | |
252 | ||
253 | data_dirs_.clear(); | |
254 | for (auto& p : data_paths) { | |
255 | const std::string db_path = p.path; | |
256 | if (db_path == dbname) { | |
257 | data_dirs_.emplace_back(nullptr); | |
258 | } else { | |
259 | std::unique_ptr<Directory> path_directory; | |
260 | s = CreateAndNewDirectory(env, db_path, &path_directory); | |
261 | if (!s.ok()) { | |
262 | return s; | |
263 | } | |
264 | data_dirs_.emplace_back(path_directory.release()); | |
265 | } | |
266 | } | |
267 | assert(data_dirs_.size() == data_paths.size()); | |
268 | return Status::OK(); | |
269 | } | |
270 | ||
// Recovers DB state on open: sets up directories and the DB lock, creates a
// new DB if needed (create_if_missing), recovers the version state from the
// MANIFEST, then replays any WAL files newer than the manifest knows about.
// REQUIRES: mutex_ held. In read_only mode the directory/lock/IDENTITY setup
// is skipped entirely.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  if (!read_only) {
    Status s = directories_.SetDirectories(env_, dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    // Acquire the single-process DB lock before touching any files.
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    // Presence of CURRENT decides between "existing DB" and "new DB".
    s = env_->FileExists(CurrentFileName(dbname_));
    if (s.IsNotFound()) {
      if (immutable_db_options_.create_if_missing) {
        s = NewDB();
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(
            dbname_, "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Check for the IDENTITY file and create it if not there
    s = env_->FileExists(IdentityFileName(dbname_));
    if (s.IsNotFound()) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    } else if (!s.ok()) {
      assert(s.IsIOError());
      return s;
    }
  }

  Status s = versions_->Recover(column_families, read_only);
  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok()) {
    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    std::vector<std::string> filenames;
    s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
    if (!s.ok()) {
      return s;
    }

    // Collect the numbers of all WAL files present in wal_dir. A brand-new
    // DB must not have any: that would indicate a stale/foreign wal_dir.
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              filenames[i]);
        } else {
          logs.push_back(number);
        }
      }
    }

    if (logs.size() > 0) {
      if (error_if_log_file_exist) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_log_file_exist"
            "flag but a log file already exists");
      } else if (error_if_data_exists_in_logs) {
        // Empty (zero-byte) log files are tolerated; only non-empty ones
        // trigger the error.
        for (auto& log : logs) {
          std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
          uint64_t bytes;
          s = env_->GetFileSize(fname, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_logs is set but there are data "
                  " in log files.");
            }
          }
        }
      }
    }

    if (!logs.empty()) {
      // Recover in the order in which the logs were generated
      std::sort(logs.begin(), logs.end());
      s = RecoverLogFiles(logs, &next_sequence, read_only);
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  // Initial value
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                  mutable_cf_options->max_write_buffer_number;
  }

  return s;
}
412 | ||
413 | // REQUIRES: log_numbers are sorted in ascending order | |
// Replays the given WAL files (in order) into the memtables, flushing to L0
// tables when the flush scheduler demands it, and finally records the
// recovered state (new files, log number) in the MANIFEST via LogAndApply.
// Behavior on corruption depends on wal_recovery_mode. *next_sequence is
// advanced as batches are applied.
// REQUIRES: mutex_ held; log_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* next_sequence, bool read_only) {
  // Reports corruption to the info log; also propagates the first error into
  // *status when paranoid checking is in effect (status != nullptr).
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                     (this->status == nullptr ? "(ignoring error) " : ""),
                     fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) {
        *this->status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;
  // One pending VersionEdit per column family, accumulating the L0 files
  // created during recovery; applied to the MANIFEST at the end.
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }
  int job_id = next_job_id_.fetch_add(1);
  {
    // Emit a structured "recovery_started" event listing the WALs involved.
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event"
           << "recovery_started";
    stream << "log_files";
    stream.StartArray();
    for (auto log_number : log_numbers) {
      stream << log_number;
    }
    stream.EndArray();
  }

#ifndef ROCKSDB_LITE
  if (immutable_db_options_.wal_filter != nullptr) {
    // Let the user-supplied WAL filter know the cf-name->id and
    // cf-id->log-number mappings before replay begins.
    std::map<std::string, uint32_t> cf_name_id_map;
    std::map<uint32_t, uint64_t> cf_lognumber_map;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      cf_name_id_map.insert(
          std::make_pair(cfd->GetName(), cfd->GetID()));
      cf_lognumber_map.insert(
          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
    }

    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
                                                               cf_name_id_map);
  }
#endif

  bool stop_replay_by_wal_filter = false;
  bool stop_replay_for_corruption = false;
  bool flushed = false;
  for (auto log_number : log_numbers) {
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number. So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsedDuringRecovery(log_number);
    // Open the log file
    std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", log_number,
                   immutable_db_options_.wal_recovery_mode);
    // Logs how many bytes of this WAL are being dropped (skipped) — used
    // both when a filter stops replay and in point-in-time corruption stops.
    auto logFileDropped = [this, &fname]() {
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = immutable_db_options_.info_log.get();
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                       static_cast<int>(bytes));
      }
    };
    if (stop_replay_by_wal_filter) {
      logFileDropped();
      continue;
    }

    unique_ptr<SequentialFileReader> file_reader;
    {
      unique_ptr<SequentialFile> file;
      status = env_->NewSequentialFile(fname, &file, env_options_);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Fail with one log file, but that's ok.
          // Try next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(std::move(file)));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = immutable_db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (!immutable_db_options_.paranoid_checks ||
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentially make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, 0 /*initial_offset*/,
                       log_number);

    // Determine if we should tolerate incomplete records at the tail end of the
    // Read all the records and add to a memtable
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (!stop_replay_by_wal_filter &&
           reader.ReadRecord(&record, &scratch,
                             immutable_db_options_.wal_recovery_mode) &&
           status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);
      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if sequence id of log files are
        // consecutive, we continue recovery despite corruption. This could
        // happen when we open and write to a corrupted DB, where sequence id
        // will start from the last sequence id we recovered.
        if (sequence == *next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          logFileDropped();
          break;
        }
      }

#ifndef ROCKSDB_LITE
      if (immutable_db_options_.wal_filter != nullptr) {
        WriteBatch new_batch;
        bool batch_changed = false;

        // Ask the WAL filter what to do with this record; it may also
        // rewrite the batch (batch_changed).
        WalFilter::WalProcessingOption wal_processing_option =
            immutable_db_options_.wal_filter->LogRecordFound(
                log_number, fname, batch, &new_batch, &batch_changed);

        switch (wal_processing_option) {
          case WalFilter::WalProcessingOption::kContinueProcessing:
            // do nothing, proceeed normally
            break;
          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
            // skip current record
            continue;
          case WalFilter::WalProcessingOption::kStopReplay:
            // skip current record and stop replay
            stop_replay_by_wal_filter = true;
            continue;
          case WalFilter::WalProcessingOption::kCorruptedRecord: {
            status =
                Status::Corruption("Corruption reported by Wal Filter ",
                                   immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              reporter.Corruption(record.size(), status);
              continue;
            }
            break;
          }
          default: {
            assert(false);  // unhandled case
            status = Status::NotSupported(
                "Unknown WalProcessingOption returned"
                " by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              return status;
            } else {
              // Ignore the error with current record processing.
              continue;
            }
          }
        }

        if (batch_changed) {
          // Make sure that the count in the new batch is
          // within the orignal count.
          int new_count = WriteBatchInternal::Count(&new_batch);
          int original_count = WriteBatchInternal::Count(&batch);
          if (new_count > original_count) {
            ROCKS_LOG_FATAL(
                immutable_db_options_.info_log,
                "Recovering log #%" PRIu64
                " mode %d log filter %s returned "
                "more records (%d) than original (%d) which is not allowed. "
                "Aborting recovery.",
                log_number, immutable_db_options_.wal_recovery_mode,
                immutable_db_options_.wal_filter->Name(), new_count,
                original_count);
            status = Status::NotSupported(
                "More than original # of records "
                "returned by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            return status;
          }
          // Set the same sequence number in the new_batch
          // as the original batch.
          WriteBatchInternal::SetSequence(&new_batch,
                                          WriteBatchInternal::Sequence(&batch));
          batch = new_batch;
        }
      }
#endif  // ROCKSDB_LITE

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      bool has_valid_writes = false;
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), &flush_scheduler_, true,
          log_number, this, false /* concurrent_memtable_writes */,
          next_sequence, &has_valid_writes);
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      if (has_valid_writes && !read_only) {
        // we can do this because this is called before client has access to the
        // DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
          cfd->Unref();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *next_sequence);
        }
      }
    }

    // Per-file error resolution, governed by the recovery mode.
    if (!status.ok()) {
      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (immutable_db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        // We should ignore the error but not continue replaying
        status = Status::OK();
        stop_replay_for_corruption = true;
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                       " seq #%" PRIu64,
                       log_number, *next_sequence);
      } else {
        assert(immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    // Publish the highest sequence number applied from this WAL, but never
    // move LastSequence backwards.
    auto last_sequence = *next_sequence - 1;
    if ((*next_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() <= last_sequence)) {
      versions_->SetLastSequence(last_sequence);
    }
  }

  // True if there's any data in the WALs; if not, we can skip re-processing
  // them later
  bool data_seen = false;
  if (!read_only) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    auto max_log_number = log_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_log_number) {
        // Column family cfd has already flushed the data
        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // If flush happened in the middle of recovery (e.g. due to memtable
        // being full), we flush at the end. Otherwise we'll need to record
        // where we were on last flush, which make the logic complicated.
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed
            break;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 versions_->LastSequence());
        }
        data_seen = true;
      }

      // write MANIFEST with update
      // writing log_number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_log_number, we want all logs
      // with numbers `<= max_log_number` (includes this one) to be ignored
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_log_number + 1);
      }
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
      status = versions_->LogAndApply(
          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
      if (!status.ok()) {
        // Recovery failed
        break;
      }
    }
  }

  if (data_seen && !flushed) {
    // Mark these as alive so they'll be considered for deletion later by
    // FindObsoleteFiles()
    for (auto log_number : log_numbers) {
      alive_log_files_.push_back(LogFileNumberSize(log_number));
    }
  }

  event_logger_.Log() << "job" << job_id << "event"
                      << "recovery_finished";

  return status;
}
796 | ||
797 | Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, | |
798 | MemTable* mem, VersionEdit* edit) { | |
799 | mutex_.AssertHeld(); | |
800 | const uint64_t start_micros = env_->NowMicros(); | |
801 | FileMetaData meta; | |
802 | auto pending_outputs_inserted_elem = | |
803 | CaptureCurrentFileNumberInPendingOutputs(); | |
804 | meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); | |
805 | ReadOptions ro; | |
806 | ro.total_order_seek = true; | |
807 | Arena arena; | |
808 | Status s; | |
809 | TableProperties table_properties; | |
810 | { | |
811 | ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); | |
812 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
813 | "[%s] [WriteLevel0TableForRecovery]" | |
814 | " Level-0 table #%" PRIu64 ": started", | |
815 | cfd->GetName().c_str(), meta.fd.GetNumber()); | |
816 | ||
817 | // Get the latest mutable cf options while the mutex is still locked | |
818 | const MutableCFOptions mutable_cf_options = | |
819 | *cfd->GetLatestMutableCFOptions(); | |
820 | bool paranoid_file_checks = | |
821 | cfd->GetLatestMutableCFOptions()->paranoid_file_checks; | |
822 | { | |
823 | mutex_.Unlock(); | |
824 | ||
825 | SequenceNumber earliest_write_conflict_snapshot; | |
826 | std::vector<SequenceNumber> snapshot_seqs = | |
827 | snapshots_.GetAll(&earliest_write_conflict_snapshot); | |
828 | ||
829 | EnvOptions optimized_env_options = | |
830 | env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); | |
831 | s = BuildTable( | |
832 | dbname_, env_, *cfd->ioptions(), mutable_cf_options, | |
833 | optimized_env_options, cfd->table_cache(), iter.get(), | |
834 | std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)), | |
835 | &meta, cfd->internal_comparator(), | |
836 | cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), | |
837 | snapshot_seqs, earliest_write_conflict_snapshot, | |
838 | GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), | |
839 | cfd->ioptions()->compression_opts, paranoid_file_checks, | |
840 | cfd->internal_stats(), TableFileCreationReason::kRecovery, | |
841 | &event_logger_, job_id); | |
842 | LogFlush(immutable_db_options_.info_log); | |
843 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
844 | "[%s] [WriteLevel0TableForRecovery]" | |
845 | " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", | |
846 | cfd->GetName().c_str(), meta.fd.GetNumber(), | |
847 | meta.fd.GetFileSize(), s.ToString().c_str()); | |
848 | mutex_.Lock(); | |
849 | } | |
850 | } | |
851 | ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); | |
852 | ||
853 | // Note that if file_size is zero, the file has been deleted and | |
854 | // should not be added to the manifest. | |
855 | int level = 0; | |
856 | if (s.ok() && meta.fd.GetFileSize() > 0) { | |
857 | edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), | |
858 | meta.fd.GetFileSize(), meta.smallest, meta.largest, | |
859 | meta.smallest_seqno, meta.largest_seqno, | |
860 | meta.marked_for_compaction); | |
861 | } | |
862 | ||
863 | InternalStats::CompactionStats stats(1); | |
864 | stats.micros = env_->NowMicros() - start_micros; | |
865 | stats.bytes_written = meta.fd.GetFileSize(); | |
866 | stats.num_output_files = 1; | |
867 | cfd->internal_stats()->AddCompactionStats(level, stats); | |
868 | cfd->internal_stats()->AddCFStats( | |
869 | InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); | |
870 | RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); | |
871 | return s; | |
872 | } | |
873 | ||
874 | Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { | |
875 | DBOptions db_options(options); | |
876 | ColumnFamilyOptions cf_options(options); | |
877 | std::vector<ColumnFamilyDescriptor> column_families; | |
878 | column_families.push_back( | |
879 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); | |
880 | std::vector<ColumnFamilyHandle*> handles; | |
881 | Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr); | |
882 | if (s.ok()) { | |
883 | assert(handles.size() == 1); | |
884 | // i can delete the handle since DBImpl is always holding a reference to | |
885 | // default column family | |
886 | delete handles[0]; | |
887 | } | |
888 | return s; | |
889 | } | |
890 | ||
891 | Status DB::Open(const DBOptions& db_options, const std::string& dbname, | |
892 | const std::vector<ColumnFamilyDescriptor>& column_families, | |
893 | std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { | |
894 | Status s = SanitizeOptionsByTable(db_options, column_families); | |
895 | if (!s.ok()) { | |
896 | return s; | |
897 | } | |
898 | ||
899 | s = ValidateOptions(db_options, column_families); | |
900 | if (!s.ok()) { | |
901 | return s; | |
902 | } | |
903 | ||
904 | *dbptr = nullptr; | |
905 | handles->clear(); | |
906 | ||
907 | size_t max_write_buffer_size = 0; | |
908 | for (auto cf : column_families) { | |
909 | max_write_buffer_size = | |
910 | std::max(max_write_buffer_size, cf.options.write_buffer_size); | |
911 | } | |
912 | ||
913 | DBImpl* impl = new DBImpl(db_options, dbname); | |
914 | s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir); | |
915 | if (s.ok()) { | |
916 | for (auto db_path : impl->immutable_db_options_.db_paths) { | |
917 | s = impl->env_->CreateDirIfMissing(db_path.path); | |
918 | if (!s.ok()) { | |
919 | break; | |
920 | } | |
921 | } | |
922 | } | |
923 | ||
924 | if (!s.ok()) { | |
925 | delete impl; | |
926 | return s; | |
927 | } | |
928 | ||
929 | s = impl->CreateArchivalDirectory(); | |
930 | if (!s.ok()) { | |
931 | delete impl; | |
932 | return s; | |
933 | } | |
934 | impl->mutex_.Lock(); | |
935 | // Handles create_if_missing, error_if_exists | |
936 | s = impl->Recover(column_families); | |
937 | if (s.ok()) { | |
938 | uint64_t new_log_number = impl->versions_->NewFileNumber(); | |
939 | unique_ptr<WritableFile> lfile; | |
940 | EnvOptions soptions(db_options); | |
941 | EnvOptions opt_env_options = | |
942 | impl->immutable_db_options_.env->OptimizeForLogWrite( | |
943 | soptions, BuildDBOptions(impl->immutable_db_options_, | |
944 | impl->mutable_db_options_)); | |
945 | s = NewWritableFile( | |
946 | impl->immutable_db_options_.env, | |
947 | LogFileName(impl->immutable_db_options_.wal_dir, new_log_number), | |
948 | &lfile, opt_env_options); | |
949 | if (s.ok()) { | |
950 | lfile->SetPreallocationBlockSize( | |
951 | impl->GetWalPreallocateBlockSize(max_write_buffer_size)); | |
952 | impl->logfile_number_ = new_log_number; | |
953 | unique_ptr<WritableFileWriter> file_writer( | |
954 | new WritableFileWriter(std::move(lfile), opt_env_options)); | |
955 | impl->logs_.emplace_back( | |
956 | new_log_number, | |
957 | new log::Writer( | |
958 | std::move(file_writer), new_log_number, | |
959 | impl->immutable_db_options_.recycle_log_file_num > 0)); | |
960 | ||
961 | // set column family handles | |
962 | for (auto cf : column_families) { | |
963 | auto cfd = | |
964 | impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); | |
965 | if (cfd != nullptr) { | |
966 | handles->push_back( | |
967 | new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); | |
968 | impl->NewThreadStatusCfInfo(cfd); | |
969 | } else { | |
970 | if (db_options.create_missing_column_families) { | |
971 | // missing column family, create it | |
972 | ColumnFamilyHandle* handle; | |
973 | impl->mutex_.Unlock(); | |
974 | s = impl->CreateColumnFamily(cf.options, cf.name, &handle); | |
975 | impl->mutex_.Lock(); | |
976 | if (s.ok()) { | |
977 | handles->push_back(handle); | |
978 | } else { | |
979 | break; | |
980 | } | |
981 | } else { | |
982 | s = Status::InvalidArgument("Column family not found: ", cf.name); | |
983 | break; | |
984 | } | |
985 | } | |
986 | } | |
987 | } | |
988 | if (s.ok()) { | |
989 | for (auto cfd : *impl->versions_->GetColumnFamilySet()) { | |
990 | delete impl->InstallSuperVersionAndScheduleWork( | |
991 | cfd, nullptr, *cfd->GetLatestMutableCFOptions()); | |
992 | } | |
993 | impl->alive_log_files_.push_back( | |
994 | DBImpl::LogFileNumberSize(impl->logfile_number_)); | |
995 | impl->DeleteObsoleteFiles(); | |
996 | s = impl->directories_.GetDbDir()->Fsync(); | |
997 | } | |
998 | } | |
999 | ||
1000 | if (s.ok()) { | |
1001 | for (auto cfd : *impl->versions_->GetColumnFamilySet()) { | |
1002 | if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { | |
1003 | auto* vstorage = cfd->current()->storage_info(); | |
1004 | for (int i = 1; i < vstorage->num_levels(); ++i) { | |
1005 | int num_files = vstorage->NumLevelFiles(i); | |
1006 | if (num_files > 0) { | |
1007 | s = Status::InvalidArgument( | |
1008 | "Not all files are at level 0. Cannot " | |
1009 | "open with FIFO compaction style."); | |
1010 | break; | |
1011 | } | |
1012 | } | |
1013 | } | |
1014 | if (!cfd->mem()->IsSnapshotSupported()) { | |
1015 | impl->is_snapshot_supported_ = false; | |
1016 | } | |
1017 | if (cfd->ioptions()->merge_operator != nullptr && | |
1018 | !cfd->mem()->IsMergeOperatorSupported()) { | |
1019 | s = Status::InvalidArgument( | |
1020 | "The memtable of column family %s does not support merge operator " | |
1021 | "its options.merge_operator is non-null", cfd->GetName().c_str()); | |
1022 | } | |
1023 | if (!s.ok()) { | |
1024 | break; | |
1025 | } | |
1026 | } | |
1027 | } | |
1028 | TEST_SYNC_POINT("DBImpl::Open:Opened"); | |
1029 | Status persist_options_status; | |
1030 | if (s.ok()) { | |
1031 | // Persist RocksDB Options before scheduling the compaction. | |
1032 | // The WriteOptionsFile() will release and lock the mutex internally. | |
1033 | persist_options_status = impl->WriteOptionsFile(); | |
1034 | ||
1035 | *dbptr = impl; | |
1036 | impl->opened_successfully_ = true; | |
1037 | impl->MaybeScheduleFlushOrCompaction(); | |
1038 | } | |
1039 | impl->mutex_.Unlock(); | |
1040 | ||
1041 | #ifndef ROCKSDB_LITE | |
1042 | auto sfm = static_cast<SstFileManagerImpl*>( | |
1043 | impl->immutable_db_options_.sst_file_manager.get()); | |
1044 | if (s.ok() && sfm) { | |
1045 | // Notify SstFileManager about all sst files that already exist in | |
1046 | // db_paths[0] when the DB is opened. | |
1047 | auto& db_path = impl->immutable_db_options_.db_paths[0]; | |
1048 | std::vector<std::string> existing_files; | |
1049 | impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files); | |
1050 | for (auto& file_name : existing_files) { | |
1051 | uint64_t file_number; | |
1052 | FileType file_type; | |
1053 | std::string file_path = db_path.path + "/" + file_name; | |
1054 | if (ParseFileName(file_name, &file_number, &file_type) && | |
1055 | file_type == kTableFile) { | |
1056 | sfm->OnAddFile(file_path); | |
1057 | } | |
1058 | } | |
1059 | } | |
1060 | #endif // !ROCKSDB_LITE | |
1061 | ||
1062 | if (s.ok()) { | |
1063 | ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); | |
1064 | LogFlush(impl->immutable_db_options_.info_log); | |
1065 | if (!persist_options_status.ok()) { | |
1066 | if (db_options.fail_if_options_file_error) { | |
1067 | s = Status::IOError( | |
1068 | "DB::Open() failed --- Unable to persist Options file", | |
1069 | persist_options_status.ToString()); | |
1070 | } | |
1071 | ROCKS_LOG_WARN(impl->immutable_db_options_.info_log, | |
1072 | "Unable to persist options in DB::Open() -- %s", | |
1073 | persist_options_status.ToString().c_str()); | |
1074 | } | |
1075 | } | |
1076 | if (!s.ok()) { | |
1077 | for (auto* h : *handles) { | |
1078 | delete h; | |
1079 | } | |
1080 | handles->clear(); | |
1081 | delete impl; | |
1082 | *dbptr = nullptr; | |
1083 | } | |
1084 | return s; | |
1085 | } | |
1086 | } // namespace rocksdb |