// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/periodic_work_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/sst_file_dumper.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compaction, use the level-0 compression setting; this
    // covers the case where min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
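
// Quick reference for GetCompressionFlush() above (derived from the code,
// illustrative rather than normative):
//   universal compaction, compression_size_percent < 0   -> CF compression
//   universal compaction, compression_size_percent >= 0  -> kNoCompression
//   other styles with compression_per_level set          -> level-0 entry
//   otherwise                                            -> CF compression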

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      env_(initial_db_options_.env),
      io_tracer_(std::make_shared<IOTracer>()),
      immutable_db_options_(initial_db_options_),
      fs_(immutable_db_options_.fs, io_tracer_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      db_lock_(nullptr),
      shutting_down_(false),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, io_tracer_,
                   seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
#ifndef ROCKSDB_LITE
      periodic_work_scheduler_(nullptr),
#endif  // ROCKSDB_LITE
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also
      // writes to the memtable. When two_write_queues_ is disabled, the last
      // seq in the memtable is the same as the last seq published to the
      // readers. When it is enabled but seq_per_batch_ is disabled, the last
      // seq in the memtable still indicates the last published seq, since
      // wal-only writes that go to the 2nd queue do not consume a sequence
      // number. Otherwise writes performed by the 2nd queue could change what
      // is visible to the readers. In these cases
      // (last_seq_same_as_publish_seq_ == false), the 2nd queue maintains a
      // separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  // TODO: Check for an error here
  env_->GetAbsolutePath(dbname, &db_absolute_path_).PermitUncheckedError();

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Use a large capacity for the "unlimited open files" setting (-1).
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);
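
  // Illustrative arithmetic: with max_open_files = 1000 the table cache is
  // sized for 990 table handles, while max_open_files = -1 yields
  // TableCache::kInfiniteCapacity, i.e. an effectively unbounded cache.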

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_,
                                 io_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  SetDbSessionId();
  DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means if preserve_deletes_ == true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by the client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, so
//    that we don't inadvertently cause an error and think recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl(DBRecoverContext context) {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // Make sure the IO Status stored in version set is set to OK.
  bool file_deletion_disabled = !IsFileDeletionsEnabled();
  if (s.ok()) {
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // If resuming from an IOError that resulted from a MANIFEST write, then
      // assert that we must have already set the MANIFEST writer to nullptr
      // during the clean-up phase of MANIFEST writing. We must have also
      // disabled file deletions.
      assert(!versions_->descriptor_log_);
      assert(file_deletion_disabled);
      // Since we are trying to recover from a MANIFEST write error, we need to
      // switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
      // Therefore, force writing a dummy version edit because we do not know
      // whether there are flush jobs with non-empty data to flush, triggering
      // appends to MANIFEST.
      VersionEdit edit;
      auto cfh =
          static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
      s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
                                 directories_.GetDbDir());
      if (!s.ok()) {
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          s = error_handler_.SetBGError(io_s,
                                        BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }

  // We cannot guarantee consistency of the WAL. So force flush memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall writes since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, context.flush_reason);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    assert(versions_->io_status().ok());
    // If we reach here, we should re-enable file deletions if they were
    // disabled during previous error handling.
    if (file_deletion_disabled) {
      // Always returns ok
      s = EnableFileDeletions(/*force=*/true);
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, the event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

#ifndef ROCKSDB_LITE
  if (periodic_work_scheduler_ != nullptr) {
    periodic_work_scheduler_->Unregister(this);
  }
#endif  // !ROCKSDB_LITE

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      Status s =
          AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      s.PermitUncheckedError();  //**TODO: What to do on error?
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          s.PermitUncheckedError();  //**TODO: What to do on error?
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret = Status::OK();
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds the manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recovery fails (may be due to a corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, a corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when the column family
  // data list is destroyed, so leaving handles in the table cache after
  // versions_.reset() may cause issues. Here we clean all unreferenced handles
  // in the table cache. Now we assume all user queries have finished, so only
  // the version set itself can possibly hold blocks from the block cache.
  // After releasing unreferenced handles here, only handles held by the
  // version set are left, and inside versions_.reset() we will release them.
  // There, we need to make sure that every time a handle is released, we erase
  // it from the cache too. By doing that, we can guarantee that after
  // versions_.reset() the table cache is empty, so the cache can be safely
  // destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    // TODO: Check for unlock error
    env_->UnlockFile(db_lock_).PermitUncheckedError();
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, a background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (!s.ok() && !s.IsNotSupported() && ret.ok()) {
      ret = s;
    }
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() errors for cases where the user didn't release
    // certain resources; they can release them and come back and retry. In
    // this case, we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper().PermitUncheckedError();
  }
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.statistics.get();
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE
  {
    InstrumentedMutexLock l(&mutex_);
    periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
    TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
                             &periodic_work_scheduler_);
  }

  periodic_work_scheduler_->Register(
      this, mutable_db_options_.stats_dump_period_sec,
      mutable_db_options_.stats_persist_period_sec);
#endif  // !ROCKSDB_LITE
}

// estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
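
// Note (derived from the code above): the estimate extrapolates from the
// first slice only. That is a reasonable approximation here because every
// slice holds the same set of ticker names, so per-slice sizes are nearly
// uniform.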

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
  uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;

  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    Status s = Status::OK();
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        if (s.ok()) {
          char key[100];
          int length =
              EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
          // calculate the delta from last time
          if (stats_slice_.find(stat.first) != stats_slice_.end()) {
            uint64_t delta = stat.second - stats_slice_[stat.first];
            s = batch.Put(persist_stats_cf_handle_,
                          Slice(key, std::min(100, length)), ToString(delta));
          }
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    if (s.ok()) {
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:End");
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock when searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy for timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
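
// Usage sketch (illustrative, assuming stats history collection is enabled
// via stats_persist_period_sec or persist_stats_to_disk):
//   std::unique_ptr<StatsHistoryIterator> it;
//   Status s = db->GetStatsHistory(start_time, end_time, &it);
//   for (; s.ok() && it->Valid(); it->Next()) {
//     uint64_t time = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& stats = it->GetStatsMap();
//   }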

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

void DBImpl::FlushInfoLog() {
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
  LogFlush(immutable_db_options_.info_log);
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  FSDirectory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  persist_options_status.PermitUncheckedError();  // Allow uninitialized access
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      s = versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                                 directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with the writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
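
// Usage sketch (illustrative): mutable column family options can be tuned at
// runtime through this entry point, e.g.
//   db->SetOptions(handle, {{"write_buffer_size", "131072"},
//                           {"level0_file_num_compaction_trigger", "4"}});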

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status = Status::OK();
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(mutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          new_options.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      if (new_options.stats_dump_period_sec !=
              mutable_db_options_.stats_dump_period_sec ||
          new_options.stats_persist_period_sec !=
              mutable_db_options_.stats_persist_period_sec) {
        mutex_.Unlock();
        periodic_work_scheduler_->Unregister(this);
        periodic_work_scheduler_->Register(
            this, new_options.stats_dump_period_sec,
            new_options.stats_persist_period_sec);
        mutex_.Lock();
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why the "optimize for read" settings are applied
      // to the write options here.
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    } else {
      // To get here, we must have had invalid options and will not attempt to
      // persist them, which means the status is "OK/Uninitialized".
      persist_options_status.PermitUncheckedError();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent
      // future writes
      IOStatusCheck(io_s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return std::move(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return std::move(io_s);
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
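
// Behavior summary (derived from the code above): with manual_wal_flush
// disabled, FlushWAL(false) is a no-op and FlushWAL(true) reduces to
// SyncWAL(); with manual_wal_flush enabled, buffered WAL bytes are first
// pushed to the file and then optionally synced.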

Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  IOStatus io_s;
  for (log::Writer* log : logs_to_sync) {
    io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!io_s.ok()) {
      status = io_s;
      break;
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    IOStatusCheck(io_s);
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    if (status.ok()) {
      status = MarkLogsSynced(current_log_number, need_log_dir_sync);
    } else {
      MarkLogsNotSynced(current_log_number);
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return std::move(status);
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}

Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to) {
    log_dir_synced_ = true;
  }
  VersionEdit synced_wals;
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& wal = *it;
    assert(wal.getting_synced);
    if (logs_.size() > 1) {
      if (immutable_db_options_.track_and_verify_wals_in_manifest) {
        synced_wals.AddWal(wal.number,
                           WalMetadata(wal.writer->file()->GetFileSize()));
      }
      logs_to_free_.push_back(wal.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      wal.getting_synced = false;
      ++it;
    }
  }
  assert(logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));

  Status s;
  if (synced_wals.IsWalAddition()) {
    // not empty, write to MANIFEST.
    s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
    if (!s.ok() && versions_->io_status().IsIOError()) {
      s = error_handler_.SetBGError(versions_->io_status(),
                                    BackgroundErrorReason::kManifestWrite);
    }
  }
  log_sync_cv_.SignalAll();
  return s;
}
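
// Note (derived from the loop above): fully synced WALs other than the newest
// one are released and queued for deletion; the newest WAL always stays in
// logs_ (only its getting_synced flag is cleared) since it may still receive
// writes.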

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
  mutex_.AssertHeld();
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
       ++it) {
    auto& wal = *it;
    assert(wal.getting_synced);
    wal.getting_synced = false;
  }
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
  if (seqnum > preserve_deletes_seqnum_.load()) {
    preserve_deletes_seqnum_.store(seqnum);
    return true;
  } else {
    return false;
  }
}

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence,
                                              ColumnFamilyHandle* column_family,
                                              bool allow_unprepared_value) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  return NewInternalIterator(read_options, cfd, super_version, arena,
                             range_del_agg, sequence, allow_unprepared_value);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into the high priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  // Can't use an iterator to go over purge_files_ because inside the loop
  // we're unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFileInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      state->db->ScheduleBgLogWriterClose(&job_context);
      state->db->AddSuperVersionsToFreeQueue(state->super_version);
      state->db->SchedulePurge();
    }
    state->mu->Unlock();

    if (!state->background_purge) {
      delete state->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes them in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace
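
// CleanupIteratorState runs through the Cleanable callback mechanism when the
// iterator is destroyed. A minimal sketch of that contract (assuming Cleanable
// is declared in rocksdb/iterator.h in this version; names below are
// illustrative, not part of this file):
//
//   #include "rocksdb/iterator.h"
//
//   struct Resource {
//     // ... whatever the iterator must keep alive ...
//   };
//
//   static void FreeResource(void* arg1, void* /*arg2*/) {
//     delete reinterpret_cast<Resource*>(arg1);
//   }
//
//   void Attach(rocksdb::Cleanable* iter, Resource* r) {
//     // Runs FreeResource(r, nullptr) exactly once, when iter is destroyed.
//     iter->RegisterCleanup(&FreeResource, r, nullptr);
//   }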

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence,
                                              bool allow_unprepared_value) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder, range_del_agg,
                                           allow_unprepared_value);
    }
    internal_iter = merge_iter_builder.Finish();
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                          immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}

ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value, std::string* timestamp) {
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.timestamp = timestamp;
  Status s = GetImpl(read_options, key, get_impl_options);
  return s;
}
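
// A minimal caller-side sketch of the Get() entry points above (illustrative
// only; error handling reduced to asserts). PinnableSlice lets the read avoid
// a copy when the value can be pinned in the block cache:
//
//   #include <cassert>
//   #include "rocksdb/db.h"
//
//   void GetExample(rocksdb::DB* db) {
//     rocksdb::PinnableSlice value;
//     rocksdb::Status s = db->Get(rocksdb::ReadOptions(),
//                                 db->DefaultColumnFamily(), "key", &value);
//     assert(s.ok() || s.IsNotFound());
//     // value.data()/value.size() stay valid until value is reset/destroyed.
//   }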

namespace {
class GetWithTimestampReadCallback : public ReadCallback {
 public:
  explicit GetWithTimestampReadCallback(SequenceNumber seq)
      : ReadCallback(seq) {}
  bool IsVisibleFullCheck(SequenceNumber seq) override {
    return seq <= max_visible_seq_;
  }
};
}  // namespace

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       GetImplOptions& get_impl_options) {
  assert(get_impl_options.value != nullptr ||
         get_impl_options.merge_operands != nullptr);

  assert(get_impl_options.column_family);
  const Comparator* ucmp = get_impl_options.column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  GetWithTimestampReadCallback read_cb(0);  // Will call Refresh

#ifndef NDEBUG
  if (ts_sz > 0) {
    assert(read_options.timestamp);
    assert(read_options.timestamp->size() == ts_sz);
  } else {
    assert(!read_options.timestamp);
  }
#endif  // NDEBUG

  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
      get_impl_options.column_family);
  auto cfd = cfh->cfd();

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
    }
  }

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    if (get_impl_options.callback) {
      // Already calculated based on read_options.snapshot
      snapshot = get_impl_options.callback->max_visible_seq();
    } else {
      snapshot =
          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    }
  } else {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither the data that
    // was visible to the snapshot before compaction nor the newer data
    // inserted afterwards.
    if (last_seq_same_as_publish_seq_) {
      snapshot = versions_->LastSequence();
    } else {
      snapshot = versions_->LastPublishedSequence();
    }
    if (get_impl_options.callback) {
      // The unprep_seqs are not published for write unprepared, so it could be
      // that max_visible_seq is larger. Seek to the std::max of the two.
      // However, we still want our callback to contain the actual snapshot so
      // that it can do the correct visibility filtering.
      get_impl_options.callback->Refresh(snapshot);

      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
      // max_visible_seq = max(max_visible_seq, snapshot)
      //
      // Currently, the commented out assert is broken by
      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
      // the regular transaction flow, then this special read callback would
      // not be needed.
      //
      // assert(callback->max_visible_seq() >= snapshot);
      snapshot = get_impl_options.callback->max_visible_seq();
    }
  }
  // If timestamp is used, we use the read callback to ensure <key,t,s> is
  // returned only if t <= read_opts.timestamp and s <= snapshot.
  if (ts_sz > 0 && !get_impl_options.callback) {
    read_cb.Refresh(snapshot);
    get_impl_options.callback = &read_cb;
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
                       &merge_context, &max_covering_tombstone_seq,
                       read_options, get_impl_options.callback,
                       get_impl_options.is_blob_index)) {
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
                              timestamp, &s, &merge_context,
                              &max_covering_tombstone_seq, read_options,
                              get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      }
    } else {
      // Get the merge operands associated with the key. Merge operands should
      // not be merged; the raw values should be returned to the user.
      if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
                       &merge_context, &max_covering_tombstone_seq,
                       read_options, nullptr, nullptr, false)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
                                           &max_covering_tombstone_seq,
                                           read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(
        read_options, lkey, get_impl_options.value, timestamp, &s,
        &merge_context, &max_covering_tombstone_seq,
        get_impl_options.get_value ? get_impl_options.value_found : nullptr,
        nullptr, nullptr,
        get_impl_options.get_value ? get_impl_options.callback : nullptr,
        get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
        get_impl_options.get_value);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (s.ok()) {
      if (get_impl_options.get_value) {
        size = get_impl_options.value->size();
      } else {
        // Return all merge operands for get_impl_options.key
        *get_impl_options.number_of_operands =
            static_cast<int>(merge_context.GetNumOperands());
        if (*get_impl_options.number_of_operands >
            get_impl_options.get_merge_operands_options
                ->expected_max_number_of_operands) {
          s = Status::Incomplete(
              Status::SubCode::KMergeOperandsInsufficientCapacity);
        } else {
          for (const Slice& sl : merge_context.GetOperands()) {
            size += sl.size();
            get_impl_options.merge_operands->PinSelf(sl);
            get_impl_options.merge_operands++;
          }
        }
      }
      RecordTick(stats_, BYTES_READ, size);
      PERF_COUNTER_ADD(get_read_bytes, size);
    }
    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  return s;
}
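
// GetImpl serves both point lookups and the merge-operand path used by
// DB::GetMergeOperands(). A hedged caller-side sketch of the latter
// (illustrative only; kMaxOperands is an arbitrary capacity):
//
//   #include "rocksdb/db.h"
//
//   void MergeOperandsExample(rocksdb::DB* db) {
//     const int kMaxOperands = 8;
//     rocksdb::PinnableSlice operands[kMaxOperands];
//     rocksdb::GetMergeOperandsOptions opts;
//     opts.expected_max_number_of_operands = kMaxOperands;
//     int num_operands = 0;
//     rocksdb::Status s = db->GetMergeOperands(
//         rocksdb::ReadOptions(), db->DefaultColumnFamily(), "counter",
//         operands, &opts, &num_operands);
//     // Status::Incomplete with KMergeOperandsInsufficientCapacity comes
//     // back when the key has more than kMaxOperands operands.
//   }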

std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  return MultiGet(read_options, column_family, keys, values,
                  /*timestamps=*/nullptr);
}

std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values,
    std::vector<std::string>* timestamps) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_MULTIGET);
  PERF_TIMER_GUARD(get_snapshot_time);

#ifndef NDEBUG
  for (const auto* cfh : column_family) {
    assert(cfh);
    const Comparator* const ucmp = cfh->GetComparator();
    assert(ucmp);
    if (ucmp->timestamp_size() > 0) {
      assert(read_options.timestamp);
      assert(ucmp->timestamp_size() == read_options.timestamp->size());
    } else {
      assert(!read_options.timestamp);
    }
  }
#endif  // NDEBUG

  SequenceNumber consistent_seqnum;

  std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
      column_family.size());
  for (auto cf : column_family) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      multiget_cf_data.emplace(cfd->GetID(),
                               MultiGetColumnFamilyData(cfh, nullptr));
    }
  }

  std::function<MultiGetColumnFamilyData*(
      std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
      iter_deref_lambda =
          [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
                 cf_iter) { return &cf_iter->second; };

  bool unref_only =
      MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
          read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
          &consistent_seqnum);

  TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
  TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");

  // Contains a list of merge operations if merge occurs.
  MergeContext merge_context;

  // Note: this always resizes the values array
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);
  if (timestamps) {
    timestamps->resize(num_keys);
  }

  // Keep track of bytes that we read for statistics-recording later
  uint64_t bytes_read = 0;
  PERF_TIMER_STOP(get_snapshot_time);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t num_found = 0;
  size_t keys_read;
  uint64_t curr_value_size = 0;

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  for (keys_read = 0; keys_read < num_keys; ++keys_read) {
    merge_context.Clear();
    Status& s = stat_list[keys_read];
    std::string* value = &(*values)[keys_read];
    std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;

    LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
        column_family[keys_read]);
    SequenceNumber max_covering_tombstone_seq = 0;
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd.super_version;
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    bool done = false;
    if (!skip_memtable) {
      if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
                                  &max_covering_tombstone_seq, read_options,
                                  read_callback)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if (super_version->imm->Get(lkey, value, timestamp, &s,
                                         &merge_context,
                                         &max_covering_tombstone_seq,
                                         read_options, read_callback)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done) {
      PinnableSlice pinnable_val;
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->Get(
          read_options, lkey, &pinnable_val, timestamp, &s, &merge_context,
          &max_covering_tombstone_seq, /*value_found=*/nullptr,
          /*key_exists=*/nullptr,
          /*seq=*/nullptr, read_callback);
      value->assign(pinnable_val.data(), pinnable_val.size());
      RecordTick(stats_, MEMTABLE_MISS);
    }

    if (s.ok()) {
      bytes_read += value->size();
      num_found++;
      curr_value_size += value->size();
      if (curr_value_size > read_options.value_size_soft_limit) {
        while (++keys_read < num_keys) {
          stat_list[keys_read] = Status::Aborted();
        }
        break;
      }
    }

    if (read_options.deadline.count() &&
        env_->NowMicros() >
            static_cast<uint64_t>(read_options.deadline.count())) {
      break;
    }
  }

  if (keys_read < num_keys) {
    // The only reason to break out of the loop is when the deadline is
    // exceeded
    assert(env_->NowMicros() >
           static_cast<uint64_t>(read_options.deadline.count()));
    for (++keys_read; keys_read < num_keys; ++keys_read) {
      stat_list[keys_read] = Status::TimedOut();
    }
  }

  // Post processing (decrement reference counts and record statistics)
  PERF_TIMER_GUARD(get_post_process_time);
  autovector<SuperVersion*> superversions_to_delete;

  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
    } else {
      mgd.cfd->GetSuperVersion()->Unref();
    }
  }
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return stat_list;
}
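
// A hedged usage sketch of the vector-based MultiGet() above (illustrative
// only). Statuses, values, and keys correspond by index:
//
//   #include <vector>
//   #include "rocksdb/db.h"
//
//   void MultiGetExample(rocksdb::DB* db) {
//     std::vector<rocksdb::ColumnFamilyHandle*> cfs(
//         2, db->DefaultColumnFamily());
//     std::vector<rocksdb::Slice> keys{"k1", "k2"};
//     std::vector<std::string> values;
//     std::vector<rocksdb::Status> statuses =
//         db->MultiGet(rocksdb::ReadOptions(), cfs, keys, &values);
//     // statuses[i] may be ok(), IsNotFound(), or IsTimedOut()/IsAborted()
//     // when a deadline or value_size_soft_limit cut the batch short.
//   }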

template <class T>
bool DBImpl::MultiCFSnapshot(
    const ReadOptions& read_options, ReadCallback* callback,
    std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
        iter_deref_func,
    T* cf_list, SequenceNumber* snapshot) {
  PERF_TIMER_GUARD(get_snapshot_time);

  bool last_try = false;
  if (cf_list->size() == 1) {
    // Fast path for a single column family. We can simply get the thread
    // local super version
    auto cf_iter = cf_list->begin();
    auto node = iter_deref_func(cf_iter);
    node->super_version = GetAndRefSuperVersion(node->cfd);
    if (read_options.snapshot != nullptr) {
      // Note: In WritePrepared txns this is not necessary but not harmful
      // either. Because prep_seq > snapshot => commit_seq > snapshot so if
      // a snapshot is specified we should be fine with skipping seq numbers
      // that are greater than that.
      //
      // In WriteUnprepared, we cannot set snapshot in the lookup key because
      // we may skip uncommitted data that should be visible to the
      // transaction for reading own writes.
      *snapshot =
          static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
      if (callback) {
        *snapshot = std::max(*snapshot, callback->max_visible_seq());
      }
    } else {
      // Since we get and reference the super version before getting
      // the snapshot number, without a mutex protection, it is possible
      // that a memtable switch happened in the middle and not all the
      // data for this snapshot is available. But it will contain all
      // the data available in the super version we have, which is also
      // a valid snapshot to read from.
      // We shouldn't get snapshot before finding and referencing the super
      // version because a flush happening in between may compact away data
      // for the snapshot, but the snapshot is earlier than the data
      // overwriting it, so users may see wrong results.
      if (last_seq_same_as_publish_seq_) {
        *snapshot = versions_->LastSequence();
      } else {
        *snapshot = versions_->LastPublishedSequence();
      }
    }
  } else {
    // If we end up with the same issue of a memtable getting sealed during 2
    // consecutive retries, it means the write rate is very high. In that
    // case it's probably OK to take the mutex on the 3rd try so we can
    // succeed for sure.
    static const int num_retries = 3;
    for (int i = 0; i < num_retries; ++i) {
      last_try = (i == num_retries - 1);
      bool retry = false;

      if (i > 0) {
        for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
             ++cf_iter) {
          auto node = iter_deref_func(cf_iter);
          SuperVersion* super_version = node->super_version;
          ColumnFamilyData* cfd = node->cfd;
          if (super_version != nullptr) {
            ReturnAndCleanupSuperVersion(cfd, super_version);
          }
          node->super_version = nullptr;
        }
      }
      if (read_options.snapshot == nullptr) {
        if (last_try) {
          TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
          // We're close to max number of retries. For the last retry,
          // acquire the lock so we're sure to succeed
          mutex_.Lock();
        }
        if (last_seq_same_as_publish_seq_) {
          *snapshot = versions_->LastSequence();
        } else {
          *snapshot = versions_->LastPublishedSequence();
        }
      } else {
        *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                        ->number_;
      }
      for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
           ++cf_iter) {
        auto node = iter_deref_func(cf_iter);
        if (!last_try) {
          node->super_version = GetAndRefSuperVersion(node->cfd);
        } else {
          node->super_version = node->cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
        if (read_options.snapshot != nullptr || last_try) {
          // If user passed a snapshot, then we don't care if a memtable is
          // sealed or compaction happens because the snapshot would ensure
          // that older key versions are kept around. If this is the last
          // retry, then we have the lock so nothing bad can happen
          continue;
        }
        // We could get the earliest sequence number for the whole list of
        // memtables, which will include immutable memtables as well, but that
        // might be tricky to maintain in case we decide, in future, to do
        // memtable compaction.
        if (!last_try) {
          SequenceNumber seq =
              node->super_version->mem->GetEarliestSequenceNumber();
          if (seq > *snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (last_try) {
          mutex_.Unlock();
        }
        break;
      }
    }
  }

  PERF_TIMER_STOP(get_snapshot_time);

  return last_try;
}
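
// The retry loop above is an optimistic-concurrency pattern: grab references
// without the mutex, validate that no memtable was sealed past the chosen
// sequence number, and only fall back to the mutex on the final try. A
// minimal, hypothetical sketch of the same idea (ReadState, snapshot_fn, and
// validate_fn are illustrative stand-ins, not RocksDB APIs):
//
//   #include <mutex>
//
//   template <typename ReadState>
//   ReadState AcquireConsistentState(std::mutex* mu,
//                                    ReadState (*snapshot_fn)(),
//                                    bool (*validate_fn)(const ReadState&)) {
//     static const int kRetries = 3;
//     for (int i = 0; i < kRetries - 1; ++i) {
//       ReadState state = snapshot_fn();  // lock-free, may be inconsistent
//       if (validate_fn(state)) {
//         return state;                   // consistent; fast path
//       }
//     }
//     std::lock_guard<std::mutex> lock(*mu);  // slow path: guaranteed
//     return snapshot_fn();
//   }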

void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                      ColumnFamilyHandle** column_families, const Slice* keys,
                      PinnableSlice* values, Status* statuses,
                      const bool sorted_input) {
  return MultiGet(read_options, num_keys, column_families, keys, values,
                  /*timestamps=*/nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                      ColumnFamilyHandle** column_families, const Slice* keys,
                      PinnableSlice* values, std::string* timestamps,
                      Status* statuses, const bool sorted_input) {
  if (num_keys == 0) {
    return;
  }

#ifndef NDEBUG
  for (size_t i = 0; i < num_keys; ++i) {
    ColumnFamilyHandle* cfh = column_families[i];
    assert(cfh);
    const Comparator* const ucmp = cfh->GetComparator();
    assert(ucmp);
    if (ucmp->timestamp_size() > 0) {
      assert(read_options.timestamp);
      assert(read_options.timestamp->size() == ucmp->timestamp_size());
    } else {
      assert(!read_options.timestamp);
    }
  }
#endif  // NDEBUG

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    key_context.emplace_back(column_families[i], keys[i], &values[i],
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);

  autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
      multiget_cf_data;
  size_t cf_start = 0;
  ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
  for (size_t i = 0; i < num_keys; ++i) {
    KeyContext* key_ctx = sorted_keys[i];
    if (key_ctx->column_family != cf) {
      multiget_cf_data.emplace_back(
          MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
      cf_start = i;
      cf = key_ctx->column_family;
    }
  }
  {
    // multiget_cf_data.emplace_back(
    //     MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start,
    //     nullptr));
    multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
  }
  std::function<MultiGetColumnFamilyData*(
      autovector<MultiGetColumnFamilyData,
                 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
      iter_deref_lambda =
          [](autovector<MultiGetColumnFamilyData,
                        MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<
      autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
      read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  Status s;
  auto cf_iter = multiget_cf_data.begin();
  for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
    s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
                     &sorted_keys, cf_iter->super_version, consistent_seqnum,
                     read_callback, nullptr);
    if (!s.ok()) {
      break;
    }
  }
  if (!s.ok()) {
    assert(s.IsTimedOut() || s.IsAborted());
    for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
      for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
           ++i) {
        *sorted_keys[i]->s = s;
      }
    }
  }

  for (const auto& iter : multiget_cf_data) {
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
    } else {
      iter.cfd->GetSuperVersion()->Unref();
    }
  }
}
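
// A hedged usage sketch of the batched, array-based MultiGet() overload above
// (illustrative only). Compared to the std::vector overload, this form can
// batch lookups per SST file and avoid value copies via PinnableSlice:
//
//   #include "rocksdb/db.h"
//
//   void BatchedMultiGetExample(rocksdb::DB* db) {
//     const size_t kNumKeys = 2;  // illustrative
//     rocksdb::ColumnFamilyHandle* cfs[kNumKeys] = {
//         db->DefaultColumnFamily(), db->DefaultColumnFamily()};
//     rocksdb::Slice keys[kNumKeys] = {"k1", "k2"};
//     rocksdb::PinnableSlice values[kNumKeys];
//     rocksdb::Status statuses[kNumKeys];
//     db->MultiGet(rocksdb::ReadOptions(), kNumKeys, cfs, keys, values,
//                  statuses, /*sorted_input=*/false);
//   }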

namespace {
// Order keys by CF ID, followed by key contents
struct CompareKeyContext {
  inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
    ColumnFamilyHandleImpl* cfh =
        static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
    uint32_t cfd_id1 = cfh->cfd()->GetID();
    const Comparator* comparator = cfh->cfd()->user_comparator();
    cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
    uint32_t cfd_id2 = cfh->cfd()->GetID();

    if (cfd_id1 < cfd_id2) {
      return true;
    } else if (cfd_id1 > cfd_id2) {
      return false;
    }

    // Both keys are from the same column family
    int cmp = comparator->CompareWithoutTimestamp(
        *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
    if (cmp < 0) {
      return true;
    }
    return false;
  }
};

}  // anonymous namespace

void DBImpl::PrepareMultiGetKeys(
    size_t num_keys, bool sorted_input,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
#ifndef NDEBUG
  if (sorted_input) {
    for (size_t index = 0; index < sorted_keys->size(); ++index) {
      if (index > 0) {
        KeyContext* lhs = (*sorted_keys)[index - 1];
        KeyContext* rhs = (*sorted_keys)[index];
        ColumnFamilyHandleImpl* cfh =
            static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
        uint32_t cfd_id1 = cfh->cfd()->GetID();
        const Comparator* comparator = cfh->cfd()->user_comparator();
        cfh =
            static_cast_with_check<ColumnFamilyHandleImpl>(rhs->column_family);
        uint32_t cfd_id2 = cfh->cfd()->GetID();

        assert(cfd_id1 <= cfd_id2);
        if (cfd_id1 < cfd_id2) {
          continue;
        }

        // Both keys are from the same column family
        int cmp = comparator->CompareWithoutTimestamp(
            *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
        assert(cmp <= 0);
      }
    }
  }
#endif
  if (!sorted_input) {
    CompareKeyContext sort_comparator;
    std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
              sort_comparator);
  }
}
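
// CompareKeyContext above must define a strict weak ordering for std::sort:
// it returns true only for strictly-less comparisons (note the final
// `cmp < 0`, never `<=`). A self-contained sketch of the same two-level
// ordering on a hypothetical pair type:
//
//   #include <algorithm>
//   #include <cstdint>
//   #include <string>
//   #include <vector>
//
//   struct KeyRef {
//     uint32_t cf_id;
//     std::string key;
//   };
//
//   void SortKeys(std::vector<KeyRef>* keys) {
//     std::sort(keys->begin(), keys->end(),
//               [](const KeyRef& a, const KeyRef& b) {
//                 if (a.cf_id != b.cf_id) return a.cf_id < b.cf_id;
//                 return a.key < b.key;  // strictly-less, never <=
//               });
//   }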

void DBImpl::MultiGet(const ReadOptions& read_options,
                      ColumnFamilyHandle* column_family, const size_t num_keys,
                      const Slice* keys, PinnableSlice* values,
                      Status* statuses, const bool sorted_input) {
  return MultiGet(read_options, column_family, num_keys, keys, values,
                  /*timestamp=*/nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options,
                      ColumnFamilyHandle* column_family, const size_t num_keys,
                      const Slice* keys, PinnableSlice* values,
                      std::string* timestamps, Status* statuses,
                      const bool sorted_input) {
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    key_context.emplace_back(column_family, keys[i], &values[i],
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
  MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
}

void DBImpl::MultiGetWithCallback(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
  multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
  std::function<MultiGetColumnFamilyData*(
      std::array<MultiGetColumnFamilyData, 1>::iterator&)>
      iter_deref_lambda =
          [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  size_t num_keys = sorted_keys->size();
  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
      read_options, callback, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);
#ifndef NDEBUG
  assert(!unref_only);
#else
  // Silence unused variable warning
  (void)unref_only;
#endif  // NDEBUG

  if (callback && read_options.snapshot == nullptr) {
    // The unprep_seqs are not published for write unprepared, so it could be
    // that max_visible_seq is larger. Seek to the std::max of the two.
    // However, we still want our callback to contain the actual snapshot so
    // that it can do the correct visibility filtering.
    callback->Refresh(consistent_seqnum);

    // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
    // max_visible_seq = max(max_visible_seq, snapshot)
    //
    // Currently, the commented out assert is broken by
    // InvalidSnapshotReadCallback, but if write unprepared recovery followed
    // the regular transaction flow, then this special read callback would not
    // be needed.
    //
    // assert(callback->max_visible_seq() >= snapshot);
    consistent_seqnum = callback->max_visible_seq();
  }

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
                          multiget_cf_data[0].super_version, consistent_seqnum,
                          read_callback, nullptr);
  assert(s.ok() || s.IsTimedOut() || s.IsAborted());
  ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
                               multiget_cf_data[0].super_version);
}

// The actual implementation of batched MultiGet. Parameters -
// start_key - Index in the sorted_keys vector to start processing from
// num_keys - Number of keys to lookup, starting with sorted_keys[start_key]
// sorted_keys - The entire batch of sorted keys for this CF
//
// The per key status is returned in the KeyContext structures pointed to by
// sorted_keys. An overall Status is also returned, with the only possible
// values being Status::OK(), Status::TimedOut() and Status::Aborted().
// TimedOut indicates that the call exceeded read_options.deadline; Aborted
// indicates that the accumulated value size exceeded
// read_options.value_size_soft_limit.
Status DBImpl::MultiGetImpl(
    const ReadOptions& read_options, size_t start_key, size_t num_keys,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
    SuperVersion* super_version, SequenceNumber snapshot,
    ReadCallback* callback, bool* is_blob_index) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
  StopWatch sw(env_, stats_, DB_MULTIGET);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t keys_left = num_keys;
  Status s;
  uint64_t curr_value_size = 0;
  while (keys_left) {
    if (read_options.deadline.count() &&
        env_->NowMicros() >
            static_cast<uint64_t>(read_options.deadline.count())) {
      s = Status::TimedOut();
      break;
    }

    size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
                            ? MultiGetContext::MAX_BATCH_SIZE
                            : keys_left;
    MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
                        batch_size, snapshot, read_options);
    MultiGetRange range = ctx.GetMultiGetRange();
    range.AddValueSize(curr_value_size);
    bool lookup_current = false;

    keys_left -= batch_size;
    for (auto mget_iter = range.begin(); mget_iter != range.end();
         ++mget_iter) {
      mget_iter->merge_context.Clear();
      *mget_iter->s = Status::OK();
    }

    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    if (!skip_memtable) {
      super_version->mem->MultiGet(read_options, &range, callback,
                                   is_blob_index);
      if (!range.empty()) {
        super_version->imm->MultiGet(read_options, &range, callback,
                                     is_blob_index);
      }
      if (!range.empty()) {
        lookup_current = true;
        uint64_t left = range.KeysLeft();
        RecordTick(stats_, MEMTABLE_MISS, left);
      }
    }
    if (lookup_current) {
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->MultiGet(read_options, &range, callback,
                                       is_blob_index);
    }
    curr_value_size = range.GetValueSize();
    if (curr_value_size > read_options.value_size_soft_limit) {
      s = Status::Aborted();
      break;
    }
  }

  // Post processing (decrement reference counts and record statistics)
  PERF_TIMER_GUARD(get_post_process_time);
  size_t num_found = 0;
  uint64_t bytes_read = 0;
  for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
    KeyContext* key = (*sorted_keys)[i];
    if (key->s->ok()) {
      bytes_read += key->value->size();
      num_found++;
    }
  }
  if (keys_left) {
    assert(s.IsTimedOut() || s.IsAborted());
    for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
         ++i) {
      KeyContext* key = (*sorted_keys)[i];
      *key->s = s;
    }
  }

  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return s;
}
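
// A hedged sketch of how a caller opts into the deadline and size-limit
// behavior handled above (illustrative only); both fields are plain
// ReadOptions members:
//
//   #include <chrono>
//   #include "rocksdb/db.h"
//
//   rocksdb::ReadOptions DeadlineReadOptions(rocksdb::Env* env) {
//     rocksdb::ReadOptions ro;
//     // The deadline is an absolute time in microseconds, compared against
//     // Env::NowMicros(); here, roughly 10ms from now.
//     ro.deadline = std::chrono::microseconds(env->NowMicros() + 10000);
//     ro.value_size_soft_limit = 1 << 20;  // abort batch past ~1MB of values
//     return ro;
//   }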

Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family,
                                  ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_family_names.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();
  size_t num_cf = column_families.size();
  Status s;
  bool success_once = false;
  for (size_t i = 0; i < num_cf; i++) {
    ColumnFamilyHandle* handle;
    s = CreateColumnFamilyImpl(column_families[i].options,
                               column_families[i].name, &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
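
// A hedged usage sketch of the column family creation entry points above
// (illustrative only). On success the caller owns the returned handles and
// should release them before closing the DB:
//
//   #include <vector>
//   #include "rocksdb/db.h"
//
//   rocksdb::Status CreateCfsExample(rocksdb::DB* db) {
//     std::vector<rocksdb::ColumnFamilyHandle*> handles;
//     rocksdb::Status s = db->CreateColumnFamilies(
//         rocksdb::ColumnFamilyOptions(), {"cf_a", "cf_b"}, &handles);
//     // Even on failure, handles contains the CFs created so far; the
//     // OPTIONS file is rewritten if at least one creation succeeded.
//     for (auto* h : handles) {
//       db->DestroyColumnFamilyHandle(h).PermitUncheckedError();
//     }
//     return s;
//   }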

Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  Status s;
  *handle = nullptr;

  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
  if (s.ok()) {
    for (auto& cf_path : cf_options.cf_paths) {
      s = env_->CreateDirIfMissing(cf_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }
  if (!s.ok()) {
    return s;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    // LogAndApply will both write the creation in MANIFEST and create
    // the ColumnFamilyData object
    {  // write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
      s = cfd->AddDirectories(&dummy_created_dirs);
    }
    if (s.ok()) {
      single_column_family_mode_ = false;
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
                                         *cfd->GetLatestMutableCFOptions());

      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();

      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  // this is outside the mutex
  if (s.ok()) {
    NewThreadStatusCfInfo(
        static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
  }
  return s;
}

Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  assert(column_family != nullptr);
  Status s = DropColumnFamilyImpl(column_family);
  if (s.ok()) {
    s = WriteOptionsFile(true /*need_mutex_lock*/,
                         true /*need_enter_write_thread*/);
  }
  return s;
}

Status DBImpl::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  bool success_once = false;
  for (auto* handle : column_families) {
    s = DropColumnFamilyImpl(handle);
    if (!s.ok()) {
      break;
    }
    success_once = true;
  }
  if (success_once) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}

Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    if (!cf_support_snapshot) {
      // The dropped column family doesn't support snapshots. Need to
      // recalculate is_snapshot_supported_.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

bool DBImpl::KeyMayExist(const ReadOptions& read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, std::string* timestamp,
                         bool* value_found) {
  assert(value != nullptr);
  if (value_found != nullptr) {
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
  }
  ReadOptions roptions = read_options;
  roptions.read_tier = kBlockCacheTier;  // read from block cache only
  PinnableSlice pinnable_val;
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = &pinnable_val;
  get_impl_options.value_found = value_found;
  get_impl_options.timestamp = timestamp;
  auto s = GetImpl(roptions, key, get_impl_options);
  value->assign(pinnable_val.data(), pinnable_val.size());

  // If block_cache is enabled and the index block of the table is not
  // present in block_cache, the return value will be Status::Incomplete.
  // In this case, the key may still exist in the table.
  return s.ok() || s.IsIncomplete();
}
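
// A hedged usage sketch of KeyMayExist() (illustrative only). It never does
// I/O, so a true result is only a hint; false definitively rules the key out:
//
//   #include <string>
//   #include "rocksdb/db.h"
//
//   bool MightHaveKey(rocksdb::DB* db, const rocksdb::Slice& key) {
//     std::string value;
//     bool value_found = false;
//     bool may_exist = db->KeyMayExist(rocksdb::ReadOptions(),
//                                      db->DefaultColumnFamily(), key, &value,
//                                      &value_found);
//     // value is only meaningful when value_found is true.
//     return may_exist;
//   }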

Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.managed) {
    return NewErrorIterator(
        Status::NotSupported("Managed iterator is not supported anymore."));
  }
  Iterator* result = nullptr;
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  // if iterator wants internal keys, we can only proceed if
  // we can guarantee the deletes haven't been processed yet
  if (immutable_db_options_.preserve_deletes &&
      read_options.iter_start_seqnum > 0 &&
      read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
    return NewErrorIterator(Status::InvalidArgument(
        "Iterator requested internal keys which are too old and are not"
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  assert(cfd != nullptr);
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    result = nullptr;

#else
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    auto iter = new ForwardIterator(this, read_options, cfd, sv,
                                    /* allow_unprepared_value */ true);
    result = NewDBIterator(
        env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
        cfd->user_comparator(), iter, kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_callback, this, cfd);
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    result = NewIteratorImpl(read_options, cfd,
                             (read_options.snapshot != nullptr)
                                 ? read_options.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber,
                             read_callback);
  }
  return result;
}

ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool allow_blob,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");

  if (snapshot == kMaxSequenceNumber) {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither the data that
    // was visible to the snapshot before compaction nor the newer data
    // inserted afterwards.
    // Note that the super version might not contain all the data available
    // to this snapshot, but in that case it can see all the data in the
    // super version, which is a valid consistent state after the user
    // calls NewIterator().
    snapshot = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |  |                |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |  |             |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |  |             |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in
  // the iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, this, cfd, allow_blob,
      read_options.snapshot != nullptr ? false : allow_refresh);

  InternalIterator* internal_iter = NewInternalIterator(
      db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
      db_iter->GetRangeDelAggregator(), snapshot,
      /* allow_unprepared_value */ true);
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}
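
// A hedged usage sketch of the iterator API built above (illustrative only):
//
//   #include <cassert>
//   #include <memory>
//   #include "rocksdb/db.h"
//
//   void ScanAll(rocksdb::DB* db) {
//     std::unique_ptr<rocksdb::Iterator> it(
//         db->NewIterator(rocksdb::ReadOptions()));
//     for (it->SeekToFirst(); it->Valid(); it->Next()) {
//       // it->key() / it->value() are only valid until the next move.
//     }
//     assert(it->status().ok());  // check for errors after the scan
//   }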
2905
2906Status DBImpl::NewIterators(
2907 const ReadOptions& read_options,
2908 const std::vector<ColumnFamilyHandle*>& column_families,
2909 std::vector<Iterator*>* iterators) {
11fdf7f2
TL
2910 if (read_options.managed) {
2911 return Status::NotSupported("Managed iterator is not supported anymore.");
2912 }
7c673cae
FG
2913 if (read_options.read_tier == kPersistedTier) {
2914 return Status::NotSupported(
2915 "ReadTier::kPersistedData is not yet supported in iterators.");
2916 }
11fdf7f2 2917 ReadCallback* read_callback = nullptr; // No read callback provided.
7c673cae
FG
2918 iterators->clear();
2919 iterators->reserve(column_families.size());
11fdf7f2 2920 if (read_options.tailing) {
7c673cae
FG
2921#ifdef ROCKSDB_LITE
2922 return Status::InvalidArgument(
494da23a 2923 "Tailing iterator not supported in RocksDB lite");
7c673cae
FG
2924#else
2925 for (auto cfh : column_families) {
20effc67 2926 auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
f67539c2 2927 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
20effc67
TL
2928 auto iter = new ForwardIterator(this, read_options, cfd, sv,
2929 /* allow_unprepared_value */ true);
7c673cae 2930 iterators->push_back(NewDBIterator(
11fdf7f2
TL
2931 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2932 cfd->user_comparator(), iter, kMaxSequenceNumber,
7c673cae 2933 sv->mutable_cf_options.max_sequential_skip_in_iterations,
11fdf7f2 2934 read_callback, this, cfd));
7c673cae
FG
2935 }
2936#endif
2937 } else {
11fdf7f2
TL
2938 // Note: no need to consider the special case of
2939 // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2940 // WritePreparedTxnDB
2941 auto snapshot = read_options.snapshot != nullptr
2942 ? read_options.snapshot->GetSequenceNumber()
2943 : versions_->LastSequence();
7c673cae 2944 for (size_t i = 0; i < column_families.size(); ++i) {
11fdf7f2 2945 auto* cfd =
20effc67
TL
2946 static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
2947 ->cfd();
11fdf7f2
TL
2948 iterators->push_back(
2949 NewIteratorImpl(read_options, cfd, snapshot, read_callback));
7c673cae
FG
2950 }
2951 }
2952
2953 return Status::OK();
2954}
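// Usage sketch for the public API above, assuming `db` is an open DB* and
// `cf1`/`cf2` are hypothetical column family handles:
//
//   std::vector<ColumnFamilyHandle*> cfs = {cf1, cf2};
//   std::vector<Iterator*> iters;
//   Status s = db->NewIterators(ReadOptions(), cfs, &iters);
//   if (s.ok()) {
//     for (Iterator* it : iters) {
//       for (it->SeekToFirst(); it->Valid(); it->Next()) { /* ... */ }
//       delete it;  // the caller owns the returned iterators
//     }
//   }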
2955
2956const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2957
2958#ifndef ROCKSDB_LITE
2959const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2960 return GetSnapshotImpl(true);
2961}
2962#endif // ROCKSDB_LITE
2963
494da23a
TL
2964SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
2965 bool lock) {
7c673cae 2966 int64_t unix_time = 0;
20effc67 2967 env_->GetCurrentTime(&unix_time).PermitUncheckedError(); // Ignore error
7c673cae
FG
2968 SnapshotImpl* s = new SnapshotImpl;
2969
494da23a
TL
2970 if (lock) {
2971 mutex_.Lock();
2972 }
7c673cae
FG
2973 // Returns nullptr if the underlying memtable does not support snapshots.
2974 if (!is_snapshot_supported_) {
494da23a
TL
2975 if (lock) {
2976 mutex_.Unlock();
2977 }
7c673cae
FG
2978 delete s;
2979 return nullptr;
2980 }
11fdf7f2
TL
2981 auto snapshot_seq = last_seq_same_as_publish_seq_
2982 ? versions_->LastSequence()
2983 : versions_->LastPublishedSequence();
494da23a
TL
2984 SnapshotImpl* snapshot =
2985 snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
2986 if (lock) {
2987 mutex_.Unlock();
2988 }
2989 return snapshot;
7c673cae
FG
2990}
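// Usage sketch: pinning a consistent read view via the public snapshot API
// (`db` is a hypothetical open DB*):
//
//   const Snapshot* snap = db->GetSnapshot();
//   if (snap != nullptr) {  // nullptr if the memtable rep lacks snapshot support
//     ReadOptions ro;
//     ro.snapshot = snap;
//     std::string value;
//     Status s = db->Get(ro, "key", &value);  // reads as of snap's sequence
//     db->ReleaseSnapshot(snap);
//   }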
2991
494da23a
TL
2992namespace {
2993typedef autovector<ColumnFamilyData*, 2> CfdList;
2994bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
2995 for (const ColumnFamilyData* t : list) {
2996 if (t == cfd) {
2997 return true;
2998 }
2999 }
3000 return false;
3001}
3002} // namespace
3003
7c673cae
FG
3004void DBImpl::ReleaseSnapshot(const Snapshot* s) {
3005 const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
3006 {
3007 InstrumentedMutexLock l(&mutex_);
3008 snapshots_.Delete(casted_s);
11fdf7f2
TL
3009 uint64_t oldest_snapshot;
3010 if (snapshots_.empty()) {
20effc67
TL
3011 if (last_seq_same_as_publish_seq_) {
3012 oldest_snapshot = versions_->LastSequence();
3013 } else {
3014 oldest_snapshot = versions_->LastPublishedSequence();
3015 }
11fdf7f2
TL
3016 } else {
3017 oldest_snapshot = snapshots_.oldest()->number_;
3018 }
494da23a
TL
3019 // Avoid going through every column family by checking a global threshold
3020 // first.
3021 if (oldest_snapshot > bottommost_files_mark_threshold_) {
3022 CfdList cf_scheduled;
3023 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3024 cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
3025 if (!cfd->current()
3026 ->storage_info()
3027 ->BottommostFilesMarkedForCompaction()
3028 .empty()) {
3029 SchedulePendingCompaction(cfd);
3030 MaybeScheduleFlushOrCompaction();
3031 cf_scheduled.push_back(cfd);
3032 }
11fdf7f2 3033 }
494da23a
TL
3034
3035 // Calculate a new threshold, skipping those CFs where compactions are
3036 // scheduled. We do not compute it inside the previous loop because the
3037 // mutex might be unlocked during that loop, making the result inaccurate.
3038 SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
3039 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3040 if (CfdListContains(cf_scheduled, cfd)) {
3041 continue;
3042 }
3043 new_bottommost_files_mark_threshold = std::min(
3044 new_bottommost_files_mark_threshold,
3045 cfd->current()->storage_info()->bottommost_files_mark_threshold());
3046 }
3047 bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
11fdf7f2 3048 }
7c673cae
FG
3049 }
3050 delete casted_s;
3051}
3052
3053#ifndef ROCKSDB_LITE
3054Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
3055 TablePropertiesCollection* props) {
20effc67 3056 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3057 auto cfd = cfh->cfd();
3058
3059 // Increment the ref count
3060 mutex_.Lock();
3061 auto version = cfd->current();
3062 version->Ref();
3063 mutex_.Unlock();
3064
3065 auto s = version->GetPropertiesOfAllTables(props);
3066
3067 // Decrement the ref count
3068 mutex_.Lock();
3069 version->Unref();
3070 mutex_.Unlock();
3071
3072 return s;
3073}
3074
3075Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
3076 const Range* range, std::size_t n,
3077 TablePropertiesCollection* props) {
20effc67 3078 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3079 auto cfd = cfh->cfd();
3080
3081 // Increment the ref count
3082 mutex_.Lock();
3083 auto version = cfd->current();
3084 version->Ref();
3085 mutex_.Unlock();
3086
3087 auto s = version->GetPropertiesOfTablesInRange(range, n, props);
3088
3089 // Decrement the ref count
3090 mutex_.Lock();
3091 version->Unref();
3092 mutex_.Unlock();
3093
3094 return s;
3095}
3096
3097#endif // ROCKSDB_LITE
3098
11fdf7f2 3099const std::string& DBImpl::GetName() const { return dbname_; }
7c673cae 3100
11fdf7f2 3101Env* DBImpl::GetEnv() const { return env_; }
7c673cae 3102
f67539c2
TL
3103FileSystem* DB::GetFileSystem() const {
3104 static LegacyFileSystemWrapper fs_wrap(GetEnv());
3105 return &fs_wrap;
3106}
3107
3108FileSystem* DBImpl::GetFileSystem() const {
3109 return immutable_db_options_.fs.get();
3110}
3111
20effc67
TL
3112#ifndef ROCKSDB_LITE
3113
3114Status DBImpl::StartIOTrace(Env* env, const TraceOptions& trace_options,
3115 std::unique_ptr<TraceWriter>&& trace_writer) {
3116 assert(trace_writer != nullptr);
3117 return io_tracer_->StartIOTrace(env, trace_options, std::move(trace_writer));
3118}
3119
3120Status DBImpl::EndIOTrace() {
3121 io_tracer_->EndIOTrace();
3122 return Status::OK();
3123}
3124
3125#endif // ROCKSDB_LITE
3126
7c673cae
FG
3127Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
3128 InstrumentedMutexLock l(&mutex_);
20effc67 3129 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3130 return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
3131 cfh->cfd()->GetLatestCFOptions());
3132}
3133
3134DBOptions DBImpl::GetDBOptions() const {
3135 InstrumentedMutexLock l(&mutex_);
3136 return BuildDBOptions(immutable_db_options_, mutable_db_options_);
3137}
3138
3139bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
3140 const Slice& property, std::string* value) {
3141 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3142 value->clear();
20effc67
TL
3143 auto cfd =
3144 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3145 if (property_info == nullptr) {
3146 return false;
3147 } else if (property_info->handle_int) {
3148 uint64_t int_value;
3149 bool ret_value =
3150 GetIntPropertyInternal(cfd, *property_info, false, &int_value);
3151 if (ret_value) {
3152 *value = ToString(int_value);
3153 }
3154 return ret_value;
3155 } else if (property_info->handle_string) {
3156 InstrumentedMutexLock l(&mutex_);
3157 return cfd->internal_stats()->GetStringProperty(*property_info, property,
3158 value);
11fdf7f2
TL
3159 } else if (property_info->handle_string_dbimpl) {
3160 std::string tmp_value;
3161 bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
3162 if (ret_value) {
3163 *value = tmp_value;
3164 }
3165 return ret_value;
7c673cae
FG
3166 }
3167 // Shouldn't reach here since exactly one of handle_string and handle_int
3168 // should be non-nullptr.
3169 assert(false);
3170 return false;
3171}
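// Usage sketch for the property getters above ("rocksdb.*" strings are real
// property names; `db` and `cf` are hypothetical):
//
//   std::string num_l0;
//   db->GetProperty(cf, "rocksdb.num-files-at-level0", &num_l0);
//   uint64_t mem_bytes = 0;
//   db->GetIntProperty(cf, "rocksdb.cur-size-all-mem-tables", &mem_bytes);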
3172
3173bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
3174 const Slice& property,
11fdf7f2 3175 std::map<std::string, std::string>* value) {
7c673cae
FG
3176 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3177 value->clear();
20effc67
TL
3178 auto cfd =
3179 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3180 if (property_info == nullptr) {
3181 return false;
3182 } else if (property_info->handle_map) {
3183 InstrumentedMutexLock l(&mutex_);
3184 return cfd->internal_stats()->GetMapProperty(*property_info, property,
3185 value);
3186 }
3187 // If we reach this point it means that handle_map is not provided for the
3188 // requested property
3189 return false;
3190}
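// Usage sketch: map-valued properties return key/value breakdowns, e.g.
// "rocksdb.cfstats" (a real property name; `db`/`cf` hypothetical):
//
//   std::map<std::string, std::string> stats;
//   if (db->GetMapProperty(cf, "rocksdb.cfstats", &stats)) {
//     for (const auto& kv : stats) { /* kv.first -> kv.second */ }
//   }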
3191
3192bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
3193 const Slice& property, uint64_t* value) {
3194 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3195 if (property_info == nullptr || property_info->handle_int == nullptr) {
3196 return false;
3197 }
20effc67
TL
3198 auto cfd =
3199 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3200 return GetIntPropertyInternal(cfd, *property_info, false, value);
3201}
3202
3203bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
3204 const DBPropertyInfo& property_info,
3205 bool is_locked, uint64_t* value) {
3206 assert(property_info.handle_int != nullptr);
3207 if (!property_info.need_out_of_mutex) {
3208 if (is_locked) {
3209 mutex_.AssertHeld();
3210 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3211 } else {
3212 InstrumentedMutexLock l(&mutex_);
3213 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3214 }
3215 } else {
3216 SuperVersion* sv = nullptr;
20effc67
TL
3217 if (is_locked) {
3218 mutex_.Unlock();
7c673cae 3219 }
20effc67 3220 sv = GetAndRefSuperVersion(cfd);
7c673cae
FG
3221
3222 bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
3223 property_info, sv->current, value);
3224
20effc67
TL
3225 ReturnAndCleanupSuperVersion(cfd, sv);
3226 if (is_locked) {
3227 mutex_.Lock();
7c673cae
FG
3228 }
3229
3230 return ret;
3231 }
3232}
3233
11fdf7f2
TL
3234bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
3235 assert(value != nullptr);
3236 Statistics* statistics = immutable_db_options_.statistics.get();
3237 if (!statistics) {
3238 return false;
3239 }
3240 *value = statistics->ToString();
3241 return true;
3242}
3243
7c673cae
FG
3244#ifndef ROCKSDB_LITE
3245Status DBImpl::ResetStats() {
3246 InstrumentedMutexLock l(&mutex_);
3247 for (auto* cfd : *versions_->GetColumnFamilySet()) {
11fdf7f2
TL
3248 if (cfd->initialized()) {
3249 cfd->internal_stats()->Clear();
3250 }
7c673cae
FG
3251 }
3252 return Status::OK();
3253}
3254#endif // ROCKSDB_LITE
3255
3256bool DBImpl::GetAggregatedIntProperty(const Slice& property,
3257 uint64_t* aggregated_value) {
3258 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3259 if (property_info == nullptr || property_info->handle_int == nullptr) {
3260 return false;
3261 }
3262
3263 uint64_t sum = 0;
20effc67 3264 bool ret = true;
7c673cae
FG
3265 {
3266 // Needs mutex to protect the list of column families.
3267 InstrumentedMutexLock l(&mutex_);
3268 uint64_t value;
3269 for (auto* cfd : *versions_->GetColumnFamilySet()) {
11fdf7f2
TL
3270 if (!cfd->initialized()) {
3271 continue;
3272 }
20effc67
TL
3273 cfd->Ref();
3274 ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
3275 // GetIntPropertyInternal may release db mutex and re-acquire it.
3276 mutex_.AssertHeld();
3277 cfd->UnrefAndTryDelete();
3278 if (ret) {
7c673cae
FG
3279 sum += value;
3280 } else {
20effc67
TL
3281 ret = false;
3282 break;
7c673cae
FG
3283 }
3284 }
3285 }
3286 *aggregated_value = sum;
20effc67 3287 return ret;
7c673cae
FG
3288}
3289
3290SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
3291 // TODO(ljin): consider using GetReferencedSuperVersion() directly
f67539c2 3292 return cfd->GetThreadLocalSuperVersion(this);
7c673cae
FG
3293}
3294
3295// REQUIRED: this function should only be called on the write thread or if the
3296// mutex is held.
3297SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
3298 auto column_family_set = versions_->GetColumnFamilySet();
3299 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3300 if (!cfd) {
3301 return nullptr;
3302 }
3303
3304 return GetAndRefSuperVersion(cfd);
3305}
3306
11fdf7f2
TL
3307void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
3308 // Release SuperVersion
3309 if (sv->Unref()) {
f67539c2
TL
3310 bool defer_purge =
3311 immutable_db_options().avoid_unnecessary_blocking_io;
11fdf7f2
TL
3312 {
3313 InstrumentedMutexLock l(&mutex_);
3314 sv->Cleanup();
f67539c2
TL
3315 if (defer_purge) {
3316 AddSuperVersionsToFreeQueue(sv);
3317 SchedulePurge();
3318 }
3319 }
3320 if (!defer_purge) {
3321 delete sv;
11fdf7f2 3322 }
11fdf7f2
TL
3323 RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
3324 }
3325 RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
3326}
3327
7c673cae
FG
3328void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
3329 SuperVersion* sv) {
11fdf7f2
TL
3330 if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
3331 CleanupSuperVersion(sv);
7c673cae
FG
3332 }
3333}
3334
3335// REQUIRED: this function should only be called on the write thread.
3336void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
3337 SuperVersion* sv) {
3338 auto column_family_set = versions_->GetColumnFamilySet();
3339 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3340
3341 // If a SuperVersion is held and we successfully fetched the cfd using
3342 // GetAndRefSuperVersion(), the cfd must still exist.
3343 assert(cfd != nullptr);
3344 ReturnAndCleanupSuperVersion(cfd, sv);
3345}
3346
3347// REQUIRED: this function should only be called on the write thread or if the
3348// mutex is held.
3349ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
3350 ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
3351
3352 if (!cf_memtables->Seek(column_family_id)) {
3353 return nullptr;
3354 }
3355
3356 return cf_memtables->GetColumnFamilyHandle();
3357}
3358
11fdf7f2 3359// REQUIRED: mutex is NOT held.
494da23a 3360std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
11fdf7f2 3361 uint32_t column_family_id) {
11fdf7f2
TL
3362 InstrumentedMutexLock l(&mutex_);
3363
494da23a
TL
3364 auto* cfd =
3365 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3366 if (cfd == nullptr) {
11fdf7f2
TL
3367 return nullptr;
3368 }
3369
494da23a
TL
3370 return std::unique_ptr<ColumnFamilyHandleImpl>(
3371 new ColumnFamilyHandleImpl(cfd, this, &mutex_));
11fdf7f2
TL
3372}
3373
7c673cae
FG
3374void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3375 const Range& range,
3376 uint64_t* const count,
3377 uint64_t* const size) {
3378 ColumnFamilyHandleImpl* cfh =
20effc67 3379 static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3380 ColumnFamilyData* cfd = cfh->cfd();
3381 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3382
3383 // Convert user_key into a corresponding internal key.
3384 InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3385 InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3386 MemTable::MemTableStats memStats =
3387 sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3388 MemTable::MemTableStats immStats =
3389 sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3390 *count = memStats.count + immStats.count;
3391 *size = memStats.size + immStats.size;
3392
3393 ReturnAndCleanupSuperVersion(cfd, sv);
3394}
3395
f67539c2
TL
3396Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3397 ColumnFamilyHandle* column_family,
3398 const Range* range, int n, uint64_t* sizes) {
3399 if (!options.include_memtabtles && !options.include_files) {
3400 return Status::InvalidArgument("Invalid options");
3401 }
3402
7c673cae 3403 Version* v;
20effc67 3404 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3405 auto cfd = cfh->cfd();
3406 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3407 v = sv->current;
3408
3409 for (int i = 0; i < n; i++) {
3410 // Convert user_key into a corresponding internal key.
3411 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
3412 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
3413 sizes[i] = 0;
f67539c2
TL
3414 if (options.include_files) {
3415 sizes[i] += versions_->ApproximateSize(
3416 options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3417 /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
7c673cae 3418 }
f67539c2 3419 if (options.include_memtabtles) {
7c673cae
FG
3420 sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3421 sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3422 }
3423 }
3424
3425 ReturnAndCleanupSuperVersion(cfd, sv);
f67539c2 3426 return Status::OK();
7c673cae
FG
3427}
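// Usage sketch for GetApproximateSizes (note the option field really is
// spelled `include_memtabtles` in this version of the API; `db` and `cf`
// are hypothetical):
//
//   Range r("a", "z");
//   uint64_t size = 0;
//   SizeApproximationOptions opts;
//   opts.include_memtabtles = true;
//   opts.include_files = true;
//   Status s = db->GetApproximateSizes(opts, cf, &r, 1, &size);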
3428
3429std::list<uint64_t>::iterator
3430DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3431 // We need to remember the iterator to our inserted element because, after
3432 // the background job is done, we need to remove that element from
3433 // pending_outputs_.
3434 pending_outputs_.push_back(versions_->current_next_file_number());
3435 auto pending_outputs_inserted_elem = pending_outputs_.end();
3436 --pending_outputs_inserted_elem;
3437 return pending_outputs_inserted_elem;
3438}
3439
3440void DBImpl::ReleaseFileNumberFromPendingOutputs(
f67539c2
TL
3441 std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3442 if (v.get() != nullptr) {
3443 pending_outputs_.erase(*v.get());
3444 v.reset();
3445 }
7c673cae
FG
3446}
3447
3448#ifndef ROCKSDB_LITE
3449Status DBImpl::GetUpdatesSince(
494da23a 3450 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
7c673cae 3451 const TransactionLogIterator::ReadOptions& read_options) {
7c673cae
FG
3452 RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3453 if (seq > versions_->LastSequence()) {
3454 return Status::NotFound("Requested sequence not yet written in the db");
3455 }
3456 return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3457}
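// Usage sketch: tailing the WAL from a known sequence number (`db` and
// `start_seq` are hypothetical):
//
//   std::unique_ptr<TransactionLogIterator> it;
//   Status s = db->GetUpdatesSince(start_seq, &it);
//   while (s.ok() && it->Valid()) {
//     BatchResult res = it->GetBatch();  // res.sequence, res.writeBatchPtr
//     it->Next();
//   }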
3458
3459Status DBImpl::DeleteFile(std::string name) {
3460 uint64_t number;
3461 FileType type;
3462 WalFileType log_type;
3463 if (!ParseFileName(name, &number, &type, &log_type) ||
20effc67 3464 (type != kTableFile && type != kWalFile)) {
7c673cae
FG
3465 ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3466 name.c_str());
3467 return Status::InvalidArgument("Invalid file name");
3468 }
3469
3470 Status status;
20effc67 3471 if (type == kWalFile) {
7c673cae
FG
3472 // Only allow deleting archived log files
3473 if (log_type != kArchivedLogFile) {
3474 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3475 "DeleteFile %s failed - not archived log.\n",
3476 name.c_str());
3477 return Status::NotSupported("Delete only supported for archived logs");
3478 }
11fdf7f2 3479 status = wal_manager_.DeleteFile(name, number);
7c673cae
FG
3480 if (!status.ok()) {
3481 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3482 "DeleteFile %s failed -- %s.\n", name.c_str(),
3483 status.ToString().c_str());
3484 }
3485 return status;
3486 }
3487
3488 int level;
3489 FileMetaData* metadata;
3490 ColumnFamilyData* cfd;
3491 VersionEdit edit;
3492 JobContext job_context(next_job_id_.fetch_add(1), true);
3493 {
3494 InstrumentedMutexLock l(&mutex_);
3495 status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3496 if (!status.ok()) {
3497 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3498 "DeleteFile %s failed. File not found\n", name.c_str());
3499 job_context.Clean();
3500 return Status::InvalidArgument("File not found");
3501 }
3502 assert(level < cfd->NumberLevels());
3503
3504 // If the file is being compacted, there is no need to delete it.
3505 if (metadata->being_compacted) {
3506 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3507 "DeleteFile %s Skipped. File about to be compacted\n",
3508 name.c_str());
3509 job_context.Clean();
3510 return Status::OK();
3511 }
3512
3513 // Only the files in the last level can be deleted externally.
3514 // This is to make sure that any deletion tombstones are not
3515 // lost. Check that the level passed is the last level.
3516 auto* vstorage = cfd->current()->storage_info();
3517 for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3518 if (vstorage->NumLevelFiles(i) != 0) {
3519 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3520 "DeleteFile %s FAILED. File not in last level\n",
3521 name.c_str());
3522 job_context.Clean();
3523 return Status::InvalidArgument("File not in last level");
3524 }
3525 }
3526 // If level == 0, the file has to be the oldest one in that level.
3527 if (level == 0 &&
3528 vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
3529 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3530 "DeleteFile %s failed ---"
3531 " target file in level 0 must be the oldest.",
3532 name.c_str());
3533 job_context.Clean();
3534 return Status::InvalidArgument("File in level 0, but not oldest");
3535 }
3536 edit.SetColumnFamily(cfd->GetID());
3537 edit.DeleteFile(level, number);
3538 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3539 &edit, &mutex_, directories_.GetDbDir());
3540 if (status.ok()) {
494da23a
TL
3541 InstallSuperVersionAndScheduleWork(cfd,
3542 &job_context.superversion_contexts[0],
3543 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
3544 }
3545 FindObsoleteFiles(&job_context, false);
3546 } // lock released here
3547
3548 LogFlush(immutable_db_options_.info_log);
3549 // remove files outside the db-lock
3550 if (job_context.HaveSomethingToDelete()) {
3551 // Call PurgeObsoleteFiles() without holding mutex.
3552 PurgeObsoleteFiles(job_context);
3553 }
3554 job_context.Clean();
3555 return status;
3556}
3557
11fdf7f2
TL
3558Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3559 const RangePtr* ranges, size_t n,
3560 bool include_end) {
7c673cae 3561 Status status;
20effc67 3562 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3563 ColumnFamilyData* cfd = cfh->cfd();
3564 VersionEdit edit;
11fdf7f2 3565 std::set<FileMetaData*> deleted_files;
7c673cae
FG
3566 JobContext job_context(next_job_id_.fetch_add(1), true);
3567 {
3568 InstrumentedMutexLock l(&mutex_);
3569 Version* input_version = cfd->current();
3570
3571 auto* vstorage = input_version->storage_info();
11fdf7f2
TL
3572 for (size_t r = 0; r < n; r++) {
3573 auto begin = ranges[r].start, end = ranges[r].limit;
3574 for (int i = 1; i < cfd->NumberLevels(); i++) {
3575 if (vstorage->LevelFiles(i).empty() ||
3576 !vstorage->OverlapInLevel(i, begin, end)) {
3577 continue;
3578 }
3579 std::vector<FileMetaData*> level_files;
3580 InternalKey begin_storage, end_storage, *begin_key, *end_key;
3581 if (begin == nullptr) {
3582 begin_key = nullptr;
3583 } else {
3584 begin_storage.SetMinPossibleForUserKey(*begin);
3585 begin_key = &begin_storage;
3586 }
3587 if (end == nullptr) {
3588 end_key = nullptr;
3589 } else {
3590 end_storage.SetMaxPossibleForUserKey(*end);
3591 end_key = &end_storage;
3592 }
7c673cae 3593
11fdf7f2
TL
3594 vstorage->GetCleanInputsWithinInterval(
3595 i, begin_key, end_key, &level_files, -1 /* hint_index */,
3596 nullptr /* file_index */);
3597 FileMetaData* level_file;
3598 for (uint32_t j = 0; j < level_files.size(); j++) {
3599 level_file = level_files[j];
7c673cae
FG
3600 if (level_file->being_compacted) {
3601 continue;
3602 }
11fdf7f2
TL
3603 if (deleted_files.find(level_file) != deleted_files.end()) {
3604 continue;
3605 }
3606 if (!include_end && end != nullptr &&
3607 cfd->user_comparator()->Compare(level_file->largest.user_key(),
3608 *end) == 0) {
3609 continue;
3610 }
7c673cae
FG
3611 edit.SetColumnFamily(cfd->GetID());
3612 edit.DeleteFile(i, level_file->fd.GetNumber());
11fdf7f2 3613 deleted_files.insert(level_file);
7c673cae
FG
3614 level_file->being_compacted = true;
3615 }
3616 }
3617 }
3618 if (edit.GetDeletedFiles().empty()) {
3619 job_context.Clean();
3620 return Status::OK();
3621 }
3622 input_version->Ref();
3623 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3624 &edit, &mutex_, directories_.GetDbDir());
3625 if (status.ok()) {
494da23a
TL
3626 InstallSuperVersionAndScheduleWork(cfd,
3627 &job_context.superversion_contexts[0],
3628 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
3629 }
3630 for (auto* deleted_file : deleted_files) {
3631 deleted_file->being_compacted = false;
3632 }
3633 input_version->Unref();
3634 FindObsoleteFiles(&job_context, false);
3635 } // lock released here
3636
3637 LogFlush(immutable_db_options_.info_log);
3638 // remove files outside the db-lock
3639 if (job_context.HaveSomethingToDelete()) {
3640 // Call PurgeObsoleteFiles() without holding mutex.
3641 PurgeObsoleteFiles(job_context);
3642 }
3643 job_context.Clean();
3644 return status;
3645}
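// Usage sketch via the convenience wrapper in rocksdb/convenience.h (a real
// free function; `db`/`cf` hypothetical). Note this only drops whole files
// that lie fully inside the range, so some keys in the range may survive:
//
//   Slice begin("a"), end("m");
//   RangePtr range(&begin, &end);
//   Status s = DeleteFilesInRanges(db, cf, &range, 1, /*include_end=*/true);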
3646
3647void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3648 InstrumentedMutexLock l(&mutex_);
3649 versions_->GetLiveFilesMetaData(metadata);
3650}
3651
20effc67
TL
3652Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
3653 InstrumentedMutexLock l(&mutex_);
3654 return versions_->GetLiveFilesChecksumInfo(checksum_list);
3655}
3656
11fdf7f2
TL
3657void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3658 ColumnFamilyMetaData* cf_meta) {
7c673cae 3659 assert(column_family);
20effc67
TL
3660 auto* cfd =
3661 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae 3662 auto* sv = GetAndRefSuperVersion(cfd);
f67539c2
TL
3663 {
3664 // Without the mutex, Version::GetColumnFamilyMetaData would have a data
3665 // race with Compaction::MarkFilesBeingCompacted. One solution is to take
3666 // the mutex, but this may cause a regression. An alternative is to make
3667 // FileMetaData::being_compacted atomic, but that would make FileMetaData
3668 // non-copyable. Another option is to separate these variables from the
3669 // original FileMetaData struct, which requires re-organizing the data
3670 // structures. For now, we take the easy approach: as long as
3671 // DB::GetColumnFamilyMetaData is not called frequently, the regression
3672 // should not be big. We still need to keep an eye on it.
3673 InstrumentedMutexLock l(&mutex_);
3674 sv->current->GetColumnFamilyMetaData(cf_meta);
3675 }
7c673cae
FG
3676 ReturnAndCleanupSuperVersion(cfd, sv);
3677}
3678
3679#endif // ROCKSDB_LITE
3680
3681Status DBImpl::CheckConsistency() {
3682 mutex_.AssertHeld();
3683 std::vector<LiveFileMetaData> metadata;
3684 versions_->GetLiveFilesMetaData(&metadata);
f67539c2 3685 TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
7c673cae
FG
3686
3687 std::string corruption_messages;
7c673cae 3688
f67539c2
TL
3689 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
3690 // Instead of calling GetFileSize() for each expected file, call
3691 // GetChildren() for the DB directory and check that all expected files
3692 // are listed, without checking their sizes.
3693 // Since sst files might be in different directories, do it for each
3694 // directory separately.
3695 std::map<std::string, std::vector<std::string>> files_by_directory;
3696 for (const auto& md : metadata) {
3697 // md.name has a leading "/". Remove it.
3698 std::string fname = md.name;
3699 if (!fname.empty() && fname[0] == '/') {
3700 fname = fname.substr(1);
3701 }
3702 files_by_directory[md.db_path].push_back(fname);
7c673cae 3703 }
f67539c2
TL
3704 for (const auto& dir_files : files_by_directory) {
3705 std::string directory = dir_files.first;
3706 std::vector<std::string> existing_files;
3707 Status s = env_->GetChildren(directory, &existing_files);
3708 if (!s.ok()) {
3709 corruption_messages +=
3710 "Can't list files in " + directory + ": " + s.ToString() + "\n";
3711 continue;
3712 }
3713 std::sort(existing_files.begin(), existing_files.end());
3714
3715 for (const std::string& fname : dir_files.second) {
3716 if (!std::binary_search(existing_files.begin(), existing_files.end(),
3717 fname) &&
3718 !std::binary_search(existing_files.begin(), existing_files.end(),
3719 Rocks2LevelTableFileName(fname))) {
3720 corruption_messages +=
3721 "Missing sst file " + fname + " in " + directory + "\n";
3722 }
3723 }
3724 }
3725 } else {
3726 for (const auto& md : metadata) {
3727 // md.name has a leading "/".
3728 std::string file_path = md.db_path + md.name;
3729
3730 uint64_t fsize = 0;
3731 TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3732 Status s = env_->GetFileSize(file_path, &fsize);
3733 if (!s.ok() &&
3734 env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3735 s = Status::OK();
3736 }
3737 if (!s.ok()) {
3738 corruption_messages +=
3739 "Can't access " + md.name + ": " + s.ToString() + "\n";
3740 } else if (fsize != md.size) {
3741 corruption_messages += "Sst file size mismatch: " + file_path +
3742 ". Size recorded in manifest " +
3743 ToString(md.size) + ", actual size " +
3744 ToString(fsize) + "\n";
3745 }
7c673cae
FG
3746 }
3747 }
f67539c2 3748
7c673cae
FG
3749 if (corruption_messages.size() == 0) {
3750 return Status::OK();
3751 } else {
3752 return Status::Corruption(corruption_messages);
3753 }
3754}
3755
3756Status DBImpl::GetDbIdentity(std::string& identity) const {
f67539c2
TL
3757 identity.assign(db_id_);
3758 return Status::OK();
3759}
3760
3761Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
7c673cae 3762 std::string idfilename = IdentityFileName(dbname_);
f67539c2 3763 const FileOptions soptions;
7c673cae 3764
f67539c2 3765 Status s = ReadFileToString(fs_.get(), idfilename, identity);
7c673cae
FG
3766 if (!s.ok()) {
3767 return s;
3768 }
f67539c2 3769
7c673cae 3770 // If last character is '\n' remove it from identity
f67539c2
TL
3771 if (identity->size() > 0 && identity->back() == '\n') {
3772 identity->pop_back();
7c673cae
FG
3773 }
3774 return s;
3775}
3776
20effc67
TL
3777Status DBImpl::GetDbSessionId(std::string& session_id) const {
3778 session_id.assign(db_session_id_);
3779 return Status::OK();
3780}
3781
3782void DBImpl::SetDbSessionId() {
3783 // GenerateUniqueId() generates an identifier that has a negligible
3784 // probability of being duplicated, ~128 bits of entropy
3785 std::string uuid = env_->GenerateUniqueId();
3786
3787 // Hash and reformat that down to a more compact format, 20 characters
3788 // in base-36 ([0-9A-Z]), which is ~103 bits of entropy, which is enough
3789 // to expect no collisions across a billion servers each opening DBs
3790 // a million times (~2^50). Benefits vs. raw unique id:
3791 // * Save ~ dozen bytes per SST file
3792 // * Shorter shared backup file names (some platforms have low limits)
3793 // * Visually distinct from DB id format
3794 uint64_t a = NPHash64(uuid.data(), uuid.size(), 1234U);
3795 uint64_t b = NPHash64(uuid.data(), uuid.size(), 5678U);
3796 db_session_id_.resize(20);
3797 static const char* const base36 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
3798 size_t i = 0;
3799 for (; i < 10U; ++i, a /= 36U) {
3800 db_session_id_[i] = base36[a % 36];
3801 }
3802 for (; i < 20U; ++i, b /= 36U) {
3803 db_session_id_[i] = base36[b % 36];
3804 }
3805 TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
3806}
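// Arithmetic behind the ~103 bits claimed above (illustrative): each base-36
// digit carries log2(36) ~= 5.17 bits, so 20 digits hold 20 * 5.17 ~= 103.4
// bits. By the birthday bound, ~2^50 session ids drawn from a 2^103.4 space
// give about 2^(2*50 - 1 - 103.4) ~= 2^-4.4 expected collisions, i.e. well
// under one.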
3807
7c673cae 3808// Default implementation -- returns not supported status
11fdf7f2
TL
3809Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
3810 const std::string& /*column_family_name*/,
3811 ColumnFamilyHandle** /*handle*/) {
3812 return Status::NotSupported("");
3813}
3814
3815Status DB::CreateColumnFamilies(
3816 const ColumnFamilyOptions& /*cf_options*/,
3817 const std::vector<std::string>& /*column_family_names*/,
3818 std::vector<ColumnFamilyHandle*>* /*handles*/) {
7c673cae
FG
3819 return Status::NotSupported("");
3820}
11fdf7f2
TL
3821
3822Status DB::CreateColumnFamilies(
3823 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
3824 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3825 return Status::NotSupported("");
3826}
3827
3828Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
3829 return Status::NotSupported("");
3830}
3831
3832Status DB::DropColumnFamilies(
3833 const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
7c673cae
FG
3834 return Status::NotSupported("");
3835}
11fdf7f2 3836
7c673cae
FG
3837Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
3838 delete column_family;
3839 return Status::OK();
3840}
3841
11fdf7f2
TL
3842DB::~DB() {}
3843
3844Status DBImpl::Close() {
3845 if (!closed_) {
f67539c2
TL
3846 {
3847 InstrumentedMutexLock l(&mutex_);
3848 // If there is unreleased snapshot, fail the close call
3849 if (!snapshots_.empty()) {
3850 return Status::Aborted("Cannot close DB with unreleased snapshot.");
3851 }
3852 }
3853
11fdf7f2
TL
3854 closed_ = true;
3855 return CloseImpl();
3856 }
3857 return Status::OK();
3858}
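// Usage sketch: Close() surfaces shutdown errors that the destructor would
// swallow (`db` is a hypothetical open DB*):
//
//   Status s = db->Close();  // Status::Aborted if snapshots are unreleased
//   delete db;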
7c673cae
FG
3859
3860Status DB::ListColumnFamilies(const DBOptions& db_options,
3861 const std::string& name,
3862 std::vector<std::string>* column_families) {
20effc67
TL
3863 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
3864 return VersionSet::ListColumnFamilies(column_families, name, fs.get());
7c673cae
FG
3865}
3866
11fdf7f2 3867Snapshot::~Snapshot() {}
7c673cae 3868
11fdf7f2
TL
3869Status DestroyDB(const std::string& dbname, const Options& options,
3870 const std::vector<ColumnFamilyDescriptor>& column_families) {
3871 ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
7c673cae
FG
3872 Env* env = soptions.env;
3873 std::vector<std::string> filenames;
f67539c2 3874 bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
7c673cae 3875
11fdf7f2
TL
3876 // Reset the logger because it holds a handle to the
3877 // log file and prevents cleanup and directory removal
3878 soptions.info_log.reset();
7c673cae 3879 // Ignore error in case directory does not exist
20effc67 3880 env->GetChildren(dbname, &filenames).PermitUncheckedError();
7c673cae
FG
3881
3882 FileLock* lock;
3883 const std::string lockname = LockFileName(dbname);
3884 Status result = env->LockFile(lockname, &lock);
3885 if (result.ok()) {
3886 uint64_t number;
3887 FileType type;
3888 InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
11fdf7f2
TL
3889 for (const auto& fname : filenames) {
3890 if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
494da23a 3891 type != kDBLockFile) { // Lock file will be deleted at end
7c673cae 3892 Status del;
11fdf7f2 3893 std::string path_to_delete = dbname + "/" + fname;
7c673cae
FG
3894 if (type == kMetaDatabase) {
3895 del = DestroyDB(path_to_delete, options);
20effc67 3896 } else if (type == kTableFile || type == kWalFile) {
f67539c2
TL
3897 del = DeleteDBFile(&soptions, path_to_delete, dbname,
3898 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
7c673cae
FG
3899 } else {
3900 del = env->DeleteFile(path_to_delete);
3901 }
20effc67 3902 if (!del.ok() && result.ok()) {
7c673cae
FG
3903 result = del;
3904 }
3905 }
3906 }
3907
20effc67
TL
3908 std::set<std::string> paths;
3909 for (const DbPath& db_path : options.db_paths) {
3910 paths.insert(db_path.path);
11fdf7f2 3911 }
20effc67
TL
3912 for (const ColumnFamilyDescriptor& cf : column_families) {
3913 for (const DbPath& cf_path : cf.options.cf_paths) {
3914 paths.insert(cf_path.path);
11fdf7f2
TL
3915 }
3916 }
11fdf7f2
TL
3917 for (const auto& path : paths) {
3918 if (env->GetChildren(path, &filenames).ok()) {
3919 for (const auto& fname : filenames) {
3920 if (ParseFileName(fname, &number, &type) &&
494da23a 3921 type == kTableFile) { // Lock file will be deleted at end
11fdf7f2 3922 std::string table_path = path + "/" + fname;
f67539c2
TL
3923 Status del = DeleteDBFile(&soptions, table_path, dbname,
3924 /*force_bg=*/false, /*force_fg=*/false);
20effc67 3925 if (!del.ok() && result.ok()) {
11fdf7f2
TL
3926 result = del;
3927 }
7c673cae
FG
3928 }
3929 }
20effc67
TL
3930 // TODO: Should we return an error if we cannot delete the directory?
3931 env->DeleteDir(path).PermitUncheckedError();
7c673cae
FG
3932 }
3933 }
3934
3935 std::vector<std::string> walDirFiles;
3936 std::string archivedir = ArchivalDirectory(dbname);
11fdf7f2 3937 bool wal_dir_exists = false;
7c673cae 3938 if (dbname != soptions.wal_dir) {
11fdf7f2 3939 wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
7c673cae
FG
3940 archivedir = ArchivalDirectory(soptions.wal_dir);
3941 }
3942
11fdf7f2
TL
3943 // Archive dir may be inside wal dir or dbname and should be
3944 // processed and removed before those otherwise we have issues
3945 // removing them
3946 std::vector<std::string> archiveFiles;
3947 if (env->GetChildren(archivedir, &archiveFiles).ok()) {
3948 // Delete archival files.
3949 for (const auto& file : archiveFiles) {
20effc67 3950 if (ParseFileName(file, &number, &type) && type == kWalFile) {
494da23a 3951 Status del =
f67539c2
TL
3952 DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
3953 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
20effc67 3954 if (!del.ok() && result.ok()) {
11fdf7f2
TL
3955 result = del;
3956 }
7c673cae
FG
3957 }
3958 }
20effc67
TL
3959 // Ignore error in case dir contains other files
3960 env->DeleteDir(archivedir).PermitUncheckedError();
7c673cae
FG
3961 }
3962
11fdf7f2
TL
3963 // Delete log files in the WAL dir
3964 if (wal_dir_exists) {
3965 for (const auto& file : walDirFiles) {
20effc67 3966 if (ParseFileName(file, &number, &type) && type == kWalFile) {
494da23a
TL
3967 Status del =
3968 DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
f67539c2
TL
3969 soptions.wal_dir, /*force_bg=*/false,
3970 /*force_fg=*/!wal_in_db_path);
20effc67 3971 if (!del.ok() && result.ok()) {
11fdf7f2
TL
3972 result = del;
3973 }
7c673cae
FG
3974 }
3975 }
20effc67
TL
3976 // Ignore error in case dir contains other files
3977 env->DeleteDir(soptions.wal_dir).PermitUncheckedError();
7c673cae
FG
3978 }
3979
20effc67
TL
3980 // Ignore error since state is already gone
3981 env->UnlockFile(lock).PermitUncheckedError();
3982 env->DeleteFile(lockname).PermitUncheckedError();
f67539c2
TL
3983
3984 // sst_file_manager holds a ref to the logger. Make sure the logger is
3985 // gone before trying to remove the directory.
3986 soptions.sst_file_manager.reset();
3987
20effc67
TL
3988 // Ignore error in case dir contains other files
3989 env->DeleteDir(dbname).PermitUncheckedError();
3990
7c673cae
FG
3991 }
3992 return result;
3993}
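// Usage sketch (public API): wiping a database, including its WAL dir and
// archive, as implemented above ("/tmp/testdb" is a hypothetical path):
//
//   Options options;
//   Status s = DestroyDB("/tmp/testdb", options);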
3994
11fdf7f2
TL
3995Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
3996 bool need_enter_write_thread) {
7c673cae 3997#ifndef ROCKSDB_LITE
11fdf7f2
TL
3998 WriteThread::Writer w;
3999 if (need_mutex_lock) {
4000 mutex_.Lock();
4001 } else {
4002 mutex_.AssertHeld();
4003 }
4004 if (need_enter_write_thread) {
4005 write_thread_.EnterUnbatched(&w, &mutex_);
4006 }
7c673cae
FG
4007
4008 std::vector<std::string> cf_names;
4009 std::vector<ColumnFamilyOptions> cf_opts;
4010
4011 // This part requires mutex to protect the column family options
4012 for (auto cfd : *versions_->GetColumnFamilySet()) {
4013 if (cfd->IsDropped()) {
4014 continue;
4015 }
4016 cf_names.push_back(cfd->GetName());
4017 cf_opts.push_back(cfd->GetLatestCFOptions());
4018 }
4019
4020 // Unlock during expensive operations. New writes cannot get here
4021 // because the single write thread ensures all new writes get queued.
4022 DBOptions db_options =
4023 BuildDBOptions(immutable_db_options_, mutable_db_options_);
4024 mutex_.Unlock();
4025
11fdf7f2
TL
4026 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
4027 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
4028
7c673cae
FG
4029 std::string file_name =
4030 TempOptionsFileName(GetName(), versions_->NewFileNumber());
f67539c2 4031 Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
20effc67 4032 fs_.get());
7c673cae
FG
4033
4034 if (s.ok()) {
4035 s = RenameTempFileToOptionsFile(file_name);
4036 }
11fdf7f2
TL
4037 // restore lock
4038 if (!need_mutex_lock) {
4039 mutex_.Lock();
4040 }
4041 if (need_enter_write_thread) {
4042 write_thread_.ExitUnbatched(&w);
4043 }
4044 if (!s.ok()) {
4045 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4046 "Unnable to persist options -- %s", s.ToString().c_str());
4047 if (immutable_db_options_.fail_if_options_file_error) {
4048 return Status::IOError("Unable to persist options.",
4049 s.ToString().c_str());
4050 }
4051 }
7c673cae 4052#else
11fdf7f2
TL
4053 (void)need_mutex_lock;
4054 (void)need_enter_write_thread;
7c673cae 4055#endif // !ROCKSDB_LITE
11fdf7f2 4056 return Status::OK();
7c673cae
FG
4057}
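// Counterpart sketch: the persisted OPTIONS file can be read back with
// LoadLatestOptions() from rocksdb/utilities/options_util.h (a real helper;
// `dbname` is hypothetical):
//
//   DBOptions db_opts;
//   std::vector<ColumnFamilyDescriptor> cf_descs;
//   Status s = LoadLatestOptions(dbname, Env::Default(), &db_opts, &cf_descs);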
4058
4059#ifndef ROCKSDB_LITE
4060namespace {
4061void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
4062 const size_t num_files_to_keep,
4063 const std::shared_ptr<Logger>& info_log,
4064 Env* env) {
4065 if (filenames.size() <= num_files_to_keep) {
4066 return;
4067 }
4068 for (auto iter = std::next(filenames.begin(), num_files_to_keep);
4069 iter != filenames.end(); ++iter) {
4070 if (!env->DeleteFile(iter->second).ok()) {
4071 ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
4072 iter->second.c_str());
4073 }
4074 }
4075}
4076} // namespace
4077#endif // !ROCKSDB_LITE
4078
4079Status DBImpl::DeleteObsoleteOptionsFiles() {
4080#ifndef ROCKSDB_LITE
4081 std::vector<std::string> filenames;
4082 // Use an ordered map to keep the filenames sorted from the newest
4083 // to the oldest.
4084 std::map<uint64_t, std::string> options_filenames;
4085 Status s;
4086 s = GetEnv()->GetChildren(GetName(), &filenames);
4087 if (!s.ok()) {
4088 return s;
4089 }
4090 for (auto& filename : filenames) {
4091 uint64_t file_number;
4092 FileType type;
4093 if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
4094 options_filenames.insert(
4095 {std::numeric_limits<uint64_t>::max() - file_number,
4096 GetName() + "/" + filename});
4097 }
4098 }
4099
4100 // Keep the latest 2 options files.
4101 const size_t kNumOptionsFilesKept = 2;
4102 DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
4103 immutable_db_options_.info_log, GetEnv());
4104 return Status::OK();
4105#else
4106 return Status::OK();
4107#endif // !ROCKSDB_LITE
4108}
4109
4110Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
4111#ifndef ROCKSDB_LITE
4112 Status s;
4113
494da23a 4114 uint64_t options_file_number = versions_->NewFileNumber();
7c673cae 4115 std::string options_file_name =
494da23a 4116 OptionsFileName(GetName(), options_file_number);
7c673cae
FG
4117 // Retry if the file name happens to conflict with an existing one.
4118 s = GetEnv()->RenameFile(file_name, options_file_name);
494da23a
TL
4119 if (s.ok()) {
4120 InstrumentedMutexLock l(&mutex_);
4121 versions_->options_file_number_ = options_file_number;
4122 }
7c673cae 4123
11fdf7f2 4124 if (0 == disable_delete_obsolete_files_) {
20effc67
TL
4125 // TODO: Should we check for errors here?
4126 DeleteObsoleteOptionsFiles().PermitUncheckedError();
11fdf7f2 4127 }
7c673cae
FG
4128 return s;
4129#else
11fdf7f2 4130 (void)file_name;
7c673cae
FG
4131 return Status::OK();
4132#endif // !ROCKSDB_LITE
4133}
4134
4135#ifdef ROCKSDB_USING_THREAD_STATUS
4136
11fdf7f2 4137void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
7c673cae
FG
4138 if (immutable_db_options_.enable_thread_tracking) {
4139 ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
4140 cfd->ioptions()->env);
4141 }
4142}
4143
11fdf7f2 4144void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
7c673cae
FG
4145 if (immutable_db_options_.enable_thread_tracking) {
4146 ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
4147 }
4148}
4149
4150void DBImpl::EraseThreadStatusDbInfo() const {
4151 if (immutable_db_options_.enable_thread_tracking) {
4152 ThreadStatusUtil::EraseDatabaseInfo(this);
4153 }
4154}
4155
4156#else
11fdf7f2 4157void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
7c673cae 4158
11fdf7f2 4159void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
7c673cae 4160
11fdf7f2 4161void DBImpl::EraseThreadStatusDbInfo() const {}
7c673cae
FG
4162#endif // ROCKSDB_USING_THREAD_STATUS
4163
4164//
4165// A global method that can dump out the build version
11fdf7f2 4166void DumpRocksDBBuildVersion(Logger* log) {
7c673cae 4167#if !defined(IOS_CROSS_COMPILE)
11fdf7f2 4168 // if we compile with Xcode, we don't run build_detect_version, so we don't
7c673cae
FG
4169 // generate util/build_version.cc
4170 ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
4171 ROCKSDB_MINOR, ROCKSDB_PATCH);
4172 ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
4173 ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
494da23a
TL
4174#else
4175 (void)log; // ignore "-Wunused-parameter"
7c673cae
FG
4176#endif
4177}
4178
4179#ifndef ROCKSDB_LITE
4180SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
4181 bool include_history) {
4182 // Find the earliest sequence number that we know we can rely on reading
4183 // from the memtable without needing to check sst files.
4184 SequenceNumber earliest_seq =
4185 sv->imm->GetEarliestSequenceNumber(include_history);
4186 if (earliest_seq == kMaxSequenceNumber) {
4187 earliest_seq = sv->mem->GetEarliestSequenceNumber();
4188 }
4189 assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
4190
4191 return earliest_seq;
4192}
4193#endif // ROCKSDB_LITE
4194
4195#ifndef ROCKSDB_LITE
4196Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
f67539c2
TL
4197 bool cache_only,
4198 SequenceNumber lower_bound_seq,
4199 SequenceNumber* seq,
11fdf7f2
TL
4200 bool* found_record_for_key,
4201 bool* is_blob_index) {
7c673cae
FG
4202 Status s;
4203 MergeContext merge_context;
494da23a 4204 SequenceNumber max_covering_tombstone_seq = 0;
7c673cae
FG
4205
4206 ReadOptions read_options;
4207 SequenceNumber current_seq = versions_->LastSequence();
4208 LookupKey lkey(key, current_seq);
4209
4210 *seq = kMaxSequenceNumber;
4211 *found_record_for_key = false;
4212
4213 // Check if there is a record for this key in the latest memtable
20effc67
TL
4214 sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
4215 &max_covering_tombstone_seq, seq, read_options,
4216 nullptr /*read_callback*/, is_blob_index);
7c673cae
FG
4217
4218 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4219 // unexpected error reading memtable.
4220 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4221 "Unexpected status returned from MemTable::Get: %s\n",
4222 s.ToString().c_str());
4223
4224 return s;
4225 }
4226
4227 if (*seq != kMaxSequenceNumber) {
4228 // Found a sequence number, no need to check immutable memtables
4229 *found_record_for_key = true;
4230 return Status::OK();
4231 }
4232
f67539c2
TL
4233 SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
4234 if (lower_bound_in_mem != kMaxSequenceNumber &&
4235 lower_bound_in_mem < lower_bound_seq) {
4236 *found_record_for_key = false;
4237 return Status::OK();
4238 }
4239
7c673cae 4240 // Check if there is a record for this key in the immutable memtables
20effc67
TL
4241 sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
4242 &max_covering_tombstone_seq, seq, read_options,
4243 nullptr /*read_callback*/, is_blob_index);
7c673cae
FG
4244
4245 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4246 // unexpected error reading memtable.
4247 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4248 "Unexpected status returned from MemTableList::Get: %s\n",
4249 s.ToString().c_str());
4250
4251 return s;
4252 }
4253
4254 if (*seq != kMaxSequenceNumber) {
4255 // Found a sequence number, no need to check memtable history
4256 *found_record_for_key = true;
4257 return Status::OK();
4258 }
4259
f67539c2
TL
4260 SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
4261 if (lower_bound_in_imm != kMaxSequenceNumber &&
4262 lower_bound_in_imm < lower_bound_seq) {
4263 *found_record_for_key = false;
4264 return Status::OK();
4265 }
4266
7c673cae 4267 // Check if there is a record for this key in the immutable memtables
20effc67 4268 sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
494da23a
TL
4269 &max_covering_tombstone_seq, seq, read_options,
4270 is_blob_index);
7c673cae
FG
4271
4272 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4273 // unexpected error reading memtable.
4274 ROCKS_LOG_ERROR(
4275 immutable_db_options_.info_log,
4276 "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
4277 s.ToString().c_str());
4278
4279 return s;
4280 }
4281
4282 if (*seq != kMaxSequenceNumber) {
4283 // Found a sequence number, no need to check SST files
4284 *found_record_for_key = true;
4285 return Status::OK();
4286 }
4287
f67539c2
TL
4288 // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
4289 // check here to skip the history if possible. But currently the caller
4290 // already does that. Maybe we should move the logic here later.
4291
7c673cae
FG
4292 // TODO(agiardullo): possible optimization: consider checking cached
4293 // SST files if cache_only=true?
4294 if (!cache_only) {
4295 // Check tables
20effc67 4296 sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
494da23a 4297 &max_covering_tombstone_seq, nullptr /* value_found */,
11fdf7f2
TL
4298 found_record_for_key, seq, nullptr /*read_callback*/,
4299 is_blob_index);
7c673cae
FG
4300
4301 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4302 // unexpected error reading SST files
4303 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4304 "Unexpected status returned from Version::Get: %s\n",
4305 s.ToString().c_str());
7c673cae
FG
4306 }
4307 }
4308
494da23a 4309 return s;
7c673cae
FG
4310}
4311
4312Status DBImpl::IngestExternalFile(
4313 ColumnFamilyHandle* column_family,
4314 const std::vector<std::string>& external_files,
4315 const IngestExternalFileOptions& ingestion_options) {
494da23a
TL
4316 IngestExternalFileArg arg;
4317 arg.column_family = column_family;
4318 arg.external_files = external_files;
4319 arg.options = ingestion_options;
4320 return IngestExternalFiles({arg});
4321}
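// Usage sketch for external file ingestion (`db`/`cf` hypothetical; the
// .sst file would typically come from SstFileWriter):
//
//   IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // link/move instead of copying
//   Status s = db->IngestExternalFile(cf, {"/path/to/file1.sst"}, ifo);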
7c673cae 4322
494da23a
TL
4323Status DBImpl::IngestExternalFiles(
4324 const std::vector<IngestExternalFileArg>& args) {
4325 if (args.empty()) {
4326 return Status::InvalidArgument("ingestion arg list is empty");
4327 }
4328 {
4329 std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
4330 for (const auto& arg : args) {
4331 if (arg.column_family == nullptr) {
4332 return Status::InvalidArgument("column family handle is null");
4333 } else if (unique_cfhs.count(arg.column_family) > 0) {
4334 return Status::InvalidArgument(
4335 "ingestion args have duplicate column families");
4336 }
4337 unique_cfhs.insert(arg.column_family);
4338 }
4339 }
4340 // Ingest multiple external SST files atomically.
4341 size_t num_cfs = args.size();
4342 for (size_t i = 0; i != num_cfs; ++i) {
4343 if (args[i].external_files.empty()) {
4344 char err_msg[128] = {0};
4345 snprintf(err_msg, 128, "external_files[%zu] is empty", i);
4346 return Status::InvalidArgument(err_msg);
4347 }
4348 }
4349 for (const auto& arg : args) {
4350 const IngestExternalFileOptions& ingest_opts = arg.options;
4351 if (ingest_opts.ingest_behind &&
4352 !immutable_db_options_.allow_ingest_behind) {
11fdf7f2 4353 return Status::InvalidArgument(
494da23a 4354 "can't ingest_behind file in DB with allow_ingest_behind=false");
11fdf7f2
TL
4355 }
4356 }
4357
494da23a
TL
4358 // TODO (yanqin) maybe handle the case in which column_families have
4359 // duplicates
f67539c2 4360 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
494da23a
TL
4361 size_t total = 0;
4362 for (const auto& arg : args) {
4363 total += arg.external_files.size();
11fdf7f2 4364 }
494da23a
TL
4365 uint64_t next_file_number = 0;
4366 Status status = ReserveFileNumbersBeforeIngestion(
4367 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
f67539c2 4368 pending_output_elem, &next_file_number);
11fdf7f2
TL
4369 if (!status.ok()) {
4370 InstrumentedMutexLock l(&mutex_);
4371 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4372 return status;
7c673cae
FG
4373 }
4374
494da23a
TL
4375 std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
4376 for (const auto& arg : args) {
4377 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
f67539c2
TL
4378 ingestion_jobs.emplace_back(
4379 env_, versions_.get(), cfd, immutable_db_options_, file_options_,
20effc67 4380 &snapshots_, arg.options, &directories_, &event_logger_, io_tracer_);
494da23a
TL
4381 }
4382 std::vector<std::pair<bool, Status>> exec_results;
4383 for (size_t i = 0; i != num_cfs; ++i) {
4384 exec_results.emplace_back(false, Status::OK());
4385 }
4386 // TODO(yanqin) maybe make jobs run in parallel
f67539c2 4387 uint64_t start_file_number = next_file_number;
494da23a 4388 for (size_t i = 1; i != num_cfs; ++i) {
f67539c2 4389 start_file_number += args[i - 1].external_files.size();
494da23a
TL
4390 auto* cfd =
4391 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
f67539c2 4392 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
494da23a 4393 exec_results[i].second = ingestion_jobs[i].Prepare(
20effc67
TL
4394 args[i].external_files, args[i].files_checksums,
4395 args[i].files_checksum_func_names, start_file_number, super_version);
494da23a
TL
4396 exec_results[i].first = true;
4397 CleanupSuperVersion(super_version);
4398 }
4399 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
4400 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
4401 {
4402 auto* cfd =
4403 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
f67539c2 4404 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
494da23a 4405 exec_results[0].second = ingestion_jobs[0].Prepare(
20effc67
TL
4406 args[0].external_files, args[0].files_checksums,
4407 args[0].files_checksum_func_names, next_file_number, super_version);
494da23a
TL
4408 exec_results[0].first = true;
4409 CleanupSuperVersion(super_version);
4410 }
4411 for (const auto& exec_result : exec_results) {
4412 if (!exec_result.second.ok()) {
4413 status = exec_result.second;
4414 break;
4415 }
4416 }
7c673cae 4417 if (!status.ok()) {
494da23a
TL
4418 for (size_t i = 0; i != num_cfs; ++i) {
4419 if (exec_results[i].first) {
4420 ingestion_jobs[i].Cleanup(status);
4421 }
4422 }
11fdf7f2
TL
4423 InstrumentedMutexLock l(&mutex_);
4424 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
7c673cae
FG
4425 return status;
4426 }
4427
494da23a
TL
4428 std::vector<SuperVersionContext> sv_ctxs;
4429 for (size_t i = 0; i != num_cfs; ++i) {
4430 sv_ctxs.emplace_back(true /* create_superversion */);
4431 }
4432 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4433 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
7c673cae
FG
4434 TEST_SYNC_POINT("DBImpl::AddFile:Start");
4435 {
7c673cae
FG
4436 InstrumentedMutexLock l(&mutex_);
4437 TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4438
11fdf7f2 4439 // Stop writes to the DB by entering both write threads
7c673cae
FG
4440 WriteThread::Writer w;
4441 write_thread_.EnterUnbatched(&w, &mutex_);
11fdf7f2
TL
4442 WriteThread::Writer nonmem_w;
4443 if (two_write_queues_) {
4444 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4445 }
7c673cae 4446
f67539c2
TL
4447 // When unordered_write is enabled, keys are written to the memtable in an
4448 // unordered way. If the ingestion job checks the memtable key range before
4449 // a key lands in the memtable, the ingestion job may skip the necessary
4450 // memtable flush.
4451 // So wait here to ensure there is no pending write to the memtable.
4452 WaitForPendingWrites();
4453
494da23a
TL
4454 num_running_ingest_file_ += static_cast<int>(num_cfs);
4455 TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
7c673cae 4456
    bool at_least_one_cf_need_flush = false;
    std::vector<bool> need_flush(num_cfs, false);
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (cfd->IsDropped()) {
        // TODO (yanqin) investigate whether we should abort ingestion or
        // proceed with other non-dropped column families.
        status = Status::InvalidArgument(
            "cannot ingest an external file into a dropped CF");
        break;
      }
      bool tmp = false;
      status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
      need_flush[i] = tmp;
      at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
      if (!status.ok()) {
        break;
      }
    }
    TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                             &at_least_one_cf_need_flush);

    if (status.ok() && at_least_one_cf_need_flush) {
      FlushOptions flush_opts;
      flush_opts.allow_write_stall = true;
      if (immutable_db_options_.atomic_flush) {
        autovector<ColumnFamilyData*> cfds_to_flush;
        SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
        mutex_.Unlock();
        status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
                                      FlushReason::kExternalFileIngestion,
                                      true /* writes_stopped */);
        mutex_.Lock();
      } else {
        for (size_t i = 0; i != num_cfs; ++i) {
          if (need_flush[i]) {
            mutex_.Unlock();
            auto* cfd =
                static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
                    ->cfd();
            status = FlushMemTable(cfd, flush_opts,
                                   FlushReason::kExternalFileIngestion,
                                   true /* writes_stopped */);
            mutex_.Lock();
            if (!status.ok()) {
              break;
            }
          }
        }
      }
    }
    // Run ingestion jobs.
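    // Run() picks a target level and assigns the global sequence number for
    // each file; the resulting version edits are committed to the MANIFEST
    // below.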
    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        status = ingestion_jobs[i].Run();
        if (!status.ok()) {
          break;
        }
      }
    }
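    // Jobs may have consumed (allocated) sequence numbers for the ingested
    // files' global seqnos; advance the DB's sequence counters so future
    // writes stay ahead of them.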
    if (status.ok()) {
      int consumed_seqno_count =
          ingestion_jobs[0].ConsumedSequenceNumbersCount();
#ifndef NDEBUG
      for (size_t i = 1; i != num_cfs; ++i) {
        assert(!!consumed_seqno_count ==
               !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
        consumed_seqno_count +=
            ingestion_jobs[i].ConsumedSequenceNumbersCount();
      }
#endif
      if (consumed_seqno_count > 0) {
        const SequenceNumber last_seqno = versions_->LastSequence();
        versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
        versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
        versions_->SetLastSequence(last_seqno + consumed_seqno_count);
      }
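      // Gather one version edit per surviving (non-dropped) CF for a single
      // MANIFEST commit.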
      autovector<ColumnFamilyData*> cfds_to_commit;
      autovector<const MutableCFOptions*> mutable_cf_options_list;
      autovector<autovector<VersionEdit*>> edit_lists;
      uint32_t num_entries = 0;
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (cfd->IsDropped()) {
          continue;
        }
        cfds_to_commit.push_back(cfd);
        mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
        autovector<VersionEdit*> edit_list;
        edit_list.push_back(ingestion_jobs[i].edit());
        edit_lists.push_back(edit_list);
        ++num_entries;
      }
      // Mark the version edits as an atomic group if the number of version
      // edits exceeds 1.
      if (cfds_to_commit.size() > 1) {
        for (auto& edits : edit_lists) {
          assert(edits.size() == 1);
          edits[0]->MarkAtomicGroup(--num_entries);
        }
        assert(0 == num_entries);
      }
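      // A single LogAndApply call persists all edits; the atomic-group
      // markers let recovery apply them all-or-nothing.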
      status =
          versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
                                 edit_lists, &mutex_, directories_.GetDbDir());
    }

    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd =
            static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
        if (!cfd->IsDropped()) {
          InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
                                             *cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
          if (0 == i && num_cfs > 1) {
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
            TEST_SYNC_POINT(
                "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
          }
#endif  // !NDEBUG
        }
      }
    } else if (versions_->io_status().IsIOError()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming the
      // CURRENT file. With the current code it is difficult to tell, so be
      // pessimistic and try writing to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      const IOStatus& io_s = versions_->io_status();
      // Should we handle the returned error?
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite)
          .PermitUncheckedError();
    }

    // Resume writes to the DB
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    if (status.ok()) {
      for (auto& job : ingestion_jobs) {
        job.UpdateStats();
      }
    }
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    num_running_ingest_file_ -= static_cast<int>(num_cfs);
    if (0 == num_running_ingest_file_) {
      bg_cv_.SignalAll();
    }
    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs[i].Clean();
    // This may roll back jobs that have completed successfully. This is
    // intended for atomicity.
    ingestion_jobs[i].Cleanup(status);
  }
  if (status.ok()) {
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd =
          static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
      if (!cfd->IsDropped()) {
        NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
      }
    }
  }
  return status;
}

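// Creates a new column family and imports previously exported SST files into
// it; on any failure the new CF is dropped so the DB is left unchanged.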
Status DBImpl::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  assert(*handle == nullptr);
  std::string cf_comparator_name = options.comparator->Name();
  if (cf_comparator_name != metadata.db_comparator_name) {
    return Status::InvalidArgument("Comparator name mismatch");
  }

  // Create column family.
  auto status = CreateColumnFamily(options, column_family_name, handle);
  if (!status.ok()) {
    return status;
  }

  // Import sst files from metadata.
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
  auto cfd = cfh->cfd();
  ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
                                   immutable_db_options_, file_options_,
                                   import_options, metadata.files, io_tracer_);

  SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
  VersionEdit dummy_edit;
  uint64_t next_file_number = 0;
  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    if (error_handler_.IsDBStopped()) {
      // Don't import files when there is a bg_error
      status = error_handler_.GetBGError();
    }

    // Make sure that bg cleanup won't delete the files that we are importing
    pending_output_elem.reset(new std::list<uint64_t>::iterator(
        CaptureCurrentFileNumberInPendingOutputs()));

    if (status.ok()) {
      // If a crash happens after a hard link is established, the Recover
      // function may reuse a file number that has already been assigned to
      // the internal file, which would overwrite the external file. To
      // protect the external file, we have to make sure the file number is
      // never reused.
      next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
      auto cf_options = cfd->GetLatestMutableCFOptions();
      status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                                      directories_.GetDbDir());
      if (status.ok()) {
        InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
      }
    }
  }
  dummy_sv_ctx.Clean();

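  // Prepare() hard-links (or, when linking is not possible, copies) the
  // exported files into the DB directory under the reserved file numbers.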
  if (status.ok()) {
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    status = import_job.Prepare(next_file_number, sv);
    CleanupSuperVersion(sv);
  }

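  // With the files in place, stop writes and commit the import edit to the
  // MANIFEST.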
  if (status.ok()) {
    SuperVersionContext sv_context(true /*create_superversion*/);
    {
      // Lock db mutex
      InstrumentedMutexLock l(&mutex_);

      // Stop writes to the DB by entering both write threads
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      WriteThread::Writer nonmem_w;
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }

      num_running_ingest_file_++;
      assert(!cfd->IsDropped());
      status = import_job.Run();

      // Install job edit [Mutex will be unlocked here]
      if (status.ok()) {
        auto cf_options = cfd->GetLatestMutableCFOptions();
        status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
                                        &mutex_, directories_.GetDbDir());
        if (status.ok()) {
          InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
        }
      }

      // Resume writes to the DB
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
      write_thread_.ExitUnbatched(&w);

      num_running_ingest_file_--;
      if (num_running_ingest_file_ == 0) {
        bg_cv_.SignalAll();
      }
    }
    // mutex_ is unlocked here

    sv_context.Clean();
  }

  {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  }

  import_job.Cleanup(status);
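  // On failure, drop the partially imported column family so the DB is left
  // in its original state.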
  if (!status.ok()) {
    Status temp_s = DropColumnFamily(*handle);
    if (!temp_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DropColumnFamily failed with error %s",
                      temp_s.ToString().c_str());
    }
    // Always returns Status::OK()
    temp_s = DestroyColumnFamilyHandle(*handle);
    assert(temp_s.ok());
    *handle = nullptr;
  }
  return status;
}

Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
  return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
}

Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
  return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
}

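// Shared implementation. With use_file_checksum, each table file is compared
// against the whole-file checksum recorded in the MANIFEST; otherwise each
// file is re-read and its per-block checksums are verified.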
Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
                                      bool use_file_checksum) {
  Status s;

  if (use_file_checksum) {
    FileChecksumGenFactory* const file_checksum_gen_factory =
        immutable_db_options_.file_checksum_gen_factory.get();
    if (!file_checksum_gen_factory) {
      s = Status::InvalidArgument(
          "Cannot verify file checksum if options.file_checksum_gen_factory is "
          "null");
      return s;
    }
  }

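  // Pin every live, initialized CF and its current SuperVersion so the files
  // being verified cannot be deleted while the scan runs without the mutex.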
  std::vector<ColumnFamilyData*> cfd_list;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped() && cfd->initialized()) {
        cfd->Ref();
        cfd_list.push_back(cfd);
      }
    }
  }
  std::vector<SuperVersion*> sv_list;
  for (auto cfd : cfd_list) {
    sv_list.push_back(cfd->GetReferencedSuperVersion(this));
  }

  for (auto& sv : sv_list) {
    VersionStorageInfo* vstorage = sv->current->storage_info();
    ColumnFamilyData* cfd = sv->current->cfd();
    Options opts;
    if (!use_file_checksum) {
      InstrumentedMutexLock l(&mutex_);
      opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                     cfd->GetLatestCFOptions());
    }
    for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
      for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
           j++) {
        const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
        const auto& fd = fd_with_krange.fd;
        const FileMetaData* fmeta = fd_with_krange.file_metadata;
        assert(fmeta);
        std::string fname = TableFileName(cfd->ioptions()->cf_paths,
                                          fd.GetNumber(), fd.GetPathId());
        if (use_file_checksum) {
          s = VerifySstFileChecksum(*fmeta, fname, read_options);
        } else {
          s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
                                                       read_options, fname);
        }
      }
    }
    if (!s.ok()) {
      break;
    }
  }
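  // Release the SuperVersion and CF references taken above. When
  // avoid_unnecessary_blocking_io is set, defer the actual deletion to the
  // background purge thread instead of doing it inline.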
  bool defer_purge =
      immutable_db_options().avoid_unnecessary_blocking_io;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto sv : sv_list) {
      if (sv && sv->Unref()) {
        sv->Cleanup();
        if (defer_purge) {
          AddSuperVersionsToFreeQueue(sv);
        } else {
          delete sv;
        }
      }
    }
    if (defer_purge) {
      SchedulePurge();
    }
    for (auto cfd : cfd_list) {
      cfd->UnrefAndTryDelete();
    }
  }
  return s;
}

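// Recomputes the whole-file checksum of fname with the configured checksum
// generator and compares it with the checksum recorded in fmeta; a mismatch
// is reported as Status::Corruption.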
Status DBImpl::VerifySstFileChecksum(const FileMetaData& fmeta,
                                     const std::string& fname,
                                     const ReadOptions& read_options) {
  Status s;
  if (fmeta.file_checksum == kUnknownFileChecksum) {
    return s;
  }
  std::string file_checksum;
  std::string func_name;
  s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
      fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
      fmeta.file_checksum_func_name, &file_checksum, &func_name,
      read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
      io_tracer_);
  if (s.ok()) {
    assert(fmeta.file_checksum_func_name == func_name);
    if (file_checksum != fmeta.file_checksum) {
      std::ostringstream oss;
      oss << fname << " file checksum mismatch, ";
      oss << "expecting " << Slice(fmeta.file_checksum).ToString(/*hex=*/true);
      oss << ", but actual " << Slice(file_checksum).ToString(/*hex=*/true);
      s = Status::Corruption(oss.str());
      TEST_SYNC_POINT_CALLBACK("DBImpl::VerifySstFileChecksum:mismatch", &s);
    }
  }
  return s;
}

void DBImpl::NotifyOnExternalFileIngested(
    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
  if (immutable_db_options_.listeners.empty()) {
    return;
  }

  for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
    ExternalFileIngestionInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = f.external_file_path;
    info.internal_file_path = f.internal_file_path;
    info.global_seqno = f.assigned_seqno;
    info.table_properties = f.table_properties;
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnExternalFileIngested(this, info);
    }
  }
}

void DBImpl::WaitForIngestFile() {
  mutex_.AssertHeld();
  while (num_running_ingest_file_ > 0) {
    bg_cv_.Wait();
  }
}

Status DBImpl::StartTrace(const TraceOptions& trace_options,
                          std::unique_ptr<TraceWriter>&& trace_writer) {
  InstrumentedMutexLock lock(&trace_mutex_);
  tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
  return Status::OK();
}

Status DBImpl::EndTrace() {
  InstrumentedMutexLock lock(&trace_mutex_);
  Status s;
  if (tracer_ != nullptr) {
    s = tracer_->Close();
    tracer_.reset();
  } else {
    return Status::IOError("No trace file to close");
  }
  return s;
}

Status DBImpl::StartBlockCacheTrace(
    const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer) {
  return block_cache_tracer_.StartTrace(env_, trace_options,
                                        std::move(trace_writer));
}

Status DBImpl::EndBlockCacheTrace() {
  block_cache_tracer_.EndTrace();
  return Status::OK();
}

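// tracer_ is checked once without the lock to keep the common (tracing
// disabled) path cheap, then re-checked under trace_mutex_ before use.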
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      s = tracer_->IteratorSeek(cf_id, key);
    }
  }
  return s;
}

Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
                                        const Slice& key) {
  Status s;
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      s = tracer_->IteratorSeekForPrev(cf_id, key);
    }
  }
  return s;
}

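// Reserves `num` consecutive file numbers for ingestion and persists a dummy
// version edit so that, after a crash, recovery will not hand out the same
// numbers again.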
Status DBImpl::ReserveFileNumbersBeforeIngestion(
    ColumnFamilyData* cfd, uint64_t num,
    std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
    uint64_t* next_file_number) {
  Status s;
  SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
  assert(nullptr != next_file_number);
  InstrumentedMutexLock l(&mutex_);
  if (error_handler_.IsDBStopped()) {
    // Do not ingest files when there is a bg_error
    return error_handler_.GetBGError();
  }
  pending_output_elem.reset(new std::list<uint64_t>::iterator(
      CaptureCurrentFileNumberInPendingOutputs()));
  *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
  auto cf_options = cfd->GetLatestMutableCFOptions();
  VersionEdit dummy_edit;
  // If a crash happens after a hard link is established, the Recover function
  // may reuse a file number that has already been assigned to the internal
  // file, which would overwrite the external file. To protect the external
  // file, we have to make sure the file number is never reused.
  s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
  }
  dummy_sv_ctx.Clean();
  return s;
}
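// Returns the creation time of the oldest file across all live column
// families. Only supported with max_open_files == -1, since only then is the
// table metadata (which carries creation times) guaranteed to be loaded for
// every file.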
Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  if (mutable_db_options_.max_open_files == -1) {
    uint64_t oldest_time = port::kMaxUint64;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped()) {
        uint64_t ctime;
        {
          SuperVersion* sv = GetAndRefSuperVersion(cfd);
          Version* version = sv->current;
          version->GetCreationTimeOfOldestFile(&ctime);
          ReturnAndCleanupSuperVersion(cfd, sv);
        }

        if (ctime < oldest_time) {
          oldest_time = ctime;
        }
        if (oldest_time == 0) {
          break;
        }
      }
    }
    *creation_time = oldest_time;
    return Status::OK();
  } else {
    return Status::NotSupported("This API only works if max_open_files = -1");
  }
}

#endif  // ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE