// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cstdio>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/builder.h"
#include "db/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/in_memory_stats_history.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "tools/sst_dump_tool_imp.h"
#include "util/auto_roll_logger.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/file_util.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/sst_file_manager_impl.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"

namespace rocksdb {
const std::string kDefaultColumnFamilyName("default");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compaction, use the compression configured for level 0;
    // this matters when min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
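
// Illustrative sketch (not part of the original file): how the choice above
// plays out for a hypothetical universal-compaction column family.
//
//   ColumnFamilyOptions cf_opts;
//   cf_opts.compaction_style = kCompactionStyleUniversal;
//   cf_opts.compression = kSnappyCompression;
//   cf_opts.compaction_options_universal.compression_size_percent = -1;
//   // GetCompressionFlush() would return kSnappyCompression here; with a
//   // non-negative compression_size_percent it would return kNoCompression
//   // and leave compression decisions to the universal compaction picker.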

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}

int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : env_(options.env),
      dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
          env_options_, immutable_db_options_)),
      db_lock_(nullptr),
      shutting_down_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      // Use delayed_write_rate as a base line to determine the initial
      // low pri write rate limit. It may be adjusted later.
      low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min(
          static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8),
          kDefaultLowPriThrottledRate))),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, env_options_, seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled, last seq in
      // memtable is the same as last seq published to the readers. When it is
      // enabled but seq_per_batch_ is disabled, last seq in memtable still
      // indicates last published seq since wal-only writes that go to the 2nd
      // queue do not consume a sequence number. Otherwise writes performed by
      // the 2nd queue could change what is visible to the readers. In that
      // case, last_seq_same_as_publish_seq_==false; the 2nd queue maintains a
      // separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because batch_per_txn_ is only
  // unset for WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  table_cache_ = NewLRUCache(table_cache_size,
                             immutable_db_options_.table_cache_numshardbits);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means if preserve_deletes_==true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by the client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
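
// Usage sketch (illustrative, not part of the original file): after a
// background error (e.g. ENOSPC during a flush) puts the DB into read-only
// mode, a caller can attempt manual recovery once the underlying condition
// is fixed:
//
//   rocksdb::Status s = db->Resume();
//   if (s.IsBusy()) {
//     // Automatic recovery is already in progress; wait and retry.
//   }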

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and thinking recovery
//    failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
        mutex_.Lock();
        cfd->Unref();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, the event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

  if (thread_dump_stats_ != nullptr) {
    thread_dump_stats_->cancel();
    thread_dump_stats_.reset();
  }
  if (thread_persist_stats_ != nullptr) {
    thread_persist_stats_->cancel();
    thread_persist_stats_.reset();
  }
  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          mutex_.Lock();
          cfd->Unref();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret;
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      ColumnFamilyData* cfd = iter.first;
      if (cfd->Unref()) {
        delete cfd;
      }
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    if (cfd->Unref()) {
      delete cfd;
    }
  }

  if (default_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    delete default_cf_handle_;
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds the manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recovery fails (possibly due to a corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files could get deleted by accident. However, a
  // corrupted manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when the column family
  // data list is destroyed, so leaving handles in the table cache after
  // versions_.reset() may cause issues. Here we clean all unreferenced
  // handles in the table cache. At this point we assume all user queries have
  // finished, so only the version set itself can possibly hold blocks from
  // the block cache. After releasing the unreferenced handles here, only
  // handles held by the version set remain, and those are released inside
  // versions_.reset(). There, we need to make sure that every time a handle
  // is released, we erase it from the cache too. By doing that, we can
  // guarantee that after versions_.reset() the table cache is empty, so the
  // cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, a background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (ret.ok()) {
      ret = s;
    }
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper();
  }
}
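
// Usage sketch (illustrative, not part of the original file): callers that
// want to observe shutdown errors should call DB::Close() explicitly rather
// than relying on the destructor, which discards the status returned by
// CloseHelper():
//
//   rocksdb::Status s = db->Close();  // surfaces e.g. WAL sync failures
//   delete db;                        // destructor is then a no-op close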

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.statistics.get();
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartTimedTasks() {
  unsigned int stats_dump_period_sec = 0;
  unsigned int stats_persist_period_sec = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
    if (stats_dump_period_sec > 0) {
      if (!thread_dump_stats_) {
        thread_dump_stats_.reset(new rocksdb::RepeatableThread(
            [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
            stats_dump_period_sec * 1000000));
      }
    }
    stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
    if (stats_persist_period_sec > 0) {
      if (!thread_persist_stats_) {
        thread_persist_stats_.reset(new rocksdb::RepeatableThread(
            [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
            stats_persist_period_sec * 1000000));
      }
    }
  }
}

// Estimate the total size of stats_history_.
size_t DBImpl::EstiamteStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
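
// Worked example (illustrative, not part of the original file): each slice
// is estimated as
//   sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>)
//     + sum over tickers of (name.capacity() + sizeof(std::string)
//                            + sizeof(uint64_t))
// With, say, ~150 tickers and names averaging ~30 bytes, a slice comes to
// roughly 10 KB, so 100 retained snapshots estimate to ~1 MB. That is the
// number PersistStats() below compares against stats_history_buffer_size
// when deciding whether to purge old snapshots.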

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  uint64_t now_micros = env_->NowMicros();
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  // TODO(Zhongyi): also persist immutable_db_options_.statistics
  {
    std::map<std::string, uint64_t> stats_map;
    if (!statistics->getTickerMap(&stats_map)) {
      return;
    }
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      stats_history_[now_micros] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
    }
  }
  // TODO: persist stats to disk
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock while searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy of the timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  stats_iterator->reset(
      new InMemoryStatsHistoryIterator(start_time, end_time, this));
  return (*stats_iterator)->status();
}
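
// Usage sketch (illustrative, not part of the original file), based on the
// public StatsHistoryIterator interface declared in rocksdb/stats_history.h:
//
//   std::unique_ptr<rocksdb::StatsHistoryIterator> it;
//   db->GetStatsHistory(0 /* start_time */,
//                       std::numeric_limits<uint64_t>::max(), &it);
//   for (; it->Valid(); it->Next()) {
//     uint64_t when = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& tickers = it->GetStatsMap();
//     // ... consume one in-memory stats snapshot ...
//   }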

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
    SchedulePurge();
  }
}

Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  Directory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Directory* DBImpl::Directories::GetDataDir(size_t path_id) const {
  assert(path_id < data_dirs_.size());
  Directory* ret_dir = data_dirs_[path_id].get();
  if (ret_dir == nullptr) {
    // Should use db_dir_
    return db_dir_.get();
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
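
// Usage sketch (illustrative, not part of the original file): mutable column
// family options can be changed at runtime through the public DB::SetOptions
// overload, e.g.
//
//   db->SetOptions(handle, {{"write_buffer_size", "131072"},
//                           {"level0_file_num_compaction_trigger", "8"}});
//
// Values are strings and are parsed internally; an unknown or immutable
// option name makes the whole call fail.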

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (s.ok()) {
      if (new_options.max_background_compactions >
          mutable_db_options_.max_background_compactions) {
        env_->IncBackgroundThreadsIfNeeded(
            new_options.max_background_compactions, Env::Priority::LOW);
        MaybeScheduleFlushOrCompaction();
      }
      if (new_options.stats_dump_period_sec !=
          mutable_db_options_.stats_dump_period_sec) {
        if (thread_dump_stats_) {
          mutex_.Unlock();
          thread_dump_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_dump_period_sec > 0) {
          thread_dump_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
              new_options.stats_dump_period_sec * 1000000));
        } else {
          thread_dump_stats_.reset();
        }
      }
      if (new_options.stats_persist_period_sec !=
          mutable_db_options_.stats_persist_period_sec) {
        if (thread_persist_stats_) {
          mutex_.Unlock();
          thread_persist_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_persist_period_sec > 0) {
          thread_persist_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
              new_options.stats_persist_period_sec * 1000000));
        } else {
          thread_persist_stats_.reset();
        }
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      if (new_options.bytes_per_sync == 0) {
        new_options.bytes_per_sync = 1024 * 1024;
      }
      mutable_db_options_ = new_options;
      env_options_for_compaction_ = EnvOptions(
          BuildDBOptions(immutable_db_options_, mutable_db_options_));
      env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite(
          env_options_for_compaction_, immutable_db_options_);
      versions_->ChangeEnvOptions(mutable_db_options_);
      env_options_for_compaction_ = env_->OptimizeForCompactionTableRead(
          env_options_for_compaction_, immutable_db_options_);
      env_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
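
// Usage sketch (illustrative, not part of the original file): mutable DB-wide
// options go through the same string-map interface, e.g.
//
//   db->SetDBOptions({{"max_background_compactions", "4"},
//                     {"stats_dump_period_sec", "600"}});
//
// As the code above shows, changing stats_dump_period_sec restarts the
// repeatable dump thread, and raising max_background_compactions may grow
// the LOW-priority thread pool.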

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}
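
// Worked example (illustrative, not part of the original file): suppose all
// files live in L4 and hold 2 GB, L1-L3 are empty, and the per-level
// capacities are 1 GB, 10 GB and 100 GB for L1/L2/L3. The loop walks i = 3
// (fits, minimum_level = 3), then i = 2 (fits, minimum_level = 2), then
// breaks at i = 1 because L1's 1 GB capacity cannot hold 2 GB, so the
// function returns 2: the lowest-numbered empty level that can absorb L4's
// data during level refitting.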

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    // We need to lock log_write_mutex_ since logs_ might change concurrently
    InstrumentedMutexLock wl(&log_write_mutex_);
    log::Writer* cur_log_writer = logs_.back().writer;
    auto s = cur_log_writer->WriteBuffer();
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent
      // future writes
      WriteStatusCheck(s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return s;
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return s;
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
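
// Usage sketch (illustrative, not part of the original file): with
// DBOptions::manual_wal_flush = true, writes accumulate in the WAL writer's
// buffer and are made durable on demand:
//
//   rocksdb::WriteOptions wo;
//   db->Put(wo, "k", "v");          // buffered in the WAL writer
//   db->FlushWAL(false /* sync */); // push the buffer to the file
//   db->FlushWAL(true /* sync */);  // additionally fsync via SyncWAL()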

Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  for (log::Writer* log : logs_to_sync) {
    status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync();
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return status;
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}
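
// Usage sketch (illustrative, not part of the original file): LockWAL() /
// UnlockWAL() bracket a window in which no new WAL writes are admitted,
// which can be used to take an externally consistent copy of the WAL files:
//
//   db->LockWAL();   // flushes the writer buffer, holds log_write_mutex_
//   // ... copy / checkpoint the WAL files ...
//   db->UnlockWAL();
//
// Note the lock and unlock must happen on the same thread, since a mutex is
// held across the pair.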

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            const Status& status) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to && status.ok()) {
    log_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& log = *it;
    assert(log.getting_synced);
    if (status.ok() && logs_.size() > 1) {
      logs_to_free_.push_back(log.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      log.getting_synced = false;
      ++it;
    }
  }
  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
  if (seqnum > preserve_deletes_seqnum_.load()) {
    preserve_deletes_seqnum_.store(seqnum);
    return true;
  } else {
    return false;
  }
}

InternalIterator* DBImpl::NewInternalIterator(
    Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
    ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  ReadOptions roptions;
  return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
                             sequence);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into the high priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  // We use one single loop to clear both queues so that after exiting the
  // loop both queues are empty. This is stricter than what is needed, but can
  // make it easier for us to reason about correctness.
  while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
    if (!purge_queue_.empty()) {
      auto purge_file = purge_queue_.begin();
      auto fname = purge_file->fname;
      auto dir_to_sync = purge_file->dir_to_sync;
      auto type = purge_file->type;
      auto number = purge_file->number;
      auto job_id = purge_file->job_id;
      purge_queue_.pop_front();

      mutex_.Unlock();
      DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
      mutex_.Lock();
    } else {
      assert(!logs_to_free_queue_.empty());
      log::Writer* log_writer = *(logs_to_free_queue_.begin());
      logs_to_free_queue_.pop_front();
      mutex_.Unlock();
      delete log_writer;
      mutex_.Lock();
    }
  }
  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      state->db->ScheduleBgLogWriterClose(&job_context);
    }
    state->mu->Unlock();

    delete state->super_version;
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes them in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, env_options_,
                                           &merge_iter_builder, range_del_agg);
    }
    internal_iter = merge_iter_builder.Finish();
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                          immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}

ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value);
}
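
// Usage sketch (illustrative, not part of the original file): PinnableSlice
// lets Get() avoid a memcpy when the value can be pinned in the block cache:
//
//   rocksdb::PinnableSlice value;
//   rocksdb::Status s =
//       db->Get(rocksdb::ReadOptions(), db->DefaultColumnFamily(), "k",
//               &value);
//   if (s.ok()) {
//     // value.data()/value.size() are valid until value.Reset() or the
//     // slice goes out of scope; reuse one PinnableSlice across calls to
//     // amortize allocations.
//   }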

Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* pinnable_val, bool* value_found,
                       ReadCallback* callback, bool* is_blob_index) {
  assert(pinnable_val != nullptr);
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(column_family, key);
    }
  }

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    // Note: In WritePrepared txns this is not necessary but not harmful
    // either. Because prep_seq > snapshot => commit_seq > snapshot so if
    // a snapshot is specified we should be fine with skipping seq numbers
    // that are greater than that.
    //
    // In WriteUnprepared, we cannot set snapshot in the lookup key because we
    // may skip uncommitted data that should be visible to the transaction for
    // reading own writes.
    snapshot =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    if (callback) {
      snapshot = std::max(snapshot, callback->max_visible_seq());
    }
  } else {
    // Since we get and reference the super version before getting
    // the snapshot number, without a mutex protection, it is possible
    // that a memtable switch happened in the middle and not all the
    // data for this snapshot is available. But it will contain all
    // the data available in the super version we have, which is also
    // a valid snapshot to read from.
    // We shouldn't get the snapshot before finding and referencing the super
    // version because a flush happening in between may compact away data for
    // the snapshot, but the snapshot is earlier than the data overwriting it,
    // so users may see wrong results.
    snapshot = last_seq_same_as_publish_seq_
                   ? versions_->LastSequence()
                   : versions_->LastPublishedSequence();
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  if (!skip_memtable) {
    if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                     &max_covering_tombstone_seq, read_options, callback,
                     is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                            &max_covering_tombstone_seq, read_options,
                            callback, is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
                     &max_covering_tombstone_seq, value_found, nullptr, nullptr,
                     callback, is_blob_index);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (s.ok()) {
      size = pinnable_val->size();
      RecordTick(stats_, BYTES_READ, size);
      PERF_COUNTER_ADD(get_read_bytes, size);
    }
    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  return s;
}
1474
std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_MULTIGET);
  PERF_TIMER_GUARD(get_snapshot_time);

  SequenceNumber snapshot;

  struct MultiGetColumnFamilyData {
    ColumnFamilyData* cfd;
    SuperVersion* super_version;
    MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv)
        : cfd(cf), super_version(sv) {}
  };
  std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
      column_family.size());
  for (auto cf : column_family) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      multiget_cf_data.emplace(cfd->GetID(),
                               MultiGetColumnFamilyData(cfd, nullptr));
    }
  }

  bool last_try = false;
  {
    // If we keep hitting the same issue of the memtable getting sealed during
    // two consecutive retries, the write rate is very high. In that case it is
    // acceptable to take the mutex on the third try so we are sure to succeed.
    static const int num_retries = 3;
    for (auto i = 0; i < num_retries; ++i) {
      last_try = (i == num_retries - 1);
      bool retry = false;

      if (i > 0) {
        for (auto mgd_iter = multiget_cf_data.begin();
             mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
          auto super_version = mgd_iter->second.super_version;
          auto cfd = mgd_iter->second.cfd;
          if (super_version != nullptr) {
            ReturnAndCleanupSuperVersion(cfd, super_version);
          }
          mgd_iter->second.super_version = nullptr;
        }
      }

      if (read_options.snapshot == nullptr) {
        if (last_try) {
          TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
          // We're close to the max number of retries. For the last retry,
          // acquire the lock so we're sure to succeed.
          mutex_.Lock();
        }
        snapshot = last_seq_same_as_publish_seq_
                       ? versions_->LastSequence()
                       : versions_->LastPublishedSequence();
      } else {
        snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                       ->number_;
      }

      for (auto mgd_iter = multiget_cf_data.begin();
           mgd_iter != multiget_cf_data.end(); ++mgd_iter) {
        if (!last_try) {
          mgd_iter->second.super_version =
              GetAndRefSuperVersion(mgd_iter->second.cfd);
        } else {
          mgd_iter->second.super_version =
              mgd_iter->second.cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
        if (read_options.snapshot != nullptr || last_try) {
          // If the user passed a snapshot, we don't care whether a memtable
          // is sealed or a compaction happens, because the snapshot ensures
          // that older key versions are kept around. If this is the last
          // retry, we hold the lock, so nothing bad can happen.
          continue;
        }
        // We could get the earliest sequence number for the whole list of
        // memtables, which would include immutable memtables as well, but
        // that might be tricky to maintain in case we decide, in the future,
        // to do memtable compaction.
        if (!last_try) {
          auto seq =
              mgd_iter->second.super_version->mem->GetEarliestSequenceNumber();
          if (seq > snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (last_try) {
          mutex_.Unlock();
        }
        break;
      }
    }
  }

  // Contains a list of merge operations if a merge occurs.
  MergeContext merge_context;

  // Note: this always resizes the values array.
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);

  // Keep track of bytes that we read for statistics-recording later.
  uint64_t bytes_read = 0;
  PERF_TIMER_STOP(get_snapshot_time);

  // For each of the given keys, apply the entire "get" process as follows:
  // first look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t num_found = 0;
  for (size_t i = 0; i < num_keys; ++i) {
    merge_context.Clear();
    Status& s = stat_list[i];
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], snapshot);
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
    SequenceNumber max_covering_tombstone_seq = 0;
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd.super_version;
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    bool done = false;
    if (!skip_memtable) {
      if (super_version->mem->Get(lkey, value, &s, &merge_context,
                                  &max_covering_tombstone_seq, read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if (super_version->imm->Get(lkey, value, &s, &merge_context,
                                         &max_covering_tombstone_seq,
                                         read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done) {
      PinnableSlice pinnable_val;
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->Get(read_options, lkey, &pinnable_val, &s,
                                  &merge_context,
                                  &max_covering_tombstone_seq);
      value->assign(pinnable_val.data(), pinnable_val.size());
      RecordTick(stats_, MEMTABLE_MISS);
    }

    if (s.ok()) {
      bytes_read += value->size();
      num_found++;
    }
  }

  // Post processing (decrement reference counts and record statistics).
  PERF_TIMER_GUARD(get_post_process_time);
  autovector<SuperVersion*> superversions_to_delete;

  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (!last_try) {
      ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
    } else {
      mgd.cfd->GetSuperVersion()->Unref();
    }
  }
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return stat_list;
}
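
// Illustrative usage sketch (not part of the compiled implementation): a
// typical MultiGet call over the default column family. "db" is assumed to
// be an already-opened DB*; keys that are missing come back with IsNotFound()
// statuses rather than failing the whole batch.
//
//   std::vector<Slice> keys = {"key1", "key2", "key3"};
//   std::vector<std::string> values;
//   std::vector<ColumnFamilyHandle*> cfs(keys.size(),
//                                        db->DefaultColumnFamily());
//   std::vector<Status> statuses =
//       db->MultiGet(ReadOptions(), cfs, keys, &values);
//   for (size_t i = 0; i < keys.size(); ++i) {
//     if (statuses[i].ok()) {
//       // values[i] holds the value for keys[i]
//     }
//   }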

Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family,
                                  ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_family_names.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_families.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(column_families[i].options,
                               column_families[i].name, &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
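
// Illustrative usage sketch (assumes an already-opened DB* named "db"): both
// the single- and batch-create entry points persist the updated options file,
// and the batch variants stop at the first failure while keeping the handles
// created so far.
//
//   ColumnFamilyHandle* cf = nullptr;
//   Status s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf);
//
//   std::vector<ColumnFamilyHandle*> handles;
//   s = db->CreateColumnFamilies(ColumnFamilyOptions(), {"cf_a", "cf_b"},
//                                &handles);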

Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  Status s;
  Status persist_options_status;
  *handle = nullptr;

  s = CheckCompressionSupported(cf_options);
  if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(initial_db_options_, cf_options);
  }
  if (s.ok()) {
    for (auto& cf_path : cf_options.cf_paths) {
      s = env_->CreateDirIfMissing(cf_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }
  if (!s.ok()) {
    return s;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    // LogAndApply will both write the creation in MANIFEST and create the
    // ColumnFamilyData object.
    {  // write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      s = cfd->AddDirectories();
    }
    if (s.ok()) {
      single_column_family_mode_ = false;
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
                                         *cfd->GetLatestMutableCFOptions());

      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();

      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  // this is outside the mutex
  if (s.ok()) {
    NewThreadStatusCfInfo(
        reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
  }
  return s;
}

Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  assert(column_family != nullptr);
  Status s = DropColumnFamilyImpl(column_family);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  bool success_once = false;
  for (auto* handle : column_families) {
    s = DropColumnFamilyImpl(handle);
    if (!s.ok()) {
      break;
    }
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
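
// Illustrative usage sketch (assumes "db" and a handle "cf" obtained from
// CreateColumnFamily above): dropping a column family only marks it dropped;
// the handle must still be destroyed separately to release it.
//
//   Status s = db->DropColumnFamily(cf);
//   if (s.ok()) {
//     s = db->DestroyColumnFamilyHandle(cf);
//   }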

Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop the column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    if (!cf_support_snapshot) {
      // The dropped column family doesn't support snapshots, so
      // is_snapshot_supported_ has to be recalculated.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

bool DBImpl::KeyMayExist(const ReadOptions& read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, bool* value_found) {
  assert(value != nullptr);
  if (value_found != nullptr) {
    // falsify later if the key may exist but we can't fetch the value
    *value_found = true;
  }
  ReadOptions roptions = read_options;
  roptions.read_tier = kBlockCacheTier;  // read from block cache only
  PinnableSlice pinnable_val;
  auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found);
  value->assign(pinnable_val.data(), pinnable_val.size());

  // If block_cache is enabled and the index block of the table is not
  // present in block_cache, the return value will be Status::Incomplete.
  // In this case, the key may still exist in the table.
  return s.ok() || s.IsIncomplete();
}
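
// Illustrative usage sketch (assumes an already-opened DB* named "db"): a
// "false" answer definitely rules the key out, while "true" only means the
// key may exist; "value" is usable only when value_found was set to true.
//
//   std::string value;
//   bool value_found = false;
//   if (db->KeyMayExist(ReadOptions(), db->DefaultColumnFamily(), "some_key",
//                       &value, &value_found)) {
//     if (value_found) {
//       // value was fetched cheaply from the memtable or block cache
//     } else {
//       // fall back to a full Get() to confirm
//     }
//   }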

Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.managed) {
    return NewErrorIterator(
        Status::NotSupported("Managed iterator is not supported anymore."));
  }
  Iterator* result = nullptr;
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  // If the iterator wants internal keys, we can only proceed if
  // we can guarantee the deletes haven't been processed yet.
  if (immutable_db_options_.preserve_deletes &&
      read_options.iter_start_seqnum > 0 &&
      read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
    return NewErrorIterator(Status::InvalidArgument(
        "Iterator requested internal keys which are too old and are not"
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    result = nullptr;

#else
    SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
    auto iter = new ForwardIterator(this, read_options, cfd, sv);
    result = NewDBIterator(
        env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
        cfd->user_comparator(), iter, kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
        this, cfd);
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
  }
  return result;
}
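
// Illustrative usage sketch (assumes an already-opened DB* named "db"): a
// plain forward scan. The iterator pins a SuperVersion until it is deleted,
// so it should not be kept alive longer than necessary.
//
//   std::unique_ptr<Iterator> it(db->NewIterator(ReadOptions()));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // it->key() and it->value() are valid here
//   }
//   assert(it->status().ok());  // check for scan errors after the loop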

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool allow_blob,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in
  // the iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, this, cfd, allow_blob,
      ((read_options.snapshot != nullptr) ? false : allow_refresh));

  InternalIterator* internal_iter =
      NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
                          db_iter->GetRangeDelAggregator(), snapshot);
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}

Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.managed) {
    return Status::NotSupported("Managed iterator is not supported anymore.");
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  iterators->clear();
  iterators->reserve(column_families.size());
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing iterator not supported in RocksDB lite");
#else
    for (auto cfh : column_families) {
      auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
      auto iter = new ForwardIterator(this, read_options, cfd, sv);
      iterators->push_back(NewDBIterator(
          env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
          cfd->user_comparator(), iter, kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_callback, this, cfd));
    }
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    for (size_t i = 0; i < column_families.size(); ++i) {
      auto* cfd =
          reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
      iterators->push_back(
          NewIteratorImpl(read_options, cfd, snapshot, read_callback));
    }
  }

  return Status::OK();
}

const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }

#ifndef ROCKSDB_LITE
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
  return GetSnapshotImpl(true);
}
#endif  // ROCKSDB_LITE

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
                                      bool lock) {
  int64_t unix_time = 0;
  env_->GetCurrentTime(&unix_time);  // Ignore error
  SnapshotImpl* s = new SnapshotImpl;

  if (lock) {
    mutex_.Lock();
  }
  // returns null if the underlying memtable does not support snapshot.
  if (!is_snapshot_supported_) {
    if (lock) {
      mutex_.Unlock();
    }
    delete s;
    return nullptr;
  }
  auto snapshot_seq = last_seq_same_as_publish_seq_
                          ? versions_->LastSequence()
                          : versions_->LastPublishedSequence();
  SnapshotImpl* snapshot =
      snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
  if (lock) {
    mutex_.Unlock();
  }
  return snapshot;
}

namespace {
typedef autovector<ColumnFamilyData*, 2> CfdList;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
  for (const ColumnFamilyData* t : list) {
    if (t == cfd) {
      return true;
    }
  }
  return false;
}
}  // namespace

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
  {
    InstrumentedMutexLock l(&mutex_);
    snapshots_.Delete(casted_s);
    uint64_t oldest_snapshot;
    if (snapshots_.empty()) {
      oldest_snapshot = last_seq_same_as_publish_seq_
                            ? versions_->LastSequence()
                            : versions_->LastPublishedSequence();
    } else {
      oldest_snapshot = snapshots_.oldest()->number_;
    }
    // Avoid going through every column family by checking a global threshold
    // first.
    if (oldest_snapshot > bottommost_files_mark_threshold_) {
      CfdList cf_scheduled;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
        if (!cfd->current()
                 ->storage_info()
                 ->BottommostFilesMarkedForCompaction()
                 .empty()) {
          SchedulePendingCompaction(cfd);
          MaybeScheduleFlushOrCompaction();
          cf_scheduled.push_back(cfd);
        }
      }

      // Calculate a new threshold, skipping those CFs where compactions are
      // scheduled. We do not do the same pass as the previous loop because
      // the mutex might be unlocked during the loop, making the result
      // inaccurate.
      SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (CfdListContains(cf_scheduled, cfd)) {
          continue;
        }
        new_bottommost_files_mark_threshold = std::min(
            new_bottommost_files_mark_threshold,
            cfd->current()->storage_info()->bottommost_files_mark_threshold());
      }
      bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
    }
  }
  delete casted_s;
}
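
// Illustrative usage sketch (assumes an already-opened DB* named "db"):
// reads pinned to a snapshot, which must be released exactly once. Note that
// GetSnapshot() can return nullptr when the memtable does not support
// snapshots.
//
//   const Snapshot* snap = db->GetSnapshot();
//   if (snap != nullptr) {
//     ReadOptions ro;
//     ro.snapshot = snap;
//     std::string value;
//     Status s = db->Get(ro, "some_key", &value);  // as of the snapshot
//     db->ReleaseSnapshot(snap);
//   }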

#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfAllTables(props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}

Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
                                            const Range* range, std::size_t n,
                                            TablePropertiesCollection* props) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfTablesInRange(range, n, props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}

#endif  // ROCKSDB_LITE

const std::string& DBImpl::GetName() const { return dbname_; }

Env* DBImpl::GetEnv() const { return env_; }

Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  InstrumentedMutexLock l(&mutex_);
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                 cfh->cfd()->GetLatestCFOptions());
}

DBOptions DBImpl::GetDBOptions() const {
  InstrumentedMutexLock l(&mutex_);
  return BuildDBOptions(immutable_db_options_, mutable_db_options_);
}

bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_int) {
    uint64_t int_value;
    bool ret_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (ret_value) {
      *value = ToString(int_value);
    }
    return ret_value;
  } else if (property_info->handle_string) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                    value);
  } else if (property_info->handle_string_dbimpl) {
    std::string tmp_value;
    bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
    if (ret_value) {
      *value = tmp_value;
    }
    return ret_value;
  }
  // Shouldn't reach here since exactly one of handle_string and handle_int
  // should be non-nullptr.
  assert(false);
  return false;
}
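
// Illustrative usage sketch (assumes an already-opened DB* named "db"); the
// property names below are standard RocksDB property strings:
//
//   std::string stats;
//   if (db->GetProperty("rocksdb.stats", &stats)) {
//     // stats holds a human-readable statistics dump
//   }
//   uint64_t mem_usage = 0;
//   db->GetIntProperty("rocksdb.cur-size-all-mem-tables", &mem_usage);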

bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
                            const Slice& property,
                            std::map<std::string, std::string>* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_map) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetMapProperty(*property_info, property,
                                                 value);
  }
  // If we reach this point it means that handle_map is not provided for the
  // requested property
  return false;
}

bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return GetIntPropertyInternal(cfd, *property_info, false, value);
}

bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);
  if (!property_info.need_out_of_mutex) {
    if (is_locked) {
      mutex_.AssertHeld();
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    }
  } else {
    SuperVersion* sv = nullptr;
    if (!is_locked) {
      sv = GetAndRefSuperVersion(cfd);
    } else {
      sv = cfd->GetSuperVersion();
    }

    bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_info, sv->current, value);

    if (!is_locked) {
      ReturnAndCleanupSuperVersion(cfd, sv);
    }

    return ret;
  }
}

bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
  assert(value != nullptr);
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return false;
  }
  *value = statistics->ToString();
  return true;
}

#ifndef ROCKSDB_LITE
Status DBImpl::ResetStats() {
  InstrumentedMutexLock l(&mutex_);
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->initialized()) {
      cfd->internal_stats()->Clear();
    }
  }
  return Status::OK();
}
#endif  // ROCKSDB_LITE

bool DBImpl::GetAggregatedIntProperty(const Slice& property,
                                      uint64_t* aggregated_value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }

  uint64_t sum = 0;
  {
    // Needs mutex to protect the list of column families.
    InstrumentedMutexLock l(&mutex_);
    uint64_t value;
    for (auto* cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->initialized()) {
        continue;
      }
      if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
        sum += value;
      } else {
        return false;
      }
    }
  }
  *aggregated_value = sum;
  return true;
}

SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  return cfd->GetThreadLocalSuperVersion(&mutex_);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);
  if (!cfd) {
    return nullptr;
  }

  return GetAndRefSuperVersion(cfd);
}

void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
  // Release SuperVersion
  if (sv->Unref()) {
    {
      InstrumentedMutexLock l(&mutex_);
      sv->Cleanup();
    }
    delete sv;
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}

void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
    CleanupSuperVersion(sv);
  }
}

// REQUIRED: this function should only be called on the write thread.
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
                                          SuperVersion* sv) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);

  // If SuperVersion is held, and we successfully fetched a cfd using
  // GetAndRefSuperVersion(), it must still exist.
  assert(cfd != nullptr);
  ReturnAndCleanupSuperVersion(cfd, sv);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
  ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();

  if (!cf_memtables->Seek(column_family_id)) {
    return nullptr;
  }

  return cf_memtables->GetColumnFamilyHandle();
}

// REQUIRED: mutex is NOT held.
std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
    uint32_t column_family_id) {
  InstrumentedMutexLock l(&mutex_);

  auto* cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
  if (cfd == nullptr) {
    return nullptr;
  }

  return std::unique_ptr<ColumnFamilyHandleImpl>(
      new ColumnFamilyHandleImpl(cfd, this, &mutex_));
}

void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                         const Range& range,
                                         uint64_t* const count,
                                         uint64_t* const size) {
  ColumnFamilyHandleImpl* cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  // Convert user_key into a corresponding internal key.
  InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
  MemTable::MemTableStats memStats =
      sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
  MemTable::MemTableStats immStats =
      sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
  *count = memStats.count + immStats.count;
  *size = memStats.size + immStats.size;

  ReturnAndCleanupSuperVersion(cfd, sv);
}

void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
                                 const Range* range, int n, uint64_t* sizes,
                                 uint8_t include_flags) {
  assert(include_flags & DB::SizeApproximationFlags::INCLUDE_FILES ||
         include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES);
  Version* v;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  v = sv->current;

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    sizes[i] = 0;
    if (include_flags & DB::SizeApproximationFlags::INCLUDE_FILES) {
      sizes[i] += versions_->ApproximateSize(v, k1.Encode(), k2.Encode());
    }
    if (include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES) {
      sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
      sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
    }
  }

  ReturnAndCleanupSuperVersion(cfd, sv);
}
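
// Illustrative usage sketch (assumes an already-opened DB* named "db"): ask
// for the approximate on-disk plus in-memory size of a key range. The result
// is an estimate, not an exact byte count.
//
//   Range r("a", "z");
//   uint64_t size = 0;
//   db->GetApproximateSizes(db->DefaultColumnFamily(), &r, 1, &size,
//                           DB::SizeApproximationFlags::INCLUDE_FILES |
//                               DB::SizeApproximationFlags::INCLUDE_MEMTABLES);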

std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
  // We need to remember the iterator of our insert, because after the
  // background job is done, we need to remove that element from
  // pending_outputs_.
  pending_outputs_.push_back(versions_->current_next_file_number());
  auto pending_outputs_inserted_elem = pending_outputs_.end();
  --pending_outputs_inserted_elem;
  return pending_outputs_inserted_elem;
}

void DBImpl::ReleaseFileNumberFromPendingOutputs(
    std::list<uint64_t>::iterator v) {
  pending_outputs_.erase(v);
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
  if (seq > versions_->LastSequence()) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}

Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  if (!ParseFileName(name, &number, &type, &log_type) ||
      (type != kTableFile && type != kLogFile)) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
                    name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (type == kLogFile) {
    // Only allow deleting archived log files
    if (log_type != kArchivedLogFile) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed - not archived log.\n",
                      name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    status = wal_manager_.DeleteFile(name, number);
    if (!status.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed -- %s.\n", name.c_str(),
                      status.ToString().c_str());
    }
    return status;
  }

  int level;
  FileMetaData* metadata;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
    if (!status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed. File not found\n", name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted, there is no need to delete it.
    if (metadata->being_compacted) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DeleteFile %s Skipped. File about to be compacted\n",
                     name.c_str());
      job_context.Clean();
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* vstorage = cfd->current()->storage_info();
    for (int i = level + 1; i < cfd->NumberLevels(); i++) {
      if (vstorage->NumLevelFiles(i) != 0) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "DeleteFile %s FAILED. File not in last level\n",
                       name.c_str());
        job_context.Clean();
        return Status::InvalidArgument("File not in last level");
      }
    }
    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed ---"
                     " target file in level 0 must be the oldest.",
                     name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File in level 0, but not oldest");
    }
    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}

Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                   const RangePtr* ranges, size_t n,
                                   bool include_end) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  VersionEdit edit;
  std::set<FileMetaData*> deleted_files;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    Version* input_version = cfd->current();

    auto* vstorage = input_version->storage_info();
    for (size_t r = 0; r < n; r++) {
      auto begin = ranges[r].start, end = ranges[r].limit;
      for (int i = 1; i < cfd->NumberLevels(); i++) {
        if (vstorage->LevelFiles(i).empty() ||
            !vstorage->OverlapInLevel(i, begin, end)) {
          continue;
        }
        std::vector<FileMetaData*> level_files;
        InternalKey begin_storage, end_storage, *begin_key, *end_key;
        if (begin == nullptr) {
          begin_key = nullptr;
        } else {
          begin_storage.SetMinPossibleForUserKey(*begin);
          begin_key = &begin_storage;
        }
        if (end == nullptr) {
          end_key = nullptr;
        } else {
          end_storage.SetMaxPossibleForUserKey(*end);
          end_key = &end_storage;
        }

        vstorage->GetCleanInputsWithinInterval(
            i, begin_key, end_key, &level_files, -1 /* hint_index */,
            nullptr /* file_index */);
        FileMetaData* level_file;
        for (uint32_t j = 0; j < level_files.size(); j++) {
          level_file = level_files[j];
          if (level_file->being_compacted) {
            continue;
          }
          if (deleted_files.find(level_file) != deleted_files.end()) {
            continue;
          }
          if (!include_end && end != nullptr &&
              cfd->user_comparator()->Compare(level_file->largest.user_key(),
                                              *end) == 0) {
            continue;
          }
          edit.SetColumnFamily(cfd->GetID());
          edit.DeleteFile(i, level_file->fd.GetNumber());
          deleted_files.insert(level_file);
          level_file->being_compacted = true;
        }
      }
    }
    if (edit.GetDeletedFiles().empty()) {
      job_context.Clean();
      return Status::OK();
    }
    input_version->Ref();
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    for (auto* deleted_file : deleted_files) {
      deleted_file->being_compacted = false;
    }
    input_version->Unref();
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
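
// Illustrative usage sketch: callers normally reach this through the
// DeleteFilesInRange()/DeleteFilesInRanges() helpers declared in
// rocksdb/convenience.h (assumes an already-opened DB* named "db"). Only
// whole files fully contained in the range are deleted, so some keys in the
// range may survive.
//
//   Slice begin("a"), end("m");
//   Status s =
//       DeleteFilesInRange(db, db->DefaultColumnFamily(), &begin, &end);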

void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  InstrumentedMutexLock l(&mutex_);
  versions_->GetLiveFilesMetaData(metadata);
}

void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
                                     ColumnFamilyMetaData* cf_meta) {
  assert(column_family);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* sv = GetAndRefSuperVersion(cfd);
  sv->current->GetColumnFamilyMetaData(cf_meta);
  ReturnAndCleanupSuperVersion(cfd, sv);
}

#endif  // ROCKSDB_LITE

Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();
  std::vector<LiveFileMetaData> metadata;
  versions_->GetLiveFilesMetaData(&metadata);

  std::string corruption_messages;
  for (const auto& md : metadata) {
    // md.name has a leading "/".
    std::string file_path = md.db_path + md.name;

    uint64_t fsize = 0;
    Status s = env_->GetFileSize(file_path, &fsize);
    if (!s.ok() &&
        env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
      s = Status::OK();
    }
    if (!s.ok()) {
      corruption_messages +=
          "Can't access " + md.name + ": " + s.ToString() + "\n";
    } else if (fsize != md.size) {
      corruption_messages += "Sst file size mismatch: " + file_path +
                             ". Size recorded in manifest " +
                             ToString(md.size) + ", actual size " +
                             ToString(fsize) + "\n";
    }
  }
  if (corruption_messages.size() == 0) {
    return Status::OK();
  } else {
    return Status::Corruption(corruption_messages);
  }
}

Status DBImpl::GetDbIdentity(std::string& identity) const {
  std::string idfilename = IdentityFileName(dbname_);
  const EnvOptions soptions;
  std::unique_ptr<SequentialFileReader> id_file_reader;
  Status s;
  {
    std::unique_ptr<SequentialFile> idfile;
    s = env_->NewSequentialFile(idfilename, &idfile, soptions);
    if (!s.ok()) {
      return s;
    }
    id_file_reader.reset(
        new SequentialFileReader(std::move(idfile), idfilename));
  }

  uint64_t file_size;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }
  char* buffer =
      reinterpret_cast<char*>(alloca(static_cast<size_t>(file_size)));
  Slice id;
  s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer);
  if (!s.ok()) {
    return s;
  }
  identity.assign(id.ToString());
  // If the last character is '\n', remove it from the identity.
  if (identity.size() > 0 && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}
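
// Illustrative usage sketch (assumes an already-opened DB* named "db"): the
// identity is the unique ID stored in the IDENTITY file when the DB is
// created.
//
//   std::string id;
//   Status s = db->GetDbIdentity(id);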

// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
                              const std::string& /*column_family_name*/,
                              ColumnFamilyHandle** /*handle*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& /*cf_options*/,
    const std::vector<std::string>& /*column_family_names*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
  return Status::NotSupported("");
}

Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  delete column_family;
  return Status::OK();
}

DB::~DB() {}

Status DBImpl::Close() {
  if (!closed_) {
    closed_ = true;
    return CloseImpl();
  }
  return Status::OK();
}

Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  return VersionSet::ListColumnFamilies(column_families, name, db_options.env);
}

Snapshot::~Snapshot() {}

Status DestroyDB(const std::string& dbname, const Options& options,
                 const std::vector<ColumnFamilyDescriptor>& column_families) {
  ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;

  // Reset the logger because it holds a handle to the
  // log file and prevents cleanup and directory removal
  soptions.info_log.reset();
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
    for (const auto& fname : filenames) {
      if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del;
        std::string path_to_delete = dbname + "/" + fname;
        if (type == kMetaDatabase) {
          del = DestroyDB(path_to_delete, options);
        } else if (type == kTableFile || type == kLogFile) {
          del = DeleteDBFile(&soptions, path_to_delete, dbname);
        } else {
          del = env->DeleteFile(path_to_delete);
        }
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }

    std::vector<std::string> paths;

    for (const auto& path : options.db_paths) {
      paths.emplace_back(path.path);
    }
    for (const auto& cf : column_families) {
      for (const auto& path : cf.options.cf_paths) {
        paths.emplace_back(path.path);
      }
    }

    // Remove duplicate paths.
    // Note that we compare only the actual paths but not path ids.
    // The reason is that the same path can appear at different path_ids
    // for different column families.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());

    for (const auto& path : paths) {
      if (env->GetChildren(path, &filenames).ok()) {
        for (const auto& fname : filenames) {
          if (ParseFileName(fname, &number, &type) && type == kTableFile) {
            std::string table_path = path + "/" + fname;
            Status del = DeleteDBFile(&soptions, table_path, dbname);
            if (result.ok() && !del.ok()) {
              result = del;
            }
          }
        }
        env->DeleteDir(path);
      }
    }

    std::vector<std::string> walDirFiles;
    std::string archivedir = ArchivalDirectory(dbname);
    bool wal_dir_exists = false;
    if (dbname != soptions.wal_dir) {
      wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
      archivedir = ArchivalDirectory(soptions.wal_dir);
    }

    // The archive dir may be inside the wal dir or dbname, and should be
    // processed and removed before those; otherwise we would have issues
    // removing them.
    std::vector<std::string> archiveFiles;
    if (env->GetChildren(archivedir, &archiveFiles).ok()) {
      // Delete archival files.
      for (const auto& file : archiveFiles) {
        if (ParseFileName(file, &number, &type) && type == kLogFile) {
          Status del =
              DeleteDBFile(&soptions, archivedir + "/" + file, archivedir);
          if (result.ok() && !del.ok()) {
            result = del;
          }
        }
      }
      env->DeleteDir(archivedir);
    }

    // Delete log files in the WAL dir
    if (wal_dir_exists) {
      for (const auto& file : walDirFiles) {
        if (ParseFileName(file, &number, &type) && type == kLogFile) {
          Status del =
              DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
                           soptions.wal_dir);
          if (result.ok() && !del.ok()) {
            result = del;
          }
        }
      }
      env->DeleteDir(soptions.wal_dir);
    }

    env->UnlockFile(lock);   // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}
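
// Illustrative usage sketch: DestroyDB() removes all files of a closed
// database, including files in db_paths/cf_paths and any archived WALs. The
// DB must not be open; "/tmp/testdb" is a placeholder path.
//
//   Options options;
//   Status s = DestroyDB("/tmp/testdb", options);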

Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
                                bool need_enter_write_thread) {
#ifndef ROCKSDB_LITE
  WriteThread::Writer w;
  if (need_mutex_lock) {
    mutex_.Lock();
  } else {
    mutex_.AssertHeld();
  }
  if (need_enter_write_thread) {
    write_thread_.EnterUnbatched(&w, &mutex_);
  }

  std::vector<std::string> cf_names;
  std::vector<ColumnFamilyOptions> cf_opts;

  // This part requires mutex to protect the column family options
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    cf_names.push_back(cfd->GetName());
    cf_opts.push_back(cfd->GetLatestCFOptions());
  }

  // Unlock during expensive operations. New writes cannot get here
  // because the single write thread ensures all new writes get queued.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  mutex_.Unlock();

  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");

  std::string file_name =
      TempOptionsFileName(GetName(), versions_->NewFileNumber());
  Status s =
      PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name, GetEnv());

  if (s.ok()) {
    s = RenameTempFileToOptionsFile(file_name);
  }
  // restore lock
  if (!need_mutex_lock) {
    mutex_.Lock();
  }
  if (need_enter_write_thread) {
    write_thread_.ExitUnbatched(&w);
  }
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Unable to persist options -- %s", s.ToString().c_str());
    if (immutable_db_options_.fail_if_options_file_error) {
      return Status::IOError("Unable to persist options.",
                             s.ToString().c_str());
    }
  }
#else
  (void)need_mutex_lock;
  (void)need_enter_write_thread;
#endif  // !ROCKSDB_LITE
  return Status::OK();
}

#ifndef ROCKSDB_LITE
namespace {
void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
                              const size_t num_files_to_keep,
                              const std::shared_ptr<Logger>& info_log,
                              Env* env) {
  if (filenames.size() <= num_files_to_keep) {
    return;
  }
  for (auto iter = std::next(filenames.begin(), num_files_to_keep);
       iter != filenames.end(); ++iter) {
    if (!env->DeleteFile(iter->second).ok()) {
      ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
                     iter->second.c_str());
    }
  }
}
}  // namespace
#endif  // !ROCKSDB_LITE

Status DBImpl::DeleteObsoleteOptionsFiles() {
#ifndef ROCKSDB_LITE
  std::vector<std::string> filenames;
  // Use an ordered map to keep the filenames sorted from the newest
  // to the oldest.
  std::map<uint64_t, std::string> options_filenames;
  Status s;
  s = GetEnv()->GetChildren(GetName(), &filenames);
  if (!s.ok()) {
    return s;
  }
  for (auto& filename : filenames) {
    uint64_t file_number;
    FileType type;
    if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
      options_filenames.insert(
          {std::numeric_limits<uint64_t>::max() - file_number,
           GetName() + "/" + filename});
    }
  }

  // Keep the latest 2 options files.
  const size_t kNumOptionsFilesKept = 2;
  DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
                           immutable_db_options_.info_log, GetEnv());
  return Status::OK();
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}

Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
#ifndef ROCKSDB_LITE
  Status s;

  uint64_t options_file_number = versions_->NewFileNumber();
  std::string options_file_name =
      OptionsFileName(GetName(), options_file_number);
  // Retry if the file name happens to conflict with an existing one.
  s = GetEnv()->RenameFile(file_name, options_file_name);
  if (s.ok()) {
    InstrumentedMutexLock l(&mutex_);
    versions_->options_file_number_ = options_file_number;
  }

  if (0 == disable_delete_obsolete_files_) {
    DeleteObsoleteOptionsFiles();
  }
  return s;
#else
  (void)file_name;
  return Status::OK();
#endif  // !ROCKSDB_LITE
}
3132
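// For illustration (hypothetical file numbers): a temp file such as
// "<dbname>/OPTIONS-000009.dbtmp" is renamed to "<dbname>/OPTIONS-000012";
// the final name uses the freshly allocated file number rather than the one
// embedded in the temp name.
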
#ifdef ROCKSDB_USING_THREAD_STATUS

void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
                                          cfd->ioptions()->env);
  }
}

void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
  }
}

void DBImpl::EraseThreadStatusDbInfo() const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseDatabaseInfo(this);
  }
}

#else
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}

void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}

void DBImpl::EraseThreadStatusDbInfo() const {}
#endif  // ROCKSDB_USING_THREAD_STATUS

//
// A global method that can dump out the build version
void DumpRocksDBBuildVersion(Logger* log) {
#if !defined(IOS_CROSS_COMPILE)
  // if we compile with Xcode, we don't run build_detect_version, so we don't
  // generate util/build_version.cc
  ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
                   ROCKSDB_MINOR, ROCKSDB_PATCH);
  ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
  ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
#else
  (void)log;  // ignore "-Wunused-parameter"
#endif
}

#ifndef ROCKSDB_LITE
SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
                                                         bool include_history) {
  // Find the earliest sequence number that we know we can rely on reading
  // from the memtable without needing to check sst files.
  SequenceNumber earliest_seq =
      sv->imm->GetEarliestSequenceNumber(include_history);
  if (earliest_seq == kMaxSequenceNumber) {
    earliest_seq = sv->mem->GetEarliestSequenceNumber();
  }
  assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);

  return earliest_seq;
}
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                       bool cache_only, SequenceNumber* seq,
                                       bool* found_record_for_key,
                                       bool* is_blob_index) {
  Status s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  ReadOptions read_options;
  SequenceNumber current_seq = versions_->LastSequence();
  LookupKey lkey(key, current_seq);

  *seq = kMaxSequenceNumber;
  *found_record_for_key = false;

  // Check if there is a record for this key in the latest memtable
  sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
               seq, read_options, nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTable::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check immutable memtables
    *found_record_for_key = true;
    return Status::OK();
  }

  // Check if there is a record for this key in the immutable memtables
  sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
               seq, read_options, nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading immutable memtables.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTableList::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check memtable history
    *found_record_for_key = true;
    return Status::OK();
  }

  // Check if there is a record for this key in the memtable history
  sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, seq, read_options,
                          is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable history.
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
        s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check SST files
    *found_record_for_key = true;
    return Status::OK();
  }

  // TODO(agiardullo): possible optimization: consider checking cached
  // SST files if cache_only=true?
  if (!cache_only) {
    // Check tables
    sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
                     &max_covering_tombstone_seq, nullptr /* value_found */,
                     found_record_for_key, seq, nullptr /*read_callback*/,
                     is_blob_index);

    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
      // unexpected error reading SST files
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Unexpected status returned from Version::Get: %s\n",
                      s.ToString().c_str());
    }
  }

  return s;
}

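// Illustrative expectation for an internal caller (hypothetical values): if
// key "foo" was last written at sequence number 42 and that write still
// resides in a memtable, a call like the one sketched here sets *seq == 42
// and found == true. If the record has already been flushed to an SST file
// and cache_only == true, the call can return OK with found == false, since
// SST files are not consulted in that mode.
//
//   SequenceNumber seq;
//   bool found;
//   Status s = GetLatestSequenceForKey(sv, "foo", true /* cache_only */,
//                                      &seq, &found, nullptr);
//
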
Status DBImpl::IngestExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& external_files,
    const IngestExternalFileOptions& ingestion_options) {
  IngestExternalFileArg arg;
  arg.column_family = column_family;
  arg.external_files = external_files;
  arg.options = ingestion_options;
  return IngestExternalFiles({arg});
}

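// Usage sketch (illustrative only, not compiled here): an external SST file
// is typically produced with SstFileWriter and then ingested. The path and
// the "db" handle below are hypothetical.
//
//   SstFileWriter writer(EnvOptions(), Options());
//   Status s = writer.Open("/tmp/data.sst");
//   if (s.ok()) s = writer.Put("key1", "value1");
//   if (s.ok()) s = writer.Finish();
//   if (s.ok()) {
//     s = db->IngestExternalFile(db->DefaultColumnFamily(), {"/tmp/data.sst"},
//                                IngestExternalFileOptions());
//   }
//
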
Status DBImpl::IngestExternalFiles(
    const std::vector<IngestExternalFileArg>& args) {
  if (args.empty()) {
    return Status::InvalidArgument("ingestion arg list is empty");
  }
  {
    std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
    for (const auto& arg : args) {
      if (arg.column_family == nullptr) {
        return Status::InvalidArgument("column family handle is null");
      } else if (unique_cfhs.count(arg.column_family) > 0) {
        return Status::InvalidArgument(
            "ingestion args have duplicate column families");
      }
      unique_cfhs.insert(arg.column_family);
    }
  }
  // Ingest multiple external SST files atomically.
  size_t num_cfs = args.size();
  for (size_t i = 0; i != num_cfs; ++i) {
    if (args[i].external_files.empty()) {
      char err_msg[128] = {0};
      snprintf(err_msg, 128, "external_files[%zu] is empty", i);
      return Status::InvalidArgument(err_msg);
    }
  }
  for (const auto& arg : args) {
    const IngestExternalFileOptions& ingest_opts = arg.options;
    if (ingest_opts.ingest_behind &&
        !immutable_db_options_.allow_ingest_behind) {
      return Status::InvalidArgument(
          "can't ingest_behind file in DB with allow_ingest_behind=false");
    }
  }

  // TODO (yanqin) maybe handle the case in which column_families have
  // duplicates
  std::list<uint64_t>::iterator pending_output_elem;
  size_t total = 0;
  for (const auto& arg : args) {
    total += arg.external_files.size();
  }
  uint64_t next_file_number = 0;
  Status status = ReserveFileNumbersBeforeIngestion(
      static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
      &pending_output_elem, &next_file_number);
  if (!status.ok()) {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
  for (const auto& arg : args) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
    ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
                                immutable_db_options_, env_options_,
                                &snapshots_, arg.options);
  }
  std::vector<std::pair<bool, Status>> exec_results;
  for (size_t i = 0; i != num_cfs; ++i) {
    exec_results.emplace_back(false, Status::OK());
  }
  // TODO(yanqin) maybe make jobs run in parallel
  // Each job's file numbers start right after the range reserved for the
  // jobs before it.
  uint64_t start_file_number = next_file_number;
  for (size_t i = 1; i != num_cfs; ++i) {
    start_file_number += args[i - 1].external_files.size();
    auto* cfd =
        static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
    exec_results[i].second = ingestion_jobs[i].Prepare(
        args[i].external_files, start_file_number, super_version);
    exec_results[i].first = true;
    CleanupSuperVersion(super_version);
  }
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
  {
    auto* cfd =
        static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
    exec_results[0].second = ingestion_jobs[0].Prepare(
        args[0].external_files, next_file_number, super_version);
    exec_results[0].first = true;
    CleanupSuperVersion(super_version);
  }
  for (const auto& exec_result : exec_results) {
    if (!exec_result.second.ok()) {
      status = exec_result.second;
      break;
    }
  }
  if (!status.ok()) {
    for (size_t i = 0; i != num_cfs; ++i) {
      if (exec_results[i].first) {
        ingestion_jobs[i].Cleanup(status);
      }
    }
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  std::vector<SuperVersionContext> sv_ctxs;
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs.emplace_back(true /* create_superversion */);
  }
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
  TEST_SYNC_POINT("DBImpl::AddFile:Start");
  {
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

    // Stop writes to the DB by entering both write threads
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (two_write_queues_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    num_running_ingest_file_ += static_cast<int>(num_cfs);
    TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");

    bool at_least_one_cf_need_flush = false;
    std::vector<bool> need_flush(num_cfs, false);
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (cfd->IsDropped()) {
        // TODO (yanqin) investigate whether we should abort ingestion or
        // proceed with other non-dropped column families.
        status = Status::InvalidArgument(
            "cannot ingest an external file into a dropped CF");
        break;
      }
      bool tmp = false;
      status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
      need_flush[i] = tmp;
      at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
      if (!status.ok()) {
        break;
      }
    }
    TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                             &at_least_one_cf_need_flush);

    if (status.ok() && at_least_one_cf_need_flush) {
      FlushOptions flush_opts;
      flush_opts.allow_write_stall = true;
      if (immutable_db_options_.atomic_flush) {
        autovector<ColumnFamilyData*> cfds_to_flush;
        SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
        mutex_.Unlock();
        status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
                                      FlushReason::kExternalFileIngestion,
                                      true /* writes_stopped */);
        mutex_.Lock();
      } else {
        for (size_t i = 0; i != num_cfs; ++i) {
          if (need_flush[i]) {
            mutex_.Unlock();
            auto* cfd =
                static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
                    ->cfd();
            status = FlushMemTable(cfd, flush_opts,
                                   FlushReason::kExternalFileIngestion,
                                   true /* writes_stopped */);
            mutex_.Lock();
            if (!status.ok()) {
              break;
            }
          }
        }
      }
    }
    // Run ingestion jobs.
    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        status = ingestion_jobs[i].Run();
        if (!status.ok()) {
          break;
        }
      }
    }
    if (status.ok()) {
      bool should_increment_last_seqno =
          ingestion_jobs[0].ShouldIncrementLastSequence();
#ifndef NDEBUG
      for (size_t i = 1; i != num_cfs; ++i) {
        assert(should_increment_last_seqno ==
               ingestion_jobs[i].ShouldIncrementLastSequence());
      }
#endif
      if (should_increment_last_seqno) {
        const SequenceNumber last_seqno = versions_->LastSequence();
        versions_->SetLastAllocatedSequence(last_seqno + 1);
        versions_->SetLastPublishedSequence(last_seqno + 1);
        versions_->SetLastSequence(last_seqno + 1);
      }
      autovector<ColumnFamilyData*> cfds_to_commit;
      autovector<const MutableCFOptions*> mutable_cf_options_list;
      autovector<autovector<VersionEdit*>> edit_lists;
      uint32_t num_entries = 0;
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (cfd->IsDropped()) {
          continue;
        }
        cfds_to_commit.push_back(cfd);
        mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
        autovector<VersionEdit*> edit_list;
        edit_list.push_back(ingestion_jobs[i].edit());
        edit_lists.push_back(edit_list);
        ++num_entries;
      }
      // Mark the version edits as an atomic group if the number of version
      // edits exceeds 1.
      if (cfds_to_commit.size() > 1) {
        for (auto& edits : edit_lists) {
          assert(edits.size() == 1);
          edits[0]->MarkAtomicGroup(--num_entries);
        }
        assert(0 == num_entries);
      }
      status =
          versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
                                 edit_lists, &mutex_, directories_.GetDbDir());
    }

    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (!cfd->IsDropped()) {
          InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
                                             *cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
          if (0 == i && num_cfs > 1) {
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
          }
#endif  // !NDEBUG
        }
      }
    }

    // Resume writes to the DB
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    if (status.ok()) {
      for (auto& job : ingestion_jobs) {
        job.UpdateStats();
      }
    }
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    num_running_ingest_file_ -= static_cast<int>(num_cfs);
    if (0 == num_running_ingest_file_) {
      bg_cv_.SignalAll();
    }
    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs[i].Clean();
    // This may roll back jobs that have completed successfully; that is
    // intended, so a failed atomic ingestion leaves no partial state behind.
    ingestion_jobs[i].Cleanup(status);
  }
  if (status.ok()) {
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (!cfd->IsDropped()) {
        NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
      }
    }
  }
  return status;
}
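
// Usage sketch for atomic multi-column-family ingestion (illustrative only;
// "db", "cf1" and "cf2" are hypothetical handles):
//
//   IngestExternalFileArg arg1, arg2;
//   arg1.column_family = cf1;
//   arg1.external_files = {"/tmp/cf1.sst"};
//   arg2.column_family = cf2;
//   arg2.external_files = {"/tmp/cf2.sst"};
//   Status s = db->IngestExternalFiles({arg1, arg2});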
3588
11fdf7f2
TL
3589Status DBImpl::VerifyChecksum() {
3590 Status s;
3591 std::vector<ColumnFamilyData*> cfd_list;
3592 {
3593 InstrumentedMutexLock l(&mutex_);
3594 for (auto cfd : *versions_->GetColumnFamilySet()) {
3595 if (!cfd->IsDropped() && cfd->initialized()) {
3596 cfd->Ref();
3597 cfd_list.push_back(cfd);
3598 }
3599 }
3600 }
3601 std::vector<SuperVersion*> sv_list;
3602 for (auto cfd : cfd_list) {
3603 sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_));
3604 }
3605 for (auto& sv : sv_list) {
3606 VersionStorageInfo* vstorage = sv->current->storage_info();
3607 ColumnFamilyData* cfd = sv->current->cfd();
3608 Options opts;
3609 {
3610 InstrumentedMutexLock l(&mutex_);
494da23a
TL
3611 opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
3612 cfd->GetLatestCFOptions());
11fdf7f2
TL
3613 }
3614 for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
3615 for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
3616 j++) {
3617 const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
3618 std::string fname = TableFileName(cfd->ioptions()->cf_paths,
3619 fd.GetNumber(), fd.GetPathId());
3620 s = rocksdb::VerifySstFileChecksum(opts, env_options_, fname);
3621 }
3622 }
3623 if (!s.ok()) {
3624 break;
3625 }
3626 }
3627 {
3628 InstrumentedMutexLock l(&mutex_);
3629 for (auto sv : sv_list) {
3630 if (sv && sv->Unref()) {
3631 sv->Cleanup();
3632 delete sv;
3633 }
3634 }
3635 for (auto cfd : cfd_list) {
3636 cfd->Unref();
3637 }
3638 }
3639 return s;
3640}
3641
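// Usage sketch (illustrative only; "db" is a hypothetical open DB*):
//
//   Status s = db->VerifyChecksum();
//   if (!s.ok()) {
//     // At least one SST file failed block-checksum verification.
//   }
//
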
void DBImpl::NotifyOnExternalFileIngested(
    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
  if (immutable_db_options_.listeners.empty()) {
    return;
  }

  for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
    ExternalFileIngestionInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = f.external_file_path;
    info.internal_file_path = f.internal_file_path;
    info.global_seqno = f.assigned_seqno;
    info.table_properties = f.table_properties;
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnExternalFileIngested(this, info);
    }
  }
}

void DBImpl::WaitForIngestFile() {
  mutex_.AssertHeld();
  while (num_running_ingest_file_ > 0) {
    bg_cv_.Wait();
  }
}

Status DBImpl::StartTrace(const TraceOptions& trace_options,
                          std::unique_ptr<TraceWriter>&& trace_writer) {
  InstrumentedMutexLock lock(&trace_mutex_);
  tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
  return Status::OK();
}

Status DBImpl::EndTrace() {
  InstrumentedMutexLock lock(&trace_mutex_);
  Status s;
  if (tracer_ != nullptr) {
    s = tracer_->Close();
    tracer_.reset();
  } else {
    return Status::IOError("No trace file to close");
  }
  return s;
}

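// Usage sketch (illustrative only; assumes NewFileTraceWriter() from
// rocksdb/trace_reader_writer.h, plus hypothetical "env" and "db" handles):
//
//   std::unique_ptr<TraceWriter> trace_writer;
//   Status s = NewFileTraceWriter(env, EnvOptions(), "/tmp/db.trace",
//                                 &trace_writer);
//   if (s.ok()) s = db->StartTrace(TraceOptions(), std::move(trace_writer));
//   // ... run the workload to capture ...
//   if (s.ok()) s = db->EndTrace();
//
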
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    // Re-check tracer_ under trace_mutex_: EndTrace() may have reset it
    // between the unlocked check above and the lock acquisition.
    if (tracer_) {
      s = tracer_->IteratorSeek(cf_id, key);
    }
  }
  return s;
}

Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
                                        const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      s = tracer_->IteratorSeekForPrev(cf_id, key);
    }
  }
  return s;
}

Status DBImpl::ReserveFileNumbersBeforeIngestion(
    ColumnFamilyData* cfd, uint64_t num,
    std::list<uint64_t>::iterator* pending_output_elem,
    uint64_t* next_file_number) {
  Status s;
  SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
  assert(nullptr != pending_output_elem);
  assert(nullptr != next_file_number);
  InstrumentedMutexLock l(&mutex_);
  if (error_handler_.IsDBStopped()) {
    // Do not ingest files when there is a bg_error
    return error_handler_.GetBGError();
  }
  *pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
  *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
  auto cf_options = cfd->GetLatestMutableCFOptions();
  VersionEdit dummy_edit;
  // If a crash happens after a hard link is established, the Recover function
  // may reuse a file number that has already been assigned to the internal
  // file, and this would overwrite the external file. To protect the external
  // file, we have to make sure the file number will never be reused.
  s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
  }
  dummy_sv_ctx.Clean();
  return s;
}
#endif  // ROCKSDB_LITE

}  // namespace rocksdb