// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/unique_id_gen.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/instrumented_mutex.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/version.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/sst_file_dumper.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "trace_replay/trace_replay.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/defer.h"
#include "util/distributed_mutex.h"
#include "util/hash_containers.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "utilities/trace/replayer_impl.h"

namespace ROCKSDB_NAMESPACE {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal &&
      mutable_cf_options.compaction_options_universal
              .compression_size_percent >= 0) {
    return kNoCompression;
  }
  if (mutable_cf_options.compression_per_level.empty()) {
    return mutable_cf_options.compression;
  } else {
    // For leveled compaction when min_level_to_compress != 0.
    return mutable_cf_options.compression_per_level[0];
  }
}
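
// Example: with leveled compaction and
// compression_per_level = {kNoCompression, kSnappyCompression, ...}, a flush
// writes its output with compression_per_level[0] (i.e. uncompressed) no
// matter what mutable_cf_options.compression is set to, while a
// universal-compaction CF with
// compaction_options_universal.compression_size_percent >= 0 always flushes
// uncompressed.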

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());

  ROCKS_LOG_HEADER(logger, "DMutex implementation: %s", DMutex::kName());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn,
               bool read_only)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      init_logger_creation_s_(),
      initial_db_options_(SanitizeOptions(dbname, options, read_only,
                                          &init_logger_creation_s_)),
      env_(initial_db_options_.env),
      io_tracer_(std::make_shared<IOTracer>()),
      immutable_db_options_(initial_db_options_),
      fs_(immutable_db_options_.fs, io_tracer_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.stats),
#ifdef COERCE_CONTEXT_SWITCH
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS, &bg_cv_,
             immutable_db_options_.use_adaptive_mutex),
#else   // COERCE_CONTEXT_SWITCH
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
#endif  // COERCE_CONTEXT_SWITCH
      default_cf_handle_(nullptr),
      error_handler_(this, immutable_db_options_, &mutex_),
      event_logger_(immutable_db_options_.info_log.get()),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      next_job_id_(1),
      shutting_down_(false),
      db_lock_(nullptr),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&log_write_mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
      last_stats_dump_time_microsec_(0),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, io_tracer_,
                   seq_per_batch),
#endif  // ROCKSDB_LITE
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
#ifndef ROCKSDB_LITE
      periodic_task_scheduler_(),
#endif  // ROCKSDB_LITE
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled last seq in
      // memtable is the same as last seq published to the readers. When it is
      // enabled but seq_per_batch_ is disabled, last seq in memtable still
      // indicates last published seq since wal-only writes that go to the 2nd
      // queue do not consume a sequence number. Otherwise writes performed by
      // the 2nd queue could change what is visible to the readers. In that
      // case, last_seq_same_as_publish_seq_==false and the 2nd queue maintains
      // a separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      closed_(false),
      atomic_flush_install_cv_(&mutex_),
      blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
                     &error_handler_, &event_logger_,
                     immutable_db_options_.listeners, dbname_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);
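  // For example, max_open_files == 5000 caps the table cache at 4990 table
  // handles, while max_open_files == -1 makes the capacity effectively
  // unbounded (TableCache::kInfiniteCapacity).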
  SetDbSessionId();
  assert(!db_session_id_.empty());

#ifndef ROCKSDB_LITE
  periodic_task_functions_.emplace(PeriodicTaskType::kDumpStats,
                                   [this]() { this->DumpStats(); });
  periodic_task_functions_.emplace(PeriodicTaskType::kPersistStats,
                                   [this]() { this->PersistStats(); });
  periodic_task_functions_.emplace(PeriodicTaskType::kFlushInfoLog,
                                   [this]() { this->FlushInfoLog(); });
  periodic_task_functions_.emplace(
      PeriodicTaskType::kRecordSeqnoTime,
      [this]() { this->RecordSeqnoToTimeMapping(); });
#endif  // ROCKSDB_LITE

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_,
                                 io_tracer_, db_id_, db_session_id_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
                            std::memory_order_relaxed);
  if (write_buffer_manager_) {
    wbm_stall_.reset(new WBMStallInterface());
  }
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
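
// Typical use (sketch): after a background error has put the DB in read-only
// mode, the caller fixes the root cause (e.g. frees disk space) and invokes
// db->Resume() to re-enable writes; Status::Busy() above means automatic
// recovery is already in progress and manual recovery must not be mixed in.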

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and thinking recovery
//    failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl(DBRecoverContext context) {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }

  if (s.ok()) {
    Status bg_error = error_handler_.GetBGError();
    if (bg_error.severity() > Status::Severity::kHardError) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "DB resume requested but failed due to Fatal/Unrecoverable error");
      s = bg_error;
    }
  }

  // Make sure the IO Status stored in version set is set to OK.
  bool file_deletion_disabled = !IsFileDeletionsEnabled();
  if (s.ok()) {
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // If resuming from IOError resulted from MANIFEST write, then assert
      // that we must have already set the MANIFEST writer to nullptr during
      // clean-up phase MANIFEST writing. We must have also disabled file
      // deletions.
      assert(!versions_->descriptor_log_);
      assert(file_deletion_disabled);
      // Since we are trying to recover from MANIFEST write error, we need to
      // switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
      // Therefore, force writing a dummy version edit because we do not know
      // whether there are flush jobs with non-empty data to flush, triggering
      // appends to MANIFEST.
      VersionEdit edit;
      auto cfh =
          static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
      s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
                                 directories_.GetDbDir());
      if (!s.ok()) {
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          s = error_handler_.SetBGError(io_s,
                                        BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
      mutex_.Lock();
    } else {
      for (auto cfd : versions_->GetRefedColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        InstrumentedMutexUnlock u(&mutex_);
        s = FlushMemTable(cfd, flush_opts, context.flush_reason);
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    assert(versions_->io_status().ok());
    // If we reach here, we should re-enable file deletions if it was disabled
    // during previous error handling.
    if (file_deletion_disabled) {
      // Always return ok
      s = EnableFileDeletions(/*force=*/true);
      if (!s.ok()) {
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "DB resume requested but could not enable file deletions [%s]",
            s.ToString().c_str());
        assert(false);
      }
    }
  }

  mutex_.Lock();
  if (s.ok()) {
    // This will notify and unblock threads waiting for error recovery to
    // finish. Those previously waiting threads can now proceed, which may
    // include closing the db.
    s = error_handler_.ClearBGError();
  } else {
    // NOTE: this is needed to pass ASSERT_STATUS_CHECKED
    // in the DBSSTTest.DBWithMaxSpaceAllowedRandomized test.
    // See https://github.com/facebook/rocksdb/pull/7715#issuecomment-754947952
    error_handler_.GetRecoveryError().PermitUncheckedError();
  }

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Failed to resume DB [%s]",
                   s.ToString().c_str());
  }

  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

#ifndef ROCKSDB_LITE
  for (uint8_t task_type = 0;
       task_type < static_cast<uint8_t>(PeriodicTaskType::kMax); task_type++) {
    Status s = periodic_task_scheduler_.Unregister(
        static_cast<PeriodicTaskType>(task_type));
    if (!s.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to unregister periodic task %d, status: %s",
                     task_type, s.ToString().c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      Status s =
          AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      s.PermitUncheckedError();  //**TODO: What to do on error?
      mutex_.Lock();
    } else {
      for (auto cfd : versions_->GetRefedColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          InstrumentedMutexUnlock u(&mutex_);
          Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          s.PermitUncheckedError();  //**TODO: What to do on error?
        }
      }
    }
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
  size_t num_snapshots = 0;
  ReleaseTimestampedSnapshotsOlderThan(std::numeric_limits<uint64_t>::max(),
                                       &num_snapshots);

  // If there is unreleased snapshot, fail the close call
  if (num_snapshots > 0) {
    return Status::Aborted("Cannot close DB with unreleased snapshot.");
  }

  return Status::OK();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // Below check is added as recovery_error_ is not checked and it causes crash
  // in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
  // reached.
  error_handler_.GetRecoveryError().PermitUncheckedError();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);

  // Cancel manual compaction if there's any
  if (HasPendingManualCompaction()) {
    DisableManualCompaction();
  }
  mutex_.Lock();
  // Unschedule all tasks for this DB
  for (uint8_t i = 0; i < static_cast<uint8_t>(TaskType::kCount); i++) {
    env_->UnSchedule(GetTaskTag(i), Env::Priority::BOTTOM);
    env_->UnSchedule(GetTaskTag(i), Env::Priority::LOW);
    env_->UnSchedule(GetTaskTag(i), Env::Priority::HIGH);
  }

  Status ret = Status::OK();

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }

  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }
  {
    InstrumentedMutexLock lock(&log_write_mutex_);
    for (auto l : logs_to_free_) {
      delete l;
    }
    for (auto& log : logs_) {
      uint64_t log_number = log.writer->get_log_number();
      Status s = log.ClearWriter();
      if (!s.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to Sync WAL file %s with error -- %s",
            LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
            s.ToString().c_str());
        // Retain the first error
        if (ret.ok()) {
          ret = s;
        }
      }
    }
    logs_.clear();
  }

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when column family data
  // list is destroyed, so leaving handles in table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in table cache.
  // Now we assume all user queries have finished, so only version set itself
  // can possibly hold the blocks from block cache. After releasing unreferenced
  // handles here, only handles held by version set left and inside
  // versions_.reset(), we will release them. There, we need to make sure every
  // time a handle is released, we erase it from the cache too. By doing that,
  // we can guarantee that after versions_.reset(), table cache is empty
  // so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    // TODO: Check for unlock error
    env_->UnlockFile(db_lock_).PermitUncheckedError();
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, the background
  // thread in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (!s.ok() && !s.IsNotSupported() && ret.ok()) {
      ret = s;
    }
  }

  if (write_buffer_manager_ && wbm_stall_) {
    write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
  }

  IOStatus io_s = directories_.Close(IOOptions(), nullptr /* dbg */);
  if (!io_s.ok()) {
    ret = io_s;
  }
  if (ret.IsAborted()) {
    // Reserve IsAborted() error for those where users didn't release
    // certain resource and they can release them and come back and
    // retry. In this case, we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }

  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  // TODO: remove this.
  init_logger_creation_s_.PermitUncheckedError();

  InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
  if (closed_) {
    return;
  }

  closed_ = true;

  {
    const Status s = MaybeReleaseTimestampedSnapshotsAndCheck();
    s.PermitUncheckedError();
  }

  closing_status_ = CloseImpl();
  closing_status_.PermitUncheckedError();
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.WAL_ttl_seconds > 0 ||
      immutable_db_options_.WAL_size_limit_MB > 0) {
    std::string archivalPath =
        ArchivalDirectory(immutable_db_options_.GetWalDir());
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.stats;
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

Status DBImpl::StartPeriodicTaskScheduler() {
#ifndef ROCKSDB_LITE

#ifndef NDEBUG
  // It is only used by tests to disable the scheduler
  bool disable_scheduler = false;
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::StartPeriodicTaskScheduler:DisableScheduler",
      &disable_scheduler);
  if (disable_scheduler) {
    return Status::OK();
  }

  {
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicTaskScheduler:Init",
                             &periodic_task_scheduler_);
  }

#endif  // !NDEBUG
  if (mutable_db_options_.stats_dump_period_sec > 0) {
    Status s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kDumpStats,
        periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
        mutable_db_options_.stats_dump_period_sec);
    if (!s.ok()) {
      return s;
    }
  }
  if (mutable_db_options_.stats_persist_period_sec > 0) {
    Status s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kPersistStats,
        periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
        mutable_db_options_.stats_persist_period_sec);
    if (!s.ok()) {
      return s;
    }
  }

  Status s = periodic_task_scheduler_.Register(
      PeriodicTaskType::kFlushInfoLog,
      periodic_task_functions_.at(PeriodicTaskType::kFlushInfoLog));

  return s;
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}

Status DBImpl::RegisterRecordSeqnoTimeWorker() {
#ifndef ROCKSDB_LITE
  uint64_t min_time_duration = std::numeric_limits<uint64_t>::max();
  uint64_t max_time_duration = std::numeric_limits<uint64_t>::min();
  {
    InstrumentedMutexLock l(&mutex_);

    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // preserve time is the max of 2 options.
      uint64_t preserve_time_duration =
          std::max(cfd->ioptions()->preserve_internal_time_seconds,
                   cfd->ioptions()->preclude_last_level_data_seconds);
      if (!cfd->IsDropped() && preserve_time_duration > 0) {
        min_time_duration = std::min(preserve_time_duration, min_time_duration);
        max_time_duration = std::max(preserve_time_duration, max_time_duration);
      }
    }
    if (min_time_duration == std::numeric_limits<uint64_t>::max()) {
      seqno_time_mapping_.Resize(0, 0);
    } else {
      seqno_time_mapping_.Resize(min_time_duration, max_time_duration);
    }
  }

  uint64_t seqno_time_cadence = 0;
  if (min_time_duration != std::numeric_limits<uint64_t>::max()) {
    // round up to 1 when the time_duration is smaller than
    // kMaxSeqnoTimePairsPerCF
    seqno_time_cadence =
        (min_time_duration + SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF - 1) /
        SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
  }
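  // Worked example (assuming SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF is
  // 100): preserve_internal_time_seconds = 3600 yields a cadence of
  // ceil(3600 / 100) = 36, i.e. one seqno<->time pair recorded every 36s.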

  Status s;
  if (seqno_time_cadence == 0) {
    s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime);
  } else {
    s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kRecordSeqnoTime,
        periodic_task_functions_.at(PeriodicTaskType::kRecordSeqnoTime),
        seqno_time_cadence);
  }

  return s;
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}

// estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  for (const auto& pairs : stats_history_.begin()->second) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
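
// Note that the estimate extrapolates from the first slice only: it assumes
// every slice carries the same ticker set, which holds in practice because
// the set of tickers is fixed at compile time.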

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
  uint64_t now_seconds =
      immutable_db_options_.clock->NowMicros() / kMicrosInSecond;

  Statistics* statistics = immutable_db_options_.stats;
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    Status s = Status::OK();
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        if (s.ok()) {
          char key[100];
          int length =
              EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
          // calculate the delta from last time
          if (stats_slice_.find(stat.first) != stats_slice_.end()) {
            uint64_t delta = stat.second - stats_slice_[stat.first];
            s = batch.Put(persist_stats_cf_handle_,
                          Slice(key, std::min(100, length)),
                          std::to_string(delta));
          }
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    if (s.ok()) {
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:End");
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock while searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy for timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
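
// Usage sketch (assuming stats_persist_period_sec > 0 so that history is
// being collected):
//   std::unique_ptr<StatsHistoryIterator> it;
//   Status s = db->GetStatsHistory(
//       0 /* start_time */, std::numeric_limits<uint64_t>::max(), &it);
//   for (; s.ok() && it->Valid(); it->Next()) {
//     uint64_t time = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& deltas = it->GetStatsMap();
//     // ... consume one snapshot of ticker deltas ...
//   }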

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  std::string stats;
  if (shutdown_initiated_) {
    return;
  }

  // Also probe block cache(s) for problems, dump to info log
  UnorderedSet<Cache*> probed_caches;
  TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto cfd : versions_->GetRefedColumnFamilySet()) {
      if (!cfd->initialized()) {
        continue;
      }

      // Release DB mutex for gathering cache entry stats. Pass over all
      // column families for this first so that other stats are dumped
      // near-atomically.
      InstrumentedMutexUnlock u(&mutex_);
      cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);

      // Probe block cache for problems (if not already via another CF)
      if (immutable_db_options_.info_log) {
        auto* table_factory = cfd->ioptions()->table_factory.get();
        assert(table_factory != nullptr);
        Cache* cache =
            table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
        if (cache && probed_caches.insert(cache).second) {
          cache->ReportProblems(immutable_db_options_.info_log);
        }
      }
    }

    const std::string* property = &DB::Properties::kDBStats;
    const DBPropertyInfo* property_info = GetPropertyInfo(*property);
    assert(property_info != nullptr);
    assert(!property_info->need_out_of_mutex);
    default_cf_internal_stats_->GetStringProperty(*property_info, *property,
                                                  &stats);

    property = &InternalStats::kPeriodicCFStats;
    property_info = GetPropertyInfo(*property);
    assert(property_info != nullptr);
    assert(!property_info->need_out_of_mutex);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(*property_info, *property,
                                                 &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

// Periodically flush info log out of application buffer at a low frequency.
// This improves debuggability in case of RocksDB hanging since it ensures the
// log messages leading up to the hang will eventually become visible in the
// log.
void DBImpl::FlushInfoLog() {
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
  LogFlush(immutable_db_options_.info_log);
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  mutex_.AssertHeld();
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  FSDirectory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      s = versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                                 directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      // NOTE: WriteOptionsFile already logs on failure
      s = persist_options_status;
    }
  } else {
    persist_options_status.PermitUncheckedError();  // less important
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
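
// Usage sketch:
//   db->SetOptions(cf_handle, {{"write_buffer_size", "131072"},
//                              {"level0_file_num_compaction_trigger", "2"}});
// Only mutable column family options can be changed this way; on success the
// new values are also persisted to the OPTIONS file via WriteOptionsFile().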

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status = Status::OK();
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);

    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }

    if (MutableDBOptionsAreEqual(mutable_db_options_, new_options)) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "SetDBOptions(), input option value is not changed, "
                     "skipping updating.");
      persist_options_status.PermitUncheckedError();
      return s;
    }

    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(mutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          new_options.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      mutex_.Unlock();
      if (new_options.stats_dump_period_sec == 0) {
        s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kDumpStats);
      } else {
        s = periodic_task_scheduler_.Register(
            PeriodicTaskType::kDumpStats,
            periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
            new_options.stats_dump_period_sec);
      }
      if (new_options.max_total_wal_size !=
          mutable_db_options_.max_total_wal_size) {
        max_total_wal_size_.store(new_options.max_total_wal_size,
                                  std::memory_order_release);
      }
      if (s.ok()) {
        if (new_options.stats_persist_period_sec == 0) {
          s = periodic_task_scheduler_.Unregister(
              PeriodicTaskType::kPersistStats);
        } else {
          s = periodic_task_scheduler_.Register(
              PeriodicTaskType::kPersistStats,
              periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
              new_options.stats_persist_period_sec);
        }
      }
      mutex_.Lock();
      if (!s.ok()) {
        return s;
      }

      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why apply optimize for read to write options
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    } else {
      // To get here, we must have had invalid options and will not attempt to
      // persist the options, which means the status is "OK/Uninitialized".
      persist_options_status.PermitUncheckedError();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
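
// Usage sketch:
//   db->SetDBOptions({{"max_background_jobs", "8"},
//                     {"stats_dump_period_sec", "600"}});
// Note from the code above: setting stats_dump_period_sec or
// stats_persist_period_sec to 0 unregisters the corresponding periodic task
// rather than scheduling it with a zero period.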

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}
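
// Example: for level == 4 holding 10 MB, if levels 3 and 2 are empty and each
// has MaxBytesForLevel >= 10 MB but level 1 is non-empty, the loop stops at
// i == 1 and returns 2, the lowest empty level that can fit the data.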

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent
      // future writes
      IOStatusCheck(io_s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return static_cast<Status>(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return static_cast<Status>(io_s);
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
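
// Usage note: with DBOptions::manual_wal_flush == true, writes only append to
// an in-memory WAL buffer; someone must call FlushWAL(false) periodically to
// push that buffer to the file system, or FlushWAL(true) to also fsync it.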
1464
bool DBImpl::WALBufferIsEmpty(bool lock) {
  if (lock) {
    log_write_mutex_.Lock();
  }
  log::Writer* cur_log_writer = logs_.back().writer;
  auto res = cur_log_writer->BufferIsEmpty();
  if (lock) {
    log_write_mutex_.Unlock();
  }
  return res;
}

Status DBImpl::SyncWAL() {
  TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&log_write_mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().IsSyncing()) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      log.PrepareForSync();
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  IOStatus io_s;
  for (log::Writer* log : logs_to_sync) {
    io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!io_s.ok()) {
      status = io_s;
      break;
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    IOStatusCheck(io_s);
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->FsyncWithDirOptions(
        IOOptions(), nullptr,
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  VersionEdit synced_wals;
  {
    InstrumentedMutexLock l(&log_write_mutex_);
    if (status.ok()) {
      MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
    } else {
      MarkLogsNotSynced(current_log_number);
    }
  }
  if (status.ok() && synced_wals.IsWalAddition()) {
    InstrumentedMutexLock l(&mutex_);
    status = ApplyWALToManifest(&synced_wals);
  }

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

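// Note on the protocol above (summary, not new behavior): PrepareForSync()
// marks each WAL as "syncing" and records its pre-sync size while
// log_write_mutex_ is held, the actual fsync happens without the mutex, and
// MarkLogsSynced()/MarkLogsNotSynced() re-acquire the mutex to clear the
// syncing flag, free fully synced inactive WALs, and wake waiters via
// log_sync_cv_.
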
Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
  // synced_wals is not empty; write it to the MANIFEST.
  mutex_.AssertHeld();
  Status status = versions_->LogAndApplyToDefaultColumnFamily(
      synced_wals, &mutex_, directories_.GetDbDir());
  if (!status.ok() && versions_->io_status().IsIOError()) {
    status = error_handler_.SetBGError(versions_->io_status(),
                                       BackgroundErrorReason::kManifestWrite);
  }
  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  IOStatus status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return static_cast<Status>(status);
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}

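// Usage sketch (illustrative): LockWAL()/UnlockWAL() bracket a section during
// which no new WAL writes can be admitted, since log_write_mutex_ is held,
// e.g. to take a consistent external copy of the WAL files:
//
//   Status s = db->LockWAL();   // flush WAL buffer, hold log_write_mutex_
//   if (s.ok()) {
//     // ... copy or inspect WAL files ...
//     s = db->UnlockWAL();
//   }
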
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            VersionEdit* synced_wals) {
  log_write_mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to) {
    log_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& wal = *it;
    assert(wal.IsSyncing());

    if (wal.number < logs_.back().number) {
      // Inactive WAL
      if (immutable_db_options_.track_and_verify_wals_in_manifest &&
          wal.GetPreSyncSize() > 0) {
        synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
      }
      if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
        // Fully synced
        logs_to_free_.push_back(wal.ReleaseWriter());
        it = logs_.erase(it);
      } else {
        assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
        wal.FinishSync();
        ++it;
      }
    } else {
      assert(wal.number == logs_.back().number);
      // Active WAL
      wal.FinishSync();
      ++it;
    }
  }
  log_sync_cv_.SignalAll();
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
  log_write_mutex_.AssertHeld();
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
       ++it) {
    auto& wal = *it;
    wal.FinishSync();
  }
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                   std::string* ts_low) {
  if (ts_low == nullptr) {
    return Status::InvalidArgument("ts_low is nullptr");
  }
  ColumnFamilyData* cfd = nullptr;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(cfh != nullptr);
    cfd = cfh->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  if (cfd->user_comparator()->timestamp_size() == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  InstrumentedMutexLock l(&mutex_);
  *ts_low = cfd->GetFullHistoryTsLow();
  assert(cfd->user_comparator()->timestamp_size() == ts_low->size());
  return Status::OK();
}

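// Usage sketch (illustrative): for a column family opened with a
// timestamp-aware comparator, e.g.
//
//   ColumnFamilyOptions cf_opts;
//   cf_opts.comparator = ROCKSDB_NAMESPACE::BytewiseComparatorWithU64Ts();
//
// a caller can query the low watermark below which history may have been
// collapsed:
//
//   std::string ts_low;
//   Status s = db->GetFullHistoryTsLow(cfh, &ts_low);
//   // on success, ts_low.size() equals the comparator's timestamp_size()
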
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              Arena* arena,
                                              SequenceNumber sequence,
                                              ColumnFamilyHandle* column_family,
                                              bool allow_unprepared_value) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  return NewInternalIterator(read_options, cfd, super_version, arena, sequence,
                             allow_unprepared_value);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into High priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  assert(bg_purge_scheduled_ > 0);

  // Can't use iterator to go over purge_files_ because inside the loop we're
  // unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFileInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {

// A `SuperVersionHandle` holds a non-null `SuperVersion*` pointing at a
// `SuperVersion` referenced once for this object. It also contains the state
// needed to clean up the `SuperVersion` reference from outside of `DBImpl`
// using `CleanupSuperVersionHandle()`.
struct SuperVersionHandle {
  // `_super_version` must be non-nullptr and `Ref()`'d once as long as the
  // `SuperVersionHandle` may use it.
  SuperVersionHandle(DBImpl* _db, InstrumentedMutex* _mu,
                     SuperVersion* _super_version, bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupSuperVersionHandle(void* arg1, void* /*arg2*/) {
  SuperVersionHandle* sv_handle = reinterpret_cast<SuperVersionHandle*>(arg1);

  if (sv_handle->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);

    sv_handle->mu->Lock();
    sv_handle->super_version->Cleanup();
    sv_handle->db->FindObsoleteFiles(&job_context, false, true);
    if (sv_handle->background_purge) {
      sv_handle->db->ScheduleBgLogWriterClose(&job_context);
      sv_handle->db->AddSuperVersionsToFreeQueue(sv_handle->super_version);
      sv_handle->db->SchedulePurge();
    }
    sv_handle->mu->Unlock();

    if (!sv_handle->background_purge) {
      delete sv_handle->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      sv_handle->db->PurgeObsoleteFiles(job_context,
                                        sv_handle->background_purge);
    }
    job_context.Clean();
  }

  delete sv_handle;
}

struct GetMergeOperandsState {
  MergeContext merge_context;
  PinnedIteratorsManager pinned_iters_mgr;
  SuperVersionHandle* sv_handle;
};

static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) {
  GetMergeOperandsState* state = static_cast<GetMergeOperandsState*>(arg1);
  CleanupSuperVersionHandle(state->sv_handle /* arg1 */, nullptr /* arg2 */);
  delete state;
}

}  // namespace

InternalIterator* DBImpl::NewInternalIterator(
    const ReadOptions& read_options, ColumnFamilyData* cfd,
    SuperVersion* super_version, Arena* arena, SequenceNumber sequence,
    bool allow_unprepared_value, ArenaWrappedDBIter* db_iter) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr,
      read_options.iterate_upper_bound);
  // Collect iterator for mutable memtable
  auto mem_iter = super_version->mem->NewIterator(read_options, arena);
  Status s;
  if (!read_options.ignore_range_deletions) {
    TruncatedRangeDelIterator* mem_tombstone_iter = nullptr;
    auto range_del_iter = super_version->mem->NewRangeTombstoneIterator(
        read_options, sequence, false /* immutable_memtable */);
    if (range_del_iter == nullptr || range_del_iter->empty()) {
      delete range_del_iter;
    } else {
      mem_tombstone_iter = new TruncatedRangeDelIterator(
          std::unique_ptr<FragmentedRangeTombstoneIterator>(range_del_iter),
          &cfd->ioptions()->internal_comparator, nullptr /* smallest */,
          nullptr /* largest */);
    }
    merge_iter_builder.AddPointAndTombstoneIterator(mem_iter,
                                                    mem_tombstone_iter);
  } else {
    merge_iter_builder.AddIterator(mem_iter);
  }

  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder,
                                     !read_options.ignore_range_deletions);
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder,
                                           allow_unprepared_value);
    }
    internal_iter = merge_iter_builder.Finish(
        read_options.ignore_range_deletions ? nullptr : db_iter);
    SuperVersionHandle* cleanup = new SuperVersionHandle(
        this, &mutex_, super_version,
        read_options.background_purge_on_iterator_cleanup ||
            immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}

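// Note on the composition above (summary): the merging iterator layers, from
// newest to oldest, the mutable memtable, the immutable memtables, and the
// SST files in L0..Ln, with a range-tombstone iterator attached per source
// unless read_options.ignore_range_deletions is set. The registered cleanup
// drops the SuperVersion reference when the iterator is destroyed.
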
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value, std::string* timestamp) {
  assert(value != nullptr);
  value->Reset();
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  get_impl_options.timestamp = timestamp;
  Status s = GetImpl(read_options, key, get_impl_options);
  return s;
}

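// Usage sketch (illustrative): the PinnableSlice overload avoids a copy when
// the value can be pinned to an internal buffer:
//
//   PinnableSlice value;
//   Status s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key",
//                      &value);
//   if (s.ok()) { /* use value.data(), value.size() */ }
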
Status DBImpl::GetEntity(const ReadOptions& read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         PinnableWideColumns* columns) {
  if (!column_family) {
    return Status::InvalidArgument(
        "Cannot call GetEntity without a column family handle");
  }

  if (!columns) {
    return Status::InvalidArgument(
        "Cannot call GetEntity without a PinnableWideColumns object");
  }

  columns->Reset();

  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.columns = columns;

  return GetImpl(read_options, key, get_impl_options);
}

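// Usage sketch (illustrative): GetEntity reads a wide-column entity, e.g. one
// written with PutEntity:
//
//   PinnableWideColumns entity;
//   Status s = db->GetEntity(ReadOptions(), cfh, "key", &entity);
//   if (s.ok()) {
//     for (const WideColumn& col : entity.columns()) {
//       // col.name() / col.value()
//     }
//   }
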
bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
  // If both thresholds are reached, a function returning merge operands as
  // `PinnableSlice`s should reference the `SuperVersion` to avoid large and/or
  // numerous `memcpy()`s.
  //
  // The below constants enable the optimization conservatively. They are
  // verified to not regress `GetMergeOperands()` latency in the following
  // scenarios.
  //
  // - CPU: two socket Intel(R) Xeon(R) Gold 6138 CPU @ 2.00GHz
  // - `GetMergeOperands()` threads: 1 - 32
  // - Entry size: 32 bytes - 4KB
  // - Merges per key: 1 - 16K
  // - LSM component: memtable
  //
  // TODO(ajkr): expand measurement to SST files.
  static const size_t kNumBytesForSvRef = 32768;
  static const size_t kLog2AvgBytesForSvRef = 8;  // 256 bytes

  size_t num_bytes = 0;
  for (const Slice& sl : merge_context.GetOperands()) {
    num_bytes += sl.size();
  }
  return num_bytes >= kNumBytesForSvRef &&
         (num_bytes >> kLog2AvgBytesForSvRef) >=
             merge_context.GetOperands().size();
}

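// Worked example (illustrative): 256 operands of 512 bytes each give
// num_bytes = 131072 >= 32768, and 131072 >> 8 = 512 >= 256 operands, so the
// SuperVersion is referenced. 256 operands of 64 bytes each give num_bytes =
// 16384 < 32768, so the operands are copied instead.
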
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       GetImplOptions& get_impl_options) {
  assert(get_impl_options.value != nullptr ||
         get_impl_options.merge_operands != nullptr ||
         get_impl_options.columns != nullptr);

  assert(get_impl_options.column_family);

  if (read_options.timestamp) {
    const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
                                        *(read_options.timestamp),
                                        /*ts_for_read=*/true);
    if (!s.ok()) {
      return s;
    }
  } else {
    const Status s = FailIfCfHasTs(get_impl_options.column_family);
    if (!s.ok()) {
      return s;
    }
  }

  // Clear the timestamps for returning results so that we can distinguish
  // between tombstone or key that has never been written
  if (get_impl_options.timestamp) {
    get_impl_options.timestamp->clear();
  }

  GetWithTimestampReadCallback read_cb(0);  // Will call Refresh

  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
      get_impl_options.column_family);
  auto cfd = cfh->cfd();

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
    }
  }

  if (get_impl_options.get_merge_operands_options != nullptr) {
    for (int i = 0; i < get_impl_options.get_merge_operands_options
                            ->expected_max_number_of_operands;
         ++i) {
      get_impl_options.merge_operands[i].Reset();
    }
  }

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    if (get_impl_options.callback) {
      // Already calculated based on read_options.snapshot
      snapshot = get_impl_options.callback->max_visible_seq();
    } else {
      snapshot =
          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    }
  } else {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    snapshot = GetLastPublishedSequence();
    if (get_impl_options.callback) {
      // The unprep_seqs are not published for write unprepared, so it could be
      // that max_visible_seq is larger. Seek to the std::max of the two.
      // However, we still want our callback to contain the actual snapshot so
      // that it can do the correct visibility filtering.
      get_impl_options.callback->Refresh(snapshot);

      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
      // max_visible_seq = max(max_visible_seq, snapshot)
      //
      // Currently, the commented out assert is broken by
      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
      // the regular transaction flow, then this special read callback would
      // not be needed.
      //
      // assert(callback->max_visible_seq() >= snapshot);
      snapshot = get_impl_options.callback->max_visible_seq();
    }
  }
  // If timestamp is used, we use read callback to ensure <key,t,s> is returned
  // only if t <= read_opts.timestamp and s <= snapshot.
  // HACK: temporarily overwrite input struct field but restore
  SaveAndRestore<ReadCallback*> restore_callback(&get_impl_options.callback);
  const Comparator* ucmp = get_impl_options.column_family->GetComparator();
  assert(ucmp);
  if (ucmp->timestamp_size() > 0) {
    assert(!get_impl_options
                .callback);  // timestamp with callback is not supported
    read_cb.Refresh(snapshot);
    get_impl_options.callback = &read_cb;
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  std::string* timestamp =
      ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      if (sv->mem->Get(
              lkey,
              get_impl_options.value ? get_impl_options.value->GetSelf()
                                     : nullptr,
              get_impl_options.columns, timestamp, &s, &merge_context,
              &max_covering_tombstone_seq, read_options,
              false /* immutable_memtable */, get_impl_options.callback,
              get_impl_options.is_blob_index)) {
        done = true;

        if (get_impl_options.value) {
          get_impl_options.value->PinSelf();
        }

        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey,
                              get_impl_options.value
                                  ? get_impl_options.value->GetSelf()
                                  : nullptr,
                              get_impl_options.columns, timestamp, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              read_options, get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        done = true;

        if (get_impl_options.value) {
          get_impl_options.value->PinSelf();
        }

        RecordTick(stats_, MEMTABLE_HIT);
      }
    } else {
      // Get Merge Operands associated with key, Merge Operands should not be
      // merged and raw values should be returned to the user.
      if (sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr,
                       /*timestamp=*/nullptr, &s, &merge_context,
                       &max_covering_tombstone_seq, read_options,
                       false /* immutable_memtable */, nullptr, nullptr,
                       false)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
                                           &max_covering_tombstone_seq,
                                           read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
  TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
  PinnedIteratorsManager pinned_iters_mgr;
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(
        read_options, lkey, get_impl_options.value, get_impl_options.columns,
        timestamp, &s, &merge_context, &max_covering_tombstone_seq,
        &pinned_iters_mgr,
        get_impl_options.get_value ? get_impl_options.value_found : nullptr,
        nullptr, nullptr,
        get_impl_options.get_value ? get_impl_options.callback : nullptr,
        get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
        get_impl_options.get_value);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (s.ok()) {
      if (get_impl_options.get_value) {
        if (get_impl_options.value) {
          size = get_impl_options.value->size();
        } else if (get_impl_options.columns) {
          size = get_impl_options.columns->serialized_size();
        }
      } else {
        // Return all merge operands for get_impl_options.key
        *get_impl_options.number_of_operands =
            static_cast<int>(merge_context.GetNumOperands());
        if (*get_impl_options.number_of_operands >
            get_impl_options.get_merge_operands_options
                ->expected_max_number_of_operands) {
          s = Status::Incomplete(
              Status::SubCode::KMergeOperandsInsufficientCapacity);
        } else {
          // Each operand depends on one of the following resources: `sv`,
          // `pinned_iters_mgr`, or `merge_context`. It would be crazy
          // expensive to reference `sv` for each operand relying on it
          // because `sv` is (un)ref'd in all threads using the DB.
          // Furthermore, we do not track on which resource each operand
          // depends.
          //
          // To solve this, we bundle the resources in a `GetMergeOperandsState`
          // and manage them with a `SharedCleanablePtr` shared among the
          // `PinnableSlice`s we return. This bundle includes one `sv` reference
          // and ownership of the `merge_context` and `pinned_iters_mgr`
          // objects.
          bool ref_sv = ShouldReferenceSuperVersion(merge_context);
          if (ref_sv) {
            assert(!merge_context.GetOperands().empty());
            SharedCleanablePtr shared_cleanable;
            GetMergeOperandsState* state = nullptr;
            state = new GetMergeOperandsState();
            state->merge_context = std::move(merge_context);
            state->pinned_iters_mgr = std::move(pinned_iters_mgr);

            sv->Ref();

            state->sv_handle = new SuperVersionHandle(
                this, &mutex_, sv,
                immutable_db_options_.avoid_unnecessary_blocking_io);

            shared_cleanable.Allocate();
            shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState,
                                              state /* arg1 */,
                                              nullptr /* arg2 */);
            for (size_t i = 0; i < state->merge_context.GetOperands().size();
                 ++i) {
              const Slice& sl = state->merge_context.GetOperands()[i];
              size += sl.size();

              get_impl_options.merge_operands->PinSlice(
                  sl, nullptr /* cleanable */);
              if (i == state->merge_context.GetOperands().size() - 1) {
                shared_cleanable.MoveAsCleanupTo(
                    get_impl_options.merge_operands);
              } else {
                shared_cleanable.RegisterCopyWith(
                    get_impl_options.merge_operands);
              }
              get_impl_options.merge_operands++;
            }
          } else {
            for (const Slice& sl : merge_context.GetOperands()) {
              size += sl.size();
              get_impl_options.merge_operands->PinSelf(sl);
              get_impl_options.merge_operands++;
            }
          }
        }
      }
    }
    RecordTick(stats_, BYTES_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  return s;
}

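// Usage sketch (illustrative) for the merge-operand path above, via the
// public DB::GetMergeOperands() API:
//
//   GetMergeOperandsOptions gmo_opts;
//   gmo_opts.expected_max_number_of_operands = 16;
//   std::vector<PinnableSlice> operands(16);
//   int num_operands = 0;
//   Status s = db->GetMergeOperands(ReadOptions(), cfh, "key",
//                                   operands.data(), &gmo_opts,
//                                   &num_operands);
//   // s.IsIncomplete() signals that more operands exist than the expected
//   // max; operands are returned unmerged.
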
std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  return MultiGet(read_options, column_family, keys, values,
                  /*timestamps=*/nullptr);
}

std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values,
    std::vector<std::string>* timestamps) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
  PERF_TIMER_GUARD(get_snapshot_time);

  size_t num_keys = keys.size();
  assert(column_family.size() == num_keys);
  std::vector<Status> stat_list(num_keys);

  bool should_fail = false;
  for (size_t i = 0; i < num_keys; ++i) {
    assert(column_family[i]);
    if (read_options.timestamp) {
      stat_list[i] = FailIfTsMismatchCf(
          column_family[i], *(read_options.timestamp), /*ts_for_read=*/true);
      if (!stat_list[i].ok()) {
        should_fail = true;
      }
    } else {
      stat_list[i] = FailIfCfHasTs(column_family[i]);
      if (!stat_list[i].ok()) {
        should_fail = true;
      }
    }
  }

  if (should_fail) {
    for (auto& s : stat_list) {
      if (s.ok()) {
        s = Status::Incomplete(
            "DB not queried due to invalid argument(s) in the same MultiGet");
      }
    }
    return stat_list;
  }

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->MultiGet(column_family, keys).PermitUncheckedError();
    }
  }

  SequenceNumber consistent_seqnum;

  UnorderedMap<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
      column_family.size());
  for (auto cf : column_family) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      multiget_cf_data.emplace(cfd->GetID(),
                               MultiGetColumnFamilyData(cfh, nullptr));
    }
  }

  std::function<MultiGetColumnFamilyData*(
      UnorderedMap<uint32_t, MultiGetColumnFamilyData>::iterator&)>
      iter_deref_lambda =
          [](UnorderedMap<uint32_t, MultiGetColumnFamilyData>::iterator&
                 cf_iter) { return &cf_iter->second; };

  bool unref_only =
      MultiCFSnapshot<UnorderedMap<uint32_t, MultiGetColumnFamilyData>>(
          read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
          &consistent_seqnum);

  TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
  TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");

  // Contains a list of merge operations if merge occurs.
  MergeContext merge_context;

  // Note: this always resizes the values array
  values->resize(num_keys);
  if (timestamps) {
    timestamps->resize(num_keys);
  }

  // Keep track of bytes that we read for statistics-recording later
  uint64_t bytes_read = 0;
  PERF_TIMER_STOP(get_snapshot_time);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  size_t num_found = 0;
  size_t keys_read;
  uint64_t curr_value_size = 0;

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  for (keys_read = 0; keys_read < num_keys; ++keys_read) {
    merge_context.Clear();
    Status& s = stat_list[keys_read];
    std::string* value = &(*values)[keys_read];
    std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;

    LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
        column_family[keys_read]);
    SequenceNumber max_covering_tombstone_seq = 0;
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd.super_version;
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    bool done = false;
    if (!skip_memtable) {
      if (super_version->mem->Get(
              lkey, value, /*columns=*/nullptr, timestamp, &s, &merge_context,
              &max_covering_tombstone_seq, read_options,
              false /* immutable_memtable */, read_callback)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if (super_version->imm->Get(lkey, value, /*columns=*/nullptr,
                                         timestamp, &s, &merge_context,
                                         &max_covering_tombstone_seq,
                                         read_options, read_callback)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    if (!done) {
      PinnableSlice pinnable_val;
      PERF_TIMER_GUARD(get_from_output_files_time);
      PinnedIteratorsManager pinned_iters_mgr;
      super_version->current->Get(read_options, lkey, &pinnable_val,
                                  /*columns=*/nullptr, timestamp, &s,
                                  &merge_context, &max_covering_tombstone_seq,
                                  &pinned_iters_mgr, /*value_found=*/nullptr,
                                  /*key_exists=*/nullptr,
                                  /*seq=*/nullptr, read_callback);
      value->assign(pinnable_val.data(), pinnable_val.size());
      RecordTick(stats_, MEMTABLE_MISS);
    }

    if (s.ok()) {
      bytes_read += value->size();
      num_found++;
      curr_value_size += value->size();
      if (curr_value_size > read_options.value_size_soft_limit) {
        while (++keys_read < num_keys) {
          stat_list[keys_read] = Status::Aborted();
        }
        break;
      }
    }
    if (read_options.deadline.count() &&
        immutable_db_options_.clock->NowMicros() >
            static_cast<uint64_t>(read_options.deadline.count())) {
      break;
    }
  }

  if (keys_read < num_keys) {
    // The only reason to break out of the loop is when the deadline is
    // exceeded
    assert(immutable_db_options_.clock->NowMicros() >
           static_cast<uint64_t>(read_options.deadline.count()));
    for (++keys_read; keys_read < num_keys; ++keys_read) {
      stat_list[keys_read] = Status::TimedOut();
    }
  }

  // Post processing (decrement reference counts and record statistics)
  PERF_TIMER_GUARD(get_post_process_time);
  autovector<SuperVersion*> superversions_to_delete;

  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
    } else {
      mgd.cfd->GetSuperVersion()->Unref();
    }
  }
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return stat_list;
}

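// Usage sketch (illustrative) for the std::vector-based MultiGet:
//
//   std::vector<ColumnFamilyHandle*> cfs = {cfh, cfh};
//   std::vector<Slice> keys = {"k1", "k2"};
//   std::vector<std::string> values;
//   std::vector<Status> statuses =
//       db->MultiGet(ReadOptions(), cfs, keys, &values);
//   // statuses[i] may be OK, NotFound, TimedOut (deadline exceeded), or
//   // Aborted (read_options.value_size_soft_limit exceeded).
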
template <class T>
bool DBImpl::MultiCFSnapshot(
    const ReadOptions& read_options, ReadCallback* callback,
    std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
        iter_deref_func,
    T* cf_list, SequenceNumber* snapshot) {
  PERF_TIMER_GUARD(get_snapshot_time);

  bool last_try = false;
  if (cf_list->size() == 1) {
    // Fast path for a single column family. We can simply get the thread
    // local super version
    auto cf_iter = cf_list->begin();
    auto node = iter_deref_func(cf_iter);
    node->super_version = GetAndRefSuperVersion(node->cfd);
    if (read_options.snapshot != nullptr) {
      // Note: In WritePrepared txns this is not necessary but not harmful
      // either. Because prep_seq > snapshot => commit_seq > snapshot so if
      // a snapshot is specified we should be fine with skipping seq numbers
      // that are greater than that.
      //
      // In WriteUnprepared, we cannot set snapshot in the lookup key because
      // we may skip uncommitted data that should be visible to the
      // transaction for reading own writes.
      *snapshot =
          static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
      if (callback) {
        *snapshot = std::max(*snapshot, callback->max_visible_seq());
      }
    } else {
      // Since we get and reference the super version before getting
      // the snapshot number, without a mutex protection, it is possible
      // that a memtable switch happened in the middle and not all the
      // data for this snapshot is available. But it will contain all
      // the data available in the super version we have, which is also
      // a valid snapshot to read from.
      // We shouldn't get snapshot before finding and referencing the super
      // version because a flush happening in between may compact away data for
      // the snapshot, but the snapshot is earlier than the data overwriting
      // it, so users may see wrong results.
      *snapshot = GetLastPublishedSequence();
    }
  } else {
    // If we end up with the same issue of a memtable getting sealed during 2
    // consecutive retries, it means the write rate is very high. In that case
    // it's probably OK to take the mutex on the 3rd try so we can succeed for
    // sure
    constexpr int num_retries = 3;
    for (int i = 0; i < num_retries; ++i) {
      last_try = (i == num_retries - 1);
      bool retry = false;

      if (i > 0) {
        for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
             ++cf_iter) {
          auto node = iter_deref_func(cf_iter);
          SuperVersion* super_version = node->super_version;
          ColumnFamilyData* cfd = node->cfd;
          if (super_version != nullptr) {
            ReturnAndCleanupSuperVersion(cfd, super_version);
          }
          node->super_version = nullptr;
        }
      }
      if (read_options.snapshot == nullptr) {
        if (last_try) {
          TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
          // We're close to max number of retries. For the last retry,
          // acquire the lock so we're sure to succeed
          mutex_.Lock();
        }
        *snapshot = GetLastPublishedSequence();
      } else {
        *snapshot =
            static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
                ->number_;
      }
      for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
           ++cf_iter) {
        auto node = iter_deref_func(cf_iter);
        if (!last_try) {
          node->super_version = GetAndRefSuperVersion(node->cfd);
        } else {
          node->super_version = node->cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
        if (read_options.snapshot != nullptr || last_try) {
          // If user passed a snapshot, then we don't care if a memtable is
          // sealed or compaction happens because the snapshot would ensure
          // that older key versions are kept around. If this is the last
          // retry, then we have the lock so nothing bad can happen
          continue;
        }
        // We could get the earliest sequence number for the whole list of
        // memtables, which will include immutable memtables as well, but that
        // might be tricky to maintain in case we decide, in future, to do
        // memtable compaction.
        if (!last_try) {
          SequenceNumber seq =
              node->super_version->mem->GetEarliestSequenceNumber();
          if (seq > *snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (last_try) {
          mutex_.Unlock();
        }
        break;
      }
    }
  }

  // Keep track of bytes that we read for statistics-recording later
  PERF_TIMER_STOP(get_snapshot_time);

  return last_try;
}

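// Note on the retry scheme above (summary): without a user snapshot, the
// sequence number is sampled after referencing each CF's SuperVersion. If any
// memtable's earliest sequence number exceeds the sampled snapshot, a
// memtable switch raced with the sampling, so the SuperVersions are released
// and re-acquired; on the third attempt the DB mutex is held, which
// guarantees a snapshot consistent across all column families.
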
void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                      ColumnFamilyHandle** column_families, const Slice* keys,
                      PinnableSlice* values, Status* statuses,
                      const bool sorted_input) {
  return MultiGet(read_options, num_keys, column_families, keys, values,
                  /*timestamps=*/nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
                      ColumnFamilyHandle** column_families, const Slice* keys,
                      PinnableSlice* values, std::string* timestamps,
                      Status* statuses, const bool sorted_input) {
  if (num_keys == 0) {
    return;
  }

  bool should_fail = false;
  for (size_t i = 0; i < num_keys; ++i) {
    ColumnFamilyHandle* cfh = column_families[i];
    assert(cfh);
    if (read_options.timestamp) {
      statuses[i] = FailIfTsMismatchCf(cfh, *(read_options.timestamp),
                                       /*ts_for_read=*/true);
      if (!statuses[i].ok()) {
        should_fail = true;
      }
    } else {
      statuses[i] = FailIfCfHasTs(cfh);
      if (!statuses[i].ok()) {
        should_fail = true;
      }
    }
  }
  if (should_fail) {
    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = Status::Incomplete(
            "DB not queried due to invalid argument(s) in the same MultiGet");
      }
    }
    return;
  }

  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->MultiGet(num_keys, column_families, keys).PermitUncheckedError();
    }
  }

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    values[i].Reset();
    key_context.emplace_back(column_families[i], keys[i], &values[i],
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);

  autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
      multiget_cf_data;
  size_t cf_start = 0;
  ColumnFamilyHandle* cf = sorted_keys[0]->column_family;

  for (size_t i = 0; i < num_keys; ++i) {
    KeyContext* key_ctx = sorted_keys[i];
    if (key_ctx->column_family != cf) {
      multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr);
      cf_start = i;
      cf = key_ctx->column_family;
    }
  }

  multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);

  std::function<MultiGetColumnFamilyData*(
      autovector<MultiGetColumnFamilyData,
                 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
      iter_deref_lambda =
          [](autovector<MultiGetColumnFamilyData,
                        MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<
      autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
      read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  Status s;
  auto cf_iter = multiget_cf_data.begin();
  for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
    s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
                     &sorted_keys, cf_iter->super_version, consistent_seqnum,
                     read_callback);
    if (!s.ok()) {
      break;
    }
  }
  if (!s.ok()) {
    assert(s.IsTimedOut() || s.IsAborted());
    for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
      for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
           ++i) {
        *sorted_keys[i]->s = s;
      }
    }
  }

  for (const auto& iter : multiget_cf_data) {
    if (!unref_only) {
      ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
    } else {
      iter.cfd->GetSuperVersion()->Unref();
    }
  }
}

namespace {
// Order keys by CF ID, followed by key contents
struct CompareKeyContext {
  inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
    ColumnFamilyHandleImpl* cfh =
        static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
    uint32_t cfd_id1 = cfh->cfd()->GetID();
    const Comparator* comparator = cfh->cfd()->user_comparator();
    cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
    uint32_t cfd_id2 = cfh->cfd()->GetID();

    if (cfd_id1 < cfd_id2) {
      return true;
    } else if (cfd_id1 > cfd_id2) {
      return false;
    }

    // Both keys are from the same column family
    int cmp = comparator->CompareWithoutTimestamp(
        *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
    if (cmp < 0) {
      return true;
    }
    return false;
  }
};

}  // anonymous namespace

void DBImpl::PrepareMultiGetKeys(
    size_t num_keys, bool sorted_input,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  if (sorted_input) {
#ifndef NDEBUG
    assert(std::is_sorted(sorted_keys->begin(), sorted_keys->end(),
                          CompareKeyContext()));
#endif
    return;
  }

  std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
            CompareKeyContext());
}

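// Example (illustrative): with CF IDs and keys
//   (cf=1,"b"), (cf=0,"z"), (cf=1,"a")
// the sort above yields
//   (cf=0,"z"), (cf=1,"a"), (cf=1,"b")
// so each CF's keys form one contiguous range for batched lookup.
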
void DBImpl::MultiGet(const ReadOptions& read_options,
                      ColumnFamilyHandle* column_family, const size_t num_keys,
                      const Slice* keys, PinnableSlice* values,
                      Status* statuses, const bool sorted_input) {
  return MultiGet(read_options, column_family, num_keys, keys, values,
                  /*timestamp=*/nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options,
                      ColumnFamilyHandle* column_family, const size_t num_keys,
                      const Slice* keys, PinnableSlice* values,
                      std::string* timestamps, Status* statuses,
                      const bool sorted_input) {
  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->MultiGet(num_keys, column_family, keys).PermitUncheckedError();
    }
  }
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    values[i].Reset();
    key_context.emplace_back(column_family, keys[i], &values[i],
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
  MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
}

void DBImpl::MultiGetWithCallback(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
  multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
  std::function<MultiGetColumnFamilyData*(
      std::array<MultiGetColumnFamilyData, 1>::iterator&)>
      iter_deref_lambda =
          [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
            return &(*cf_iter);
          };

  size_t num_keys = sorted_keys->size();
  SequenceNumber consistent_seqnum;
  bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
      read_options, callback, iter_deref_lambda, &multiget_cf_data,
      &consistent_seqnum);
#ifndef NDEBUG
  assert(!unref_only);
#else
  // Silence unused variable warning
  (void)unref_only;
#endif  // NDEBUG

  if (callback && read_options.snapshot == nullptr) {
    // The unprep_seqs are not published for write unprepared, so it could be
    // that max_visible_seq is larger. Seek to the std::max of the two.
    // However, we still want our callback to contain the actual snapshot so
    // that it can do the correct visibility filtering.
    callback->Refresh(consistent_seqnum);

    // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
    // max_visible_seq = max(max_visible_seq, snapshot)
    //
    // Currently, the commented out assert is broken by
    // InvalidSnapshotReadCallback, but if write unprepared recovery followed
    // the regular transaction flow, then this special read callback would not
    // be needed.
    //
    // assert(callback->max_visible_seq() >= snapshot);
    consistent_seqnum = callback->max_visible_seq();
  }

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = callback;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    assert(!read_callback);  // timestamp with callback is not supported
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
                          multiget_cf_data[0].super_version, consistent_seqnum,
                          read_callback);
  assert(s.ok() || s.IsTimedOut() || s.IsAborted());
  ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
                               multiget_cf_data[0].super_version);
}

20effc67
TL
2853// The actual implementation of batched MultiGet. Parameters -
2854// start_key - Index in the sorted_keys vector to start processing from
2855// num_keys - Number of keys to lookup, starting with sorted_keys[start_key]
2856// sorted_keys - The entire batch of sorted keys for this CF
2857//
2858// The per key status is returned in the KeyContext structures pointed to by
2859// sorted_keys. An overall Status is also returned, with the only possible
2860// values being Status::OK() and Status::TimedOut(). The latter indicates
2861// that the call exceeded read_options.deadline
2862Status DBImpl::MultiGetImpl(
f67539c2
TL
2863 const ReadOptions& read_options, size_t start_key, size_t num_keys,
2864 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2865 SuperVersion* super_version, SequenceNumber snapshot,
1e59de90
TL
2866 ReadCallback* callback) {
2867 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
2868 StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
2869
2870 assert(sorted_keys);
2871 // Clear the timestamps for returning results so that we can distinguish
2872 // between tombstone or key that has never been written
2873 for (auto* kctx : *sorted_keys) {
2874 assert(kctx);
2875 if (kctx->timestamp) {
2876 kctx->timestamp->clear();
2877 }
2878 }
f67539c2 2879
7c673cae
FG
2880 // For each of the given keys, apply the entire "get" process as follows:
2881 // First look in the memtable, then in the immutable memtable (if any).
2882 // s is both in/out. When in, s could either be OK or MergeInProgress.
2883 // merge_operands will contain the sequence of merges in the latter case.
f67539c2 2884 size_t keys_left = num_keys;
20effc67
TL
2885 Status s;
2886 uint64_t curr_value_size = 0;
f67539c2 2887 while (keys_left) {
20effc67 2888 if (read_options.deadline.count() &&
1e59de90 2889 immutable_db_options_.clock->NowMicros() >
20effc67
TL
2890 static_cast<uint64_t>(read_options.deadline.count())) {
2891 s = Status::TimedOut();
2892 break;
2893 }
2894
f67539c2
TL
2895 size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2896 ? MultiGetContext::MAX_BATCH_SIZE
2897 : keys_left;
2898 MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
1e59de90
TL
2899 batch_size, snapshot, read_options, GetFileSystem(),
2900 stats_);
f67539c2 2901 MultiGetRange range = ctx.GetMultiGetRange();
20effc67 2902 range.AddValueSize(curr_value_size);
f67539c2
TL
2903 bool lookup_current = false;
2904
2905 keys_left -= batch_size;
2906 for (auto mget_iter = range.begin(); mget_iter != range.end();
2907 ++mget_iter) {
2908 mget_iter->merge_context.Clear();
2909 *mget_iter->s = Status::OK();
2910 }
7c673cae 2911
7c673cae
FG
2912 bool skip_memtable =
2913 (read_options.read_tier == kPersistedTier &&
2914 has_unpersisted_data_.load(std::memory_order_relaxed));
7c673cae 2915 if (!skip_memtable) {
f67539c2 2916 super_version->mem->MultiGet(read_options, &range, callback,
1e59de90 2917 false /* immutable_memtable */);
f67539c2 2918 if (!range.empty()) {
1e59de90 2919 super_version->imm->MultiGet(read_options, &range, callback);
f67539c2
TL
2920 }
2921 if (!range.empty()) {
2922 lookup_current = true;
2923 uint64_t left = range.KeysLeft();
2924 RecordTick(stats_, MEMTABLE_MISS, left);
7c673cae
FG
2925 }
2926 }
f67539c2 2927 if (lookup_current) {
7c673cae 2928 PERF_TIMER_GUARD(get_from_output_files_time);
1e59de90 2929 super_version->current->MultiGet(read_options, &range, callback);
7c673cae 2930 }
20effc67
TL
2931 curr_value_size = range.GetValueSize();
2932 if (curr_value_size > read_options.value_size_soft_limit) {
2933 s = Status::Aborted();
2934 break;
2935 }
7c673cae
FG
2936 }
2937
2938 // Post processing (decrement reference counts and record statistics)
2939 PERF_TIMER_GUARD(get_post_process_time);
f67539c2
TL
2940 size_t num_found = 0;
2941 uint64_t bytes_read = 0;
20effc67 2942 for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
f67539c2
TL
2943 KeyContext* key = (*sorted_keys)[i];
2944 if (key->s->ok()) {
2945 bytes_read += key->value->size();
2946 num_found++;
7c673cae
FG
2947 }
2948 }
20effc67
TL
2949 if (keys_left) {
2950 assert(s.IsTimedOut() || s.IsAborted());
2951 for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
2952 ++i) {
2953 KeyContext* key = (*sorted_keys)[i];
2954 *key->s = s;
2955 }
2956 }
f67539c2 2957
7c673cae
FG
2958 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2959 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
11fdf7f2 2960 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
7c673cae 2961 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
494da23a 2962 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
11fdf7f2 2963 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
7c673cae 2964 PERF_TIMER_STOP(get_post_process_time);
20effc67
TL
2965
2966 return s;
7c673cae
FG
2967}
2968
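// Usage sketch (exposition only, not part of the upstream file): the public
// batched MultiGet API that funnels into MultiGetImpl above. `db` and `cfh`
// are hypothetical names for an open DB* and a valid ColumnFamilyHandle*.
static void MultiGetUsageSketch(DB* db, ColumnFamilyHandle* cfh) {
  constexpr size_t kNumKeys = 3;
  Slice keys[kNumKeys] = {"k1", "k2", "k3"};
  PinnableSlice values[kNumKeys];
  Status statuses[kNumKeys];
  ReadOptions ropts;
  // Absolute deadline in microseconds; MultiGetImpl checks it once per
  // MAX_BATCH_SIZE batch and marks any remaining keys Status::TimedOut().
  ropts.deadline =
      std::chrono::microseconds(db->GetEnv()->NowMicros() + 10000);
  // Once accumulated value bytes exceed this, remaining keys get
  // Status::Aborted().
  ropts.value_size_soft_limit = 1 << 20;
  db->MultiGet(ropts, cfh, kNumKeys, keys, values, statuses);
  // Each statuses[i] is now OK, NotFound, TimedOut, or Aborted.
}
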
2969Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
11fdf7f2 2970 const std::string& column_family,
7c673cae 2971 ColumnFamilyHandle** handle) {
11fdf7f2
TL
2972 assert(handle != nullptr);
2973 Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2974 if (s.ok()) {
2975 s = WriteOptionsFile(true /*need_mutex_lock*/,
2976 true /*need_enter_write_thread*/);
2977 }
2978 return s;
2979}
2980
2981Status DBImpl::CreateColumnFamilies(
2982 const ColumnFamilyOptions& cf_options,
2983 const std::vector<std::string>& column_family_names,
2984 std::vector<ColumnFamilyHandle*>* handles) {
2985 assert(handles != nullptr);
2986 handles->clear();
2987 size_t num_cf = column_family_names.size();
2988 Status s;
2989 bool success_once = false;
2990 for (size_t i = 0; i < num_cf; i++) {
2991 ColumnFamilyHandle* handle;
2992 s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2993 if (!s.ok()) {
2994 break;
2995 }
2996 handles->push_back(handle);
2997 success_once = true;
2998 }
2999 if (success_once) {
3000 Status persist_options_status = WriteOptionsFile(
3001 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
3002 if (s.ok() && !persist_options_status.ok()) {
3003 s = persist_options_status;
3004 }
3005 }
3006 return s;
3007}
3008
3009Status DBImpl::CreateColumnFamilies(
3010 const std::vector<ColumnFamilyDescriptor>& column_families,
3011 std::vector<ColumnFamilyHandle*>* handles) {
3012 assert(handles != nullptr);
3013 handles->clear();
3014 size_t num_cf = column_families.size();
3015 Status s;
3016 bool success_once = false;
3017 for (size_t i = 0; i < num_cf; i++) {
3018 ColumnFamilyHandle* handle;
3019 s = CreateColumnFamilyImpl(column_families[i].options,
3020 column_families[i].name, &handle);
3021 if (!s.ok()) {
3022 break;
3023 }
3024 handles->push_back(handle);
3025 success_once = true;
3026 }
3027 if (success_once) {
3028 Status persist_options_status = WriteOptionsFile(
3029 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
3030 if (s.ok() && !persist_options_status.ok()) {
3031 s = persist_options_status;
3032 }
3033 }
3034 return s;
3035}
3036
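// Usage sketch (exposition only): creating column families through the public
// entry points above. Note that CreateColumnFamilies() returns the handles
// created before the first failure, and the OPTIONS file is rewritten as long
// as at least one creation succeeded.
static Status CreateCfSketch(DB* db, std::vector<ColumnFamilyHandle*>* out) {
  ColumnFamilyHandle* one = nullptr;
  Status s = db->CreateColumnFamily(ColumnFamilyOptions(), "cf_one", &one);
  if (!s.ok()) {
    return s;
  }
  out->push_back(one);
  std::vector<ColumnFamilyHandle*> more;  // cleared by the callee
  s = db->CreateColumnFamilies(ColumnFamilyOptions(), {"cf_two", "cf_three"},
                               &more);
  out->insert(out->end(), more.begin(), more.end());
  return s;  // may be non-OK even though *out holds usable handles
}
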
3037Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
3038 const std::string& column_family_name,
3039 ColumnFamilyHandle** handle) {
7c673cae 3040 Status s;
7c673cae
FG
3041 *handle = nullptr;
3042
f67539c2
TL
3043 DBOptions db_options =
3044 BuildDBOptions(immutable_db_options_, mutable_db_options_);
3045 s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
11fdf7f2
TL
3046 if (s.ok()) {
3047 for (auto& cf_path : cf_options.cf_paths) {
3048 s = env_->CreateDirIfMissing(cf_path.path);
3049 if (!s.ok()) {
3050 break;
3051 }
3052 }
3053 }
7c673cae
FG
3054 if (!s.ok()) {
3055 return s;
3056 }
3057
11fdf7f2 3058 SuperVersionContext sv_context(/* create_superversion */ true);
7c673cae
FG
3059 {
3060 InstrumentedMutexLock l(&mutex_);
3061
3062 if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
3063 nullptr) {
3064 return Status::InvalidArgument("Column family already exists");
3065 }
3066 VersionEdit edit;
3067 edit.AddColumnFamily(column_family_name);
3068 uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
3069 edit.SetColumnFamily(new_id);
3070 edit.SetLogNumber(logfile_number_);
3071 edit.SetComparatorName(cf_options.comparator->Name());
3072
3073 // LogAndApply will both write the creation in MANIFEST and create
3074 // ColumnFamilyData object
3075 { // write thread
3076 WriteThread::Writer w;
3077 write_thread_.EnterUnbatched(&w, &mutex_);
3078 // LogAndApply will both write the creation in MANIFEST and create
3079 // ColumnFamilyData object
3080 s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
3081 &mutex_, directories_.GetDbDir(), false,
3082 &cf_options);
7c673cae
FG
3083 write_thread_.ExitUnbatched(&w);
3084 }
11fdf7f2
TL
3085 if (s.ok()) {
3086 auto* cfd =
3087 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
3088 assert(cfd != nullptr);
20effc67 3089 std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
f67539c2 3090 s = cfd->AddDirectories(&dummy_created_dirs);
11fdf7f2 3091 }
7c673cae 3092 if (s.ok()) {
7c673cae
FG
3093 auto* cfd =
3094 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
3095 assert(cfd != nullptr);
11fdf7f2
TL
3096 InstallSuperVersionAndScheduleWork(cfd, &sv_context,
3097 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
3098
3099 if (!cfd->mem()->IsSnapshotSupported()) {
3100 is_snapshot_supported_ = false;
3101 }
3102
11fdf7f2
TL
3103 cfd->set_initialized();
3104
7c673cae
FG
3105 *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
3106 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3107 "Created column family [%s] (ID %u)",
3108 column_family_name.c_str(), (unsigned)cfd->GetID());
3109 } else {
3110 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3111 "Creating column family [%s] FAILED -- %s",
3112 column_family_name.c_str(), s.ToString().c_str());
3113 }
3114 } // InstrumentedMutexLock l(&mutex_)
3115
1e59de90
TL
3116 if (cf_options.preserve_internal_time_seconds > 0 ||
3117 cf_options.preclude_last_level_data_seconds > 0) {
3118 s = RegisterRecordSeqnoTimeWorker();
3119 }
11fdf7f2 3120 sv_context.Clean();
7c673cae
FG
3121 // this is outside the mutex
3122 if (s.ok()) {
3123 NewThreadStatusCfInfo(
20effc67 3124 static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
7c673cae
FG
3125 }
3126 return s;
3127}
3128
3129Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
11fdf7f2
TL
3130 assert(column_family != nullptr);
3131 Status s = DropColumnFamilyImpl(column_family);
3132 if (s.ok()) {
3133 s = WriteOptionsFile(true /*need_mutex_lock*/,
3134 true /*need_enter_write_thread*/);
3135 }
3136 return s;
3137}
3138
3139Status DBImpl::DropColumnFamilies(
3140 const std::vector<ColumnFamilyHandle*>& column_families) {
3141 Status s;
3142 bool success_once = false;
3143 for (auto* handle : column_families) {
3144 s = DropColumnFamilyImpl(handle);
3145 if (!s.ok()) {
3146 break;
3147 }
3148 success_once = true;
3149 }
3150 if (success_once) {
3151 Status persist_options_status = WriteOptionsFile(
3152 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
3153 if (s.ok() && !persist_options_status.ok()) {
3154 s = persist_options_status;
3155 }
3156 }
3157 return s;
3158}
3159
3160Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
20effc67 3161 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3162 auto cfd = cfh->cfd();
3163 if (cfd->GetID() == 0) {
3164 return Status::InvalidArgument("Can't drop default column family");
3165 }
3166
3167 bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();
3168
3169 VersionEdit edit;
3170 edit.DropColumnFamily();
3171 edit.SetColumnFamily(cfd->GetID());
3172
3173 Status s;
7c673cae
FG
3174 {
3175 InstrumentedMutexLock l(&mutex_);
3176 if (cfd->IsDropped()) {
3177 s = Status::InvalidArgument("Column family already dropped!\n");
3178 }
3179 if (s.ok()) {
3180 // we drop column family from a single write thread
3181 WriteThread::Writer w;
3182 write_thread_.EnterUnbatched(&w, &mutex_);
11fdf7f2 3183 s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
1e59de90 3184 &mutex_, directories_.GetDbDir());
7c673cae
FG
3185 write_thread_.ExitUnbatched(&w);
3186 }
11fdf7f2
TL
3187 if (s.ok()) {
3188 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
3189 max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
3190 mutable_cf_options->max_write_buffer_number;
3191 }
7c673cae
FG
3192
3193 if (!cf_support_snapshot) {
3194 // The dropped column family doesn't support snapshots, so recalculate
3195 // is_snapshot_supported_.
3196 bool new_is_snapshot_supported = true;
3197 for (auto c : *versions_->GetColumnFamilySet()) {
3198 if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
3199 new_is_snapshot_supported = false;
3200 break;
3201 }
3202 }
3203 is_snapshot_supported_ = new_is_snapshot_supported;
3204 }
11fdf7f2 3205 bg_cv_.SignalAll();
7c673cae
FG
3206 }
3207
1e59de90
TL
3208 if (cfd->ioptions()->preserve_internal_time_seconds > 0 ||
3209 cfd->ioptions()->preclude_last_level_data_seconds > 0) {
3210 s = RegisterRecordSeqnoTimeWorker();
3211 }
3212
7c673cae
FG
3213 if (s.ok()) {
3214 // Note that here we erase the associated cf_info of the to-be-dropped
3215 // cfd before its ref-count goes to zero to avoid having to erase cf_info
3216 // later inside db_mutex.
3217 EraseThreadStatusCfInfo(cfd);
3218 assert(cfd->IsDropped());
7c673cae
FG
3219 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3220 "Dropped column family with id %u\n", cfd->GetID());
7c673cae
FG
3221 } else {
3222 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3223 "Dropping column family with id %u FAILED -- %s\n",
3224 cfd->GetID(), s.ToString().c_str());
3225 }
3226
3227 return s;
3228}
3229
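// Usage sketch (exposition only): dropping a column family. DropColumnFamily()
// only marks the family dropped and logs that to the MANIFEST; the handle must
// still be destroyed to release its reference, and existing iterators on the
// family remain usable until then.
static Status DropCfSketch(DB* db, ColumnFamilyHandle* cfh) {
  Status s = db->DropColumnFamily(cfh);
  if (s.ok()) {
    s = db->DestroyColumnFamilyHandle(cfh);
  }
  return s;
}
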
3230bool DBImpl::KeyMayExist(const ReadOptions& read_options,
3231 ColumnFamilyHandle* column_family, const Slice& key,
20effc67
TL
3232 std::string* value, std::string* timestamp,
3233 bool* value_found) {
7c673cae
FG
3234 assert(value != nullptr);
3235 if (value_found != nullptr) {
3236 // falsify later if key-may-exist but can't fetch value
3237 *value_found = true;
3238 }
3239 ReadOptions roptions = read_options;
11fdf7f2 3240 roptions.read_tier = kBlockCacheTier; // read from block cache only
7c673cae 3241 PinnableSlice pinnable_val;
f67539c2
TL
3242 GetImplOptions get_impl_options;
3243 get_impl_options.column_family = column_family;
3244 get_impl_options.value = &pinnable_val;
3245 get_impl_options.value_found = value_found;
20effc67 3246 get_impl_options.timestamp = timestamp;
f67539c2 3247 auto s = GetImpl(roptions, key, get_impl_options);
7c673cae
FG
3248 value->assign(pinnable_val.data(), pinnable_val.size());
3249
3250 // If block_cache is enabled and the index block of the table is not
3251 // present in block_cache, the return value will be Status::Incomplete.
3252 // In this case, key may still exist in the table.
3253 return s.ok() || s.IsIncomplete();
3254}
3255
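// Usage sketch (exposition only): KeyMayExist() never goes to disk (it reads
// with kBlockCacheTier as above), so a true result with *value_found == false
// means the key may exist but its value could not be fetched from memory.
static bool KeyMayExistSketch(DB* db, ColumnFamilyHandle* cfh,
                              const Slice& key) {
  std::string value;
  bool value_found = false;
  if (!db->KeyMayExist(ReadOptions(), cfh, key, &value, &value_found)) {
    return false;  // definitely absent
  }
  // Possibly present; trust `value` only when value_found is true.
  return true;
}
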
3256Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
3257 ColumnFamilyHandle* column_family) {
11fdf7f2
TL
3258 if (read_options.managed) {
3259 return NewErrorIterator(
3260 Status::NotSupported("Managed iterator is not supported anymore."));
3261 }
3262 Iterator* result = nullptr;
7c673cae
FG
3263 if (read_options.read_tier == kPersistedTier) {
3264 return NewErrorIterator(Status::NotSupported(
3265 "ReadTier::kPersistedData is not yet supported in iterators."));
3266 }
1e59de90
TL
3267
3268 assert(column_family);
3269
3270 if (read_options.timestamp) {
3271 const Status s = FailIfTsMismatchCf(
3272 column_family, *(read_options.timestamp), /*ts_for_read=*/true);
3273 if (!s.ok()) {
3274 return NewErrorIterator(s);
3275 }
3276 } else {
3277 const Status s = FailIfCfHasTs(column_family);
3278 if (!s.ok()) {
3279 return NewErrorIterator(s);
3280 }
11fdf7f2 3281 }
1e59de90 3282
20effc67
TL
3283 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3284 ColumnFamilyData* cfd = cfh->cfd();
3285 assert(cfd != nullptr);
11fdf7f2 3286 ReadCallback* read_callback = nullptr; // No read callback provided.
494da23a 3287 if (read_options.tailing) {
7c673cae
FG
3288#ifdef ROCKSDB_LITE
3289 // not supported in lite version
11fdf7f2
TL
3290 result = nullptr;
3291
7c673cae 3292#else
f67539c2 3293 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
20effc67
TL
3294 auto iter = new ForwardIterator(this, read_options, cfd, sv,
3295 /* allow_unprepared_value */ true);
11fdf7f2
TL
3296 result = NewDBIterator(
3297 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
1e59de90 3298 cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
11fdf7f2
TL
3299 sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
3300 this, cfd);
7c673cae
FG
3301#endif
3302 } else {
11fdf7f2
TL
3303 // Note: no need to consider the special case of
3304 // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
3305 // WritePreparedTxnDB
20effc67
TL
3306 result = NewIteratorImpl(read_options, cfd,
3307 (read_options.snapshot != nullptr)
3308 ? read_options.snapshot->GetSequenceNumber()
3309 : kMaxSequenceNumber,
3310 read_callback);
7c673cae 3311 }
11fdf7f2
TL
3312 return result;
3313}
3314
3315ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
3316 ColumnFamilyData* cfd,
3317 SequenceNumber snapshot,
3318 ReadCallback* read_callback,
1e59de90 3319 bool expose_blob_index,
11fdf7f2 3320 bool allow_refresh) {
f67539c2 3321 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
11fdf7f2 3322
20effc67
TL
3323 TEST_SYNC_POINT("DBImpl::NewIterator:1");
3324 TEST_SYNC_POINT("DBImpl::NewIterator:2");
3325
3326 if (snapshot == kMaxSequenceNumber) {
3327 // Note that the snapshot is assigned AFTER referencing the super
3328 // version because otherwise a flush happening in between may compact away
3329 // data for the snapshot, so the reader would see neither data that was
3330 // visible to the snapshot before compaction nor the newer data inserted
3331 // afterwards.
3332 // Note that the super version might not contain all the data available
3333 // to this snapshot, but in that case it can see all the data in the
3334 // super version, which is a valid consistent state after the user
3335 // calls NewIterator().
3336 snapshot = versions_->LastSequence();
3337 TEST_SYNC_POINT("DBImpl::NewIterator:3");
3338 TEST_SYNC_POINT("DBImpl::NewIterator:4");
3339 }
3340
11fdf7f2
TL
3341 // Try to generate a DB iterator tree in continuous memory area to be
3342 // cache friendly. Here is an example of result:
3343 // +-------------------------------+
3344 // | |
3345 // | ArenaWrappedDBIter |
3346 // | + |
3347 // | +---> Inner Iterator ------------+
3348 // | | | |
3349 // | | +-- -- -- -- -- -- -- --+ |
3350 // | +--- | Arena | |
3351 // | | | |
3352 // | Allocated Memory: | |
3353 // | | +-------------------+ |
3354 // | | | DBIter | <---+
3355 // | | + |
3356 // | | | +-> iter_ ------------+
3357 // | | | | |
3358 // | | +-------------------+ |
3359 // | | | MergingIterator | <---+
3360 // | | + |
3361 // | | | +->child iter1 ------------+
3362 // | | | | | |
3363 // | | +->child iter2 ----------+ |
3364 // | | | | | | |
3365 // | | | +->child iter3 --------+ | |
3366 // | | | | | |
3367 // | | +-------------------+ | | |
3368 // | | | Iterator1 | <--------+
3369 // | | +-------------------+ | |
3370 // | | | Iterator2 | <------+
3371 // | | +-------------------+ |
3372 // | | | Iterator3 | <----+
3373 // | | +-------------------+
3374 // | | |
3375 // +-------+-----------------------+
3376 //
3377 // ArenaWrappedDBIter inlines an arena area where all the iterators in
3378 // the iterator tree are allocated in the order of being accessed when
3379 // querying.
3380 // Laying out the iterators in the order of being accessed makes it more
3381 // likely that any iterator pointer is close to the iterator it points to so
3382 // that they are likely to be in the same cache line and/or page.
3383 ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
1e59de90
TL
3384 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
3385 snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
3386 sv->version_number, read_callback, this, cfd, expose_blob_index,
f67539c2 3387 read_options.snapshot != nullptr ? false : allow_refresh);
11fdf7f2 3388
20effc67 3389 InternalIterator* internal_iter = NewInternalIterator(
1e59de90
TL
3390 db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), snapshot,
3391 /* allow_unprepared_value */ true, db_iter);
11fdf7f2
TL
3392 db_iter->SetIterUnderDBIter(internal_iter);
3393
3394 return db_iter;
7c673cae
FG
3395}
3396
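// Usage sketch (exposition only): a full scan with NewIterator(). The returned
// iterator owns the whole tree laid out in one arena as diagrammed above, so a
// single delete (here via unique_ptr) releases everything.
static void IterateSketch(DB* db, ColumnFamilyHandle* cfh) {
  ReadOptions ropts;
  ropts.total_order_seek = true;  // bypass prefix bloom for a complete scan
  std::unique_ptr<Iterator> it(db->NewIterator(ropts, cfh));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // it->key() / it->value() stay valid only until the next move.
  }
  Status s = it->status();  // distinguishes end-of-data from an I/O error
  s.PermitUncheckedError();
}
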
3397Status DBImpl::NewIterators(
3398 const ReadOptions& read_options,
3399 const std::vector<ColumnFamilyHandle*>& column_families,
3400 std::vector<Iterator*>* iterators) {
11fdf7f2
TL
3401 if (read_options.managed) {
3402 return Status::NotSupported("Managed iterator is not supported anymore.");
3403 }
7c673cae
FG
3404 if (read_options.read_tier == kPersistedTier) {
3405 return Status::NotSupported(
3406 "ReadTier::kPersistedData is not yet supported in iterators.");
3407 }
1e59de90
TL
3408
3409 if (read_options.timestamp) {
3410 for (auto* cf : column_families) {
3411 assert(cf);
3412 const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
3413 /*ts_for_read=*/true);
3414 if (!s.ok()) {
3415 return s;
3416 }
3417 }
3418 } else {
3419 for (auto* cf : column_families) {
3420 assert(cf);
3421 const Status s = FailIfCfHasTs(cf);
3422 if (!s.ok()) {
3423 return s;
3424 }
3425 }
3426 }
3427
11fdf7f2 3428 ReadCallback* read_callback = nullptr; // No read callback provided.
7c673cae
FG
3429 iterators->clear();
3430 iterators->reserve(column_families.size());
11fdf7f2 3431 if (read_options.tailing) {
7c673cae
FG
3432#ifdef ROCKSDB_LITE
3433 return Status::InvalidArgument(
494da23a 3434 "Tailing iterator not supported in RocksDB lite");
7c673cae
FG
3435#else
3436 for (auto cfh : column_families) {
20effc67 3437 auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
f67539c2 3438 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
20effc67
TL
3439 auto iter = new ForwardIterator(this, read_options, cfd, sv,
3440 /* allow_unprepared_value */ true);
7c673cae 3441 iterators->push_back(NewDBIterator(
11fdf7f2 3442 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
1e59de90 3443 cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
7c673cae 3444 sv->mutable_cf_options.max_sequential_skip_in_iterations,
11fdf7f2 3445 read_callback, this, cfd));
7c673cae
FG
3446 }
3447#endif
3448 } else {
11fdf7f2
TL
3449 // Note: no need to consider the special case of
3450 // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
3451 // WritePreparedTxnDB
3452 auto snapshot = read_options.snapshot != nullptr
3453 ? read_options.snapshot->GetSequenceNumber()
3454 : versions_->LastSequence();
7c673cae 3455 for (size_t i = 0; i < column_families.size(); ++i) {
11fdf7f2 3456 auto* cfd =
20effc67
TL
3457 static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
3458 ->cfd();
11fdf7f2
TL
3459 iterators->push_back(
3460 NewIteratorImpl(read_options, cfd, snapshot, read_callback));
7c673cae
FG
3461 }
3462 }
3463
3464 return Status::OK();
3465}
3466
3467const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
3468
3469#ifndef ROCKSDB_LITE
3470const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
3471 return GetSnapshotImpl(true);
3472}
3473#endif // ROCKSDB_LITE
3474
1e59de90
TL
3475std::pair<Status, std::shared_ptr<const Snapshot>>
3476DBImpl::CreateTimestampedSnapshot(SequenceNumber snapshot_seq, uint64_t ts) {
3477 assert(ts != std::numeric_limits<uint64_t>::max());
3478
3479 auto ret = CreateTimestampedSnapshotImpl(snapshot_seq, ts, /*lock=*/true);
3480 return ret;
3481}
3482
3483std::shared_ptr<const SnapshotImpl> DBImpl::GetTimestampedSnapshot(
3484 uint64_t ts) const {
3485 InstrumentedMutexLock lock_guard(&mutex_);
3486 return timestamped_snapshots_.GetSnapshot(ts);
3487}
3488
3489void DBImpl::ReleaseTimestampedSnapshotsOlderThan(uint64_t ts,
3490 size_t* remaining_total_ss) {
3491 autovector<std::shared_ptr<const SnapshotImpl>> snapshots_to_release;
3492 {
3493 InstrumentedMutexLock lock_guard(&mutex_);
3494 timestamped_snapshots_.ReleaseSnapshotsOlderThan(ts, snapshots_to_release);
3495 }
3496 snapshots_to_release.clear();
3497
3498 if (remaining_total_ss) {
3499 InstrumentedMutexLock lock_guard(&mutex_);
3500 *remaining_total_ss = static_cast<size_t>(snapshots_.count());
3501 }
3502}
3503
3504Status DBImpl::GetTimestampedSnapshots(
3505 uint64_t ts_lb, uint64_t ts_ub,
3506 std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
3507 if (ts_lb >= ts_ub) {
3508 return Status::InvalidArgument(
3509 "timestamp lower bound must be smaller than upper bound");
3510 }
3511 timestamped_snapshots.clear();
3512 InstrumentedMutexLock lock_guard(&mutex_);
3513 timestamped_snapshots_.GetSnapshots(ts_lb, ts_ub, timestamped_snapshots);
3514 return Status::OK();
3515}
3516
494da23a
TL
3517SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
3518 bool lock) {
7c673cae 3519 int64_t unix_time = 0;
1e59de90
TL
3520 immutable_db_options_.clock->GetCurrentTime(&unix_time)
3521 .PermitUncheckedError(); // Ignore error
7c673cae
FG
3522 SnapshotImpl* s = new SnapshotImpl;
3523
494da23a
TL
3524 if (lock) {
3525 mutex_.Lock();
1e59de90
TL
3526 } else {
3527 mutex_.AssertHeld();
494da23a 3528 }
7c673cae
FG
3529 // returns null if the underlying memtable does not support snapshot.
3530 if (!is_snapshot_supported_) {
494da23a
TL
3531 if (lock) {
3532 mutex_.Unlock();
3533 }
7c673cae
FG
3534 delete s;
3535 return nullptr;
3536 }
1e59de90 3537 auto snapshot_seq = GetLastPublishedSequence();
494da23a
TL
3538 SnapshotImpl* snapshot =
3539 snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
3540 if (lock) {
3541 mutex_.Unlock();
3542 }
3543 return snapshot;
7c673cae
FG
3544}
3545
1e59de90
TL
3546std::pair<Status, std::shared_ptr<const SnapshotImpl>>
3547DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
3548 bool lock) {
3549 int64_t unix_time = 0;
3550 immutable_db_options_.clock->GetCurrentTime(&unix_time)
3551 .PermitUncheckedError(); // Ignore error
3552 SnapshotImpl* s = new SnapshotImpl;
3553
3554 const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);
3555
3556 if (lock) {
3557 mutex_.Lock();
3558 } else {
3559 mutex_.AssertHeld();
3560 }
3561 // returns null if the underlying memtable does not support snapshot.
3562 if (!is_snapshot_supported_) {
3563 if (lock) {
3564 mutex_.Unlock();
3565 }
3566 delete s;
3567 return std::make_pair(
3568 Status::NotSupported("Memtable does not support snapshot"), nullptr);
3569 }
3570
3571 // Caller is not write thread, thus didn't provide a valid snapshot_seq.
3572 // Obtain seq from db.
3573 if (!need_update_seq) {
3574 snapshot_seq = GetLastPublishedSequence();
3575 }
3576
3577 std::shared_ptr<const SnapshotImpl> latest =
3578 timestamped_snapshots_.GetSnapshot(std::numeric_limits<uint64_t>::max());
3579
3580 // If there is already a latest timestamped snapshot, then we need to do some
3581 // checks.
3582 if (latest) {
3583 uint64_t latest_snap_ts = latest->GetTimestamp();
3584 SequenceNumber latest_snap_seq = latest->GetSequenceNumber();
3585 assert(latest_snap_seq <= snapshot_seq);
3586 bool needs_create_snap = true;
3587 Status status;
3588 std::shared_ptr<const SnapshotImpl> ret;
3589 if (latest_snap_ts > ts) {
3590 // A snapshot created later cannot have smaller timestamp than a previous
3591 // timestamped snapshot.
3592 needs_create_snap = false;
3593 std::ostringstream oss;
3594 oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > "
3595 << ts;
3596 status = Status::InvalidArgument(oss.str());
3597 } else if (latest_snap_ts == ts) {
3598 if (latest_snap_seq == snapshot_seq) {
3599 // We are requesting the same sequence number and timestamp, thus can
3600 // safely reuse (share) the current latest timestamped snapshot.
3601 needs_create_snap = false;
3602 ret = latest;
3603 } else if (latest_snap_seq < snapshot_seq) {
3604 // There may have been writes to the database since the latest
3605 // timestamped snapshot, yet we are still requesting the same
3606 // timestamp. In this case, we cannot create the new timestamped
3607 // snapshot.
3608 needs_create_snap = false;
3609 std::ostringstream oss;
3610 oss << "Allocated seq is " << snapshot_seq
3611 << ", while snapshot exists with smaller seq " << latest_snap_seq
3612 << " but same timestamp " << ts;
3613 status = Status::InvalidArgument(oss.str());
3614 }
3615 }
3616 if (!needs_create_snap) {
3617 if (lock) {
3618 mutex_.Unlock();
3619 }
3620 delete s;
3621 return std::make_pair(status, ret);
3622 } else {
3623 status.PermitUncheckedError();
3624 }
3625 }
3626
3627 SnapshotImpl* snapshot =
3628 snapshots_.New(s, snapshot_seq, unix_time,
3629 /*is_write_conflict_boundary=*/true, ts);
3630
3631 std::shared_ptr<const SnapshotImpl> ret(
3632 snapshot,
3633 std::bind(&DBImpl::ReleaseSnapshot, this, std::placeholders::_1));
3634 timestamped_snapshots_.AddSnapshot(ret);
3635
3636 // Caller is from write thread, and we need to update database's sequence
3637 // number.
3638 if (need_update_seq) {
3639 assert(versions_);
3640 if (last_seq_same_as_publish_seq_) {
3641 versions_->SetLastSequence(snapshot_seq);
3642 } else {
3643 // TODO: support write-prepared/write-unprepared transactions with two
3644 // write queues.
3645 assert(false);
3646 }
3647 }
3648
3649 if (lock) {
3650 mutex_.Unlock();
3651 }
3652 return std::make_pair(Status::OK(), ret);
3653}
3654
494da23a 3655namespace {
1e59de90 3656using CfdList = autovector<ColumnFamilyData*, 2>;
494da23a
TL
3657bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
3658 for (const ColumnFamilyData* t : list) {
3659 if (t == cfd) {
3660 return true;
3661 }
3662 }
3663 return false;
3664}
3665} // namespace
3666
7c673cae 3667void DBImpl::ReleaseSnapshot(const Snapshot* s) {
1e59de90
TL
3668 if (s == nullptr) {
3669 // DBImpl::GetSnapshot() can return nullptr when snapshots are not
3670 // supported by the current memtable, e.g. when inplace_update_support
3671 // is enabled.
3672 return;
3673 }
7c673cae
FG
3674 const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
3675 {
3676 InstrumentedMutexLock l(&mutex_);
3677 snapshots_.Delete(casted_s);
11fdf7f2
TL
3678 uint64_t oldest_snapshot;
3679 if (snapshots_.empty()) {
1e59de90 3680 oldest_snapshot = GetLastPublishedSequence();
11fdf7f2
TL
3681 } else {
3682 oldest_snapshot = snapshots_.oldest()->number_;
3683 }
494da23a
TL
3684 // Avoid going through every column family by checking a global threshold
3685 // first.
3686 if (oldest_snapshot > bottommost_files_mark_threshold_) {
3687 CfdList cf_scheduled;
3688 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1e59de90
TL
3689 if (!cfd->ioptions()->allow_ingest_behind) {
3690 cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
3691 if (!cfd->current()
3692 ->storage_info()
3693 ->BottommostFilesMarkedForCompaction()
3694 .empty()) {
3695 SchedulePendingCompaction(cfd);
3696 MaybeScheduleFlushOrCompaction();
3697 cf_scheduled.push_back(cfd);
3698 }
494da23a 3699 }
11fdf7f2 3700 }
494da23a
TL
3701
3702 // Calculate a new threshold, skipping those CFs where compactions are
3703 // scheduled. We do not do this in the same pass as the previous loop
3704 // because the mutex might be unlocked during it, making the result inaccurate.
3705 SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
3706 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1e59de90
TL
3707 if (CfdListContains(cf_scheduled, cfd) ||
3708 cfd->ioptions()->allow_ingest_behind) {
494da23a
TL
3709 continue;
3710 }
3711 new_bottommost_files_mark_threshold = std::min(
3712 new_bottommost_files_mark_threshold,
3713 cfd->current()->storage_info()->bottommost_files_mark_threshold());
3714 }
3715 bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
11fdf7f2 3716 }
7c673cae
FG
3717 }
3718 delete casted_s;
3719}
3720
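// Usage sketch (exposition only): snapshot lifecycle. GetSnapshot() may return
// nullptr (e.g. with inplace_update_support), and every non-null snapshot must
// be released exactly once so that the code above can recompute
// bottommost_files_mark_threshold_ and re-schedule bottommost compactions.
static void SnapshotSketch(DB* db, const Slice& key) {
  const Snapshot* snap = db->GetSnapshot();
  if (snap == nullptr) {
    return;  // snapshots unsupported by one of the memtables
  }
  ReadOptions ropts;
  ropts.snapshot = snap;  // reads ignore writes newer than the snapshot
  std::string value;
  db->Get(ropts, key, &value).PermitUncheckedError();
  db->ReleaseSnapshot(snap);
}
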
3721#ifndef ROCKSDB_LITE
3722Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
3723 TablePropertiesCollection* props) {
20effc67 3724 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3725 auto cfd = cfh->cfd();
3726
3727 // Increment the ref count
3728 mutex_.Lock();
3729 auto version = cfd->current();
3730 version->Ref();
3731 mutex_.Unlock();
3732
3733 auto s = version->GetPropertiesOfAllTables(props);
3734
3735 // Decrement the ref count
3736 mutex_.Lock();
3737 version->Unref();
3738 mutex_.Unlock();
3739
3740 return s;
3741}
3742
3743Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
3744 const Range* range, std::size_t n,
3745 TablePropertiesCollection* props) {
20effc67 3746 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3747 auto cfd = cfh->cfd();
3748
3749 // Increment the ref count
3750 mutex_.Lock();
3751 auto version = cfd->current();
3752 version->Ref();
3753 mutex_.Unlock();
3754
3755 auto s = version->GetPropertiesOfTablesInRange(range, n, props);
3756
3757 // Decrement the ref count
3758 mutex_.Lock();
3759 version->Unref();
3760 mutex_.Unlock();
3761
3762 return s;
3763}
3764
3765#endif // ROCKSDB_LITE
3766
11fdf7f2 3767const std::string& DBImpl::GetName() const { return dbname_; }
7c673cae 3768
11fdf7f2 3769Env* DBImpl::GetEnv() const { return env_; }
7c673cae 3770
f67539c2 3771FileSystem* DB::GetFileSystem() const {
1e59de90
TL
3772 const auto& fs = GetEnv()->GetFileSystem();
3773 return fs.get();
f67539c2
TL
3774}
3775
3776FileSystem* DBImpl::GetFileSystem() const {
3777 return immutable_db_options_.fs.get();
3778}
3779
1e59de90
TL
3780SystemClock* DBImpl::GetSystemClock() const {
3781 return immutable_db_options_.clock;
3782}
3783
20effc67
TL
3784#ifndef ROCKSDB_LITE
3785
1e59de90 3786Status DBImpl::StartIOTrace(const TraceOptions& trace_options,
20effc67
TL
3787 std::unique_ptr<TraceWriter>&& trace_writer) {
3788 assert(trace_writer != nullptr);
1e59de90
TL
3789 return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
3790 std::move(trace_writer));
20effc67
TL
3791}
3792
3793Status DBImpl::EndIOTrace() {
3794 io_tracer_->EndIOTrace();
3795 return Status::OK();
3796}
3797
3798#endif // ROCKSDB_LITE
3799
7c673cae
FG
3800Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
3801 InstrumentedMutexLock l(&mutex_);
20effc67 3802 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
3803 return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
3804 cfh->cfd()->GetLatestCFOptions());
3805}
3806
3807DBOptions DBImpl::GetDBOptions() const {
3808 InstrumentedMutexLock l(&mutex_);
3809 return BuildDBOptions(immutable_db_options_, mutable_db_options_);
3810}
3811
3812bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
3813 const Slice& property, std::string* value) {
3814 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3815 value->clear();
20effc67
TL
3816 auto cfd =
3817 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3818 if (property_info == nullptr) {
3819 return false;
3820 } else if (property_info->handle_int) {
3821 uint64_t int_value;
3822 bool ret_value =
3823 GetIntPropertyInternal(cfd, *property_info, false, &int_value);
3824 if (ret_value) {
1e59de90 3825 *value = std::to_string(int_value);
7c673cae
FG
3826 }
3827 return ret_value;
3828 } else if (property_info->handle_string) {
1e59de90
TL
3829 if (property_info->need_out_of_mutex) {
3830 return cfd->internal_stats()->GetStringProperty(*property_info, property,
3831 value);
3832 } else {
3833 InstrumentedMutexLock l(&mutex_);
3834 return cfd->internal_stats()->GetStringProperty(*property_info, property,
3835 value);
3836 }
11fdf7f2 3837 } else if (property_info->handle_string_dbimpl) {
1e59de90
TL
3838 if (property_info->need_out_of_mutex) {
3839 return (this->*(property_info->handle_string_dbimpl))(value);
3840 } else {
3841 InstrumentedMutexLock l(&mutex_);
3842 return (this->*(property_info->handle_string_dbimpl))(value);
11fdf7f2 3843 }
7c673cae
FG
3844 }
3845 // Shouldn't reach here since exactly one of handle_string and handle_int
3846 // should be non-nullptr.
3847 assert(false);
3848 return false;
3849}
3850
3851bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
3852 const Slice& property,
11fdf7f2 3853 std::map<std::string, std::string>* value) {
7c673cae
FG
3854 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3855 value->clear();
20effc67
TL
3856 auto cfd =
3857 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3858 if (property_info == nullptr) {
3859 return false;
3860 } else if (property_info->handle_map) {
1e59de90
TL
3861 if (property_info->need_out_of_mutex) {
3862 return cfd->internal_stats()->GetMapProperty(*property_info, property,
3863 value);
3864 } else {
3865 InstrumentedMutexLock l(&mutex_);
3866 return cfd->internal_stats()->GetMapProperty(*property_info, property,
3867 value);
3868 }
7c673cae
FG
3869 }
3870 // If we reach this point it means that handle_map is not provided for the
3871 // requested property
3872 return false;
3873}
3874
3875bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
3876 const Slice& property, uint64_t* value) {
3877 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3878 if (property_info == nullptr || property_info->handle_int == nullptr) {
3879 return false;
3880 }
20effc67
TL
3881 auto cfd =
3882 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae
FG
3883 return GetIntPropertyInternal(cfd, *property_info, false, value);
3884}
3885
3886bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
3887 const DBPropertyInfo& property_info,
3888 bool is_locked, uint64_t* value) {
3889 assert(property_info.handle_int != nullptr);
3890 if (!property_info.need_out_of_mutex) {
3891 if (is_locked) {
3892 mutex_.AssertHeld();
3893 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3894 } else {
3895 InstrumentedMutexLock l(&mutex_);
3896 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3897 }
3898 } else {
3899 SuperVersion* sv = nullptr;
20effc67
TL
3900 if (is_locked) {
3901 mutex_.Unlock();
7c673cae 3902 }
20effc67 3903 sv = GetAndRefSuperVersion(cfd);
7c673cae
FG
3904
3905 bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
3906 property_info, sv->current, value);
3907
20effc67
TL
3908 ReturnAndCleanupSuperVersion(cfd, sv);
3909 if (is_locked) {
3910 mutex_.Lock();
7c673cae
FG
3911 }
3912
3913 return ret;
3914 }
3915}
3916
11fdf7f2
TL
3917bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
3918 assert(value != nullptr);
1e59de90 3919 Statistics* statistics = immutable_db_options_.stats;
11fdf7f2
TL
3920 if (!statistics) {
3921 return false;
3922 }
3923 *value = statistics->ToString();
3924 return true;
3925}
3926
7c673cae
FG
3927#ifndef ROCKSDB_LITE
3928Status DBImpl::ResetStats() {
3929 InstrumentedMutexLock l(&mutex_);
3930 for (auto* cfd : *versions_->GetColumnFamilySet()) {
11fdf7f2
TL
3931 if (cfd->initialized()) {
3932 cfd->internal_stats()->Clear();
3933 }
7c673cae
FG
3934 }
3935 return Status::OK();
3936}
3937#endif // ROCKSDB_LITE
3938
3939bool DBImpl::GetAggregatedIntProperty(const Slice& property,
3940 uint64_t* aggregated_value) {
3941 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3942 if (property_info == nullptr || property_info->handle_int == nullptr) {
3943 return false;
3944 }
3945
3946 uint64_t sum = 0;
20effc67 3947 bool ret = true;
7c673cae
FG
3948 {
3949 // Needs mutex to protect the list of column families.
3950 InstrumentedMutexLock l(&mutex_);
3951 uint64_t value;
1e59de90 3952 for (auto* cfd : versions_->GetRefedColumnFamilySet()) {
11fdf7f2
TL
3953 if (!cfd->initialized()) {
3954 continue;
3955 }
20effc67
TL
3956 ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
3957 // GetIntPropertyInternal may release db mutex and re-acquire it.
3958 mutex_.AssertHeld();
20effc67 3959 if (ret) {
7c673cae
FG
3960 sum += value;
3961 } else {
20effc67
TL
3962 ret = false;
3963 break;
7c673cae
FG
3964 }
3965 }
3966 }
3967 *aggregated_value = sum;
20effc67 3968 return ret;
7c673cae
FG
3969}
3970
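// Usage sketch (exposition only): the three property flavors above. String
// properties yield human-readable text, int properties a single counter, and
// the aggregated variant sums an int property across all column families.
static void PropertySketch(DB* db, ColumnFamilyHandle* cfh) {
  std::string stats;
  if (db->GetProperty(cfh, "rocksdb.stats", &stats)) {
    // `stats` now holds the formatted internal statistics dump.
  }
  uint64_t num_keys = 0;
  db->GetIntProperty(cfh, "rocksdb.estimate-num-keys", &num_keys);
  uint64_t mem_all_cfs = 0;
  db->GetAggregatedIntProperty("rocksdb.cur-size-all-mem-tables",
                               &mem_all_cfs);
}
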
3971SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
3972 // TODO(ljin): consider using GetReferencedSuperVersion() directly
f67539c2 3973 return cfd->GetThreadLocalSuperVersion(this);
7c673cae
FG
3974}
3975
3976// REQUIRED: this function should only be called on the write thread or if the
3977// mutex is held.
3978SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
3979 auto column_family_set = versions_->GetColumnFamilySet();
3980 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3981 if (!cfd) {
3982 return nullptr;
3983 }
3984
3985 return GetAndRefSuperVersion(cfd);
3986}
3987
11fdf7f2
TL
3988void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
3989 // Release SuperVersion
3990 if (sv->Unref()) {
1e59de90 3991 bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
11fdf7f2
TL
3992 {
3993 InstrumentedMutexLock l(&mutex_);
3994 sv->Cleanup();
f67539c2
TL
3995 if (defer_purge) {
3996 AddSuperVersionsToFreeQueue(sv);
3997 SchedulePurge();
3998 }
3999 }
4000 if (!defer_purge) {
4001 delete sv;
11fdf7f2 4002 }
11fdf7f2
TL
4003 RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
4004 }
4005 RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
4006}
4007
7c673cae
FG
4008void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
4009 SuperVersion* sv) {
11fdf7f2
TL
4010 if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
4011 CleanupSuperVersion(sv);
7c673cae
FG
4012 }
4013}
4014
4015// REQUIRED: this function should only be called on the write thread.
4016void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
4017 SuperVersion* sv) {
4018 auto column_family_set = versions_->GetColumnFamilySet();
4019 auto cfd = column_family_set->GetColumnFamily(column_family_id);
4020
4021 // If SuperVersion is held, and we successfully fetched a cfd using
4022 // GetAndRefSuperVersion(), it must still exist.
4023 assert(cfd != nullptr);
4024 ReturnAndCleanupSuperVersion(cfd, sv);
4025}
4026
4027// REQUIRED: this function should only be called on the write thread or if the
4028// mutex is held.
4029ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
4030 ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
4031
4032 if (!cf_memtables->Seek(column_family_id)) {
4033 return nullptr;
4034 }
4035
4036 return cf_memtables->GetColumnFamilyHandle();
4037}
4038
11fdf7f2 4039// REQUIRED: mutex is NOT held.
494da23a 4040std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
11fdf7f2 4041 uint32_t column_family_id) {
11fdf7f2
TL
4042 InstrumentedMutexLock l(&mutex_);
4043
494da23a
TL
4044 auto* cfd =
4045 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
4046 if (cfd == nullptr) {
11fdf7f2
TL
4047 return nullptr;
4048 }
4049
494da23a
TL
4050 return std::unique_ptr<ColumnFamilyHandleImpl>(
4051 new ColumnFamilyHandleImpl(cfd, this, &mutex_));
11fdf7f2
TL
4052}
4053
7c673cae
FG
4054void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
4055 const Range& range,
4056 uint64_t* const count,
4057 uint64_t* const size) {
4058 ColumnFamilyHandleImpl* cfh =
20effc67 4059 static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
4060 ColumnFamilyData* cfd = cfh->cfd();
4061 SuperVersion* sv = GetAndRefSuperVersion(cfd);
4062
4063 // Convert user_key into a corresponding internal key.
4064 InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
4065 InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
4066 MemTable::MemTableStats memStats =
4067 sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
4068 MemTable::MemTableStats immStats =
4069 sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
4070 *count = memStats.count + immStats.count;
4071 *size = memStats.size + immStats.size;
4072
4073 ReturnAndCleanupSuperVersion(cfd, sv);
4074}
4075
f67539c2
TL
4076Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
4077 ColumnFamilyHandle* column_family,
4078 const Range* range, int n, uint64_t* sizes) {
1e59de90 4079 if (!options.include_memtables && !options.include_files) {
f67539c2
TL
4080 return Status::InvalidArgument("Invalid options");
4081 }
4082
1e59de90
TL
4083 const Comparator* const ucmp = column_family->GetComparator();
4084 assert(ucmp);
4085 size_t ts_sz = ucmp->timestamp_size();
4086
7c673cae 4087 Version* v;
20effc67 4088 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
4089 auto cfd = cfh->cfd();
4090 SuperVersion* sv = GetAndRefSuperVersion(cfd);
4091 v = sv->current;
4092
4093 for (int i = 0; i < n; i++) {
1e59de90
TL
4094 Slice start = range[i].start;
4095 Slice limit = range[i].limit;
4096
4097 // Add timestamp if needed
4098 std::string start_with_ts, limit_with_ts;
4099 if (ts_sz > 0) {
4100 // Maximum timestamp means including all key with any timestamp
4101 AppendKeyWithMaxTimestamp(&start_with_ts, start, ts_sz);
4102 // Append a maximum timestamp as the range limit is exclusive:
4103 // [start, limit)
4104 AppendKeyWithMaxTimestamp(&limit_with_ts, limit, ts_sz);
4105 start = start_with_ts;
4106 limit = limit_with_ts;
4107 }
7c673cae 4108 // Convert user_key into a corresponding internal key.
1e59de90
TL
4109 InternalKey k1(start, kMaxSequenceNumber, kValueTypeForSeek);
4110 InternalKey k2(limit, kMaxSequenceNumber, kValueTypeForSeek);
7c673cae 4111 sizes[i] = 0;
f67539c2
TL
4112 if (options.include_files) {
4113 sizes[i] += versions_->ApproximateSize(
4114 options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
4115 /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
7c673cae 4116 }
1e59de90 4117 if (options.include_memtables) {
7c673cae
FG
4118 sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
4119 sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
4120 }
4121 }
4122
4123 ReturnAndCleanupSuperVersion(cfd, sv);
f67539c2 4124 return Status::OK();
7c673cae
FG
4125}
4126
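// Usage sketch (exposition only): approximating the size of a key range,
// counting both SST files and memtables as the options handled above allow.
static uint64_t ApproxSizeSketch(DB* db, ColumnFamilyHandle* cfh) {
  SizeApproximationOptions opts;
  opts.include_memtables = true;
  opts.include_files = true;
  Range r("a", "z");  // the limit is exclusive: [start, limit)
  uint64_t size = 0;
  Status s = db->GetApproximateSizes(opts, cfh, &r, 1, &size);
  return s.ok() ? size : 0;
}
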
4127std::list<uint64_t>::iterator
4128DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
4129 // We need to remember the iterator of our insert, because after the
4130 // background job is done, we need to remove that element from
4131 // pending_outputs_.
4132 pending_outputs_.push_back(versions_->current_next_file_number());
4133 auto pending_outputs_inserted_elem = pending_outputs_.end();
4134 --pending_outputs_inserted_elem;
4135 return pending_outputs_inserted_elem;
4136}
4137
4138void DBImpl::ReleaseFileNumberFromPendingOutputs(
f67539c2
TL
4139 std::unique_ptr<std::list<uint64_t>::iterator>& v) {
4140 if (v.get() != nullptr) {
4141 pending_outputs_.erase(*v.get());
4142 v.reset();
4143 }
7c673cae
FG
4144}
4145
4146#ifndef ROCKSDB_LITE
4147Status DBImpl::GetUpdatesSince(
494da23a 4148 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
7c673cae 4149 const TransactionLogIterator::ReadOptions& read_options) {
7c673cae 4150 RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
1e59de90
TL
4151 if (seq_per_batch_) {
4152 return Status::NotSupported(
4153 "This API is not yet compatible with write-prepared/write-unprepared "
4154 "transactions");
4155 }
7c673cae
FG
4156 if (seq > versions_->LastSequence()) {
4157 return Status::NotFound("Requested sequence not yet written in the db");
4158 }
4159 return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
4160}
4161
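// Usage sketch (exposition only): tailing the WAL from a known sequence
// number. This only works while the WAL files are retained (see the
// WAL_ttl_seconds / WAL_size_limit_MB options in DBOptions).
static Status TailWalSketch(DB* db, SequenceNumber from) {
  std::unique_ptr<TransactionLogIterator> it;
  Status s = db->GetUpdatesSince(from, &it);
  while (s.ok() && it->Valid()) {
    BatchResult batch = it->GetBatch();
    // batch.sequence is the starting seqno of batch.writeBatchPtr.
    it->Next();
  }
  return s;
}
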
4162Status DBImpl::DeleteFile(std::string name) {
4163 uint64_t number;
4164 FileType type;
4165 WalFileType log_type;
4166 if (!ParseFileName(name, &number, &type, &log_type) ||
20effc67 4167 (type != kTableFile && type != kWalFile)) {
7c673cae
FG
4168 ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
4169 name.c_str());
4170 return Status::InvalidArgument("Invalid file name");
4171 }
4172
20effc67 4173 if (type == kWalFile) {
7c673cae
FG
4174 // Only allow deleting archived log files
4175 if (log_type != kArchivedLogFile) {
4176 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4177 "DeleteFile %s failed - not archived log.\n",
4178 name.c_str());
4179 return Status::NotSupported("Delete only supported for archived logs");
4180 }
1e59de90 4181 Status status = wal_manager_.DeleteFile(name, number);
7c673cae
FG
4182 if (!status.ok()) {
4183 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4184 "DeleteFile %s failed -- %s.\n", name.c_str(),
4185 status.ToString().c_str());
4186 }
4187 return status;
4188 }
4189
1e59de90 4190 Status status;
7c673cae
FG
4191 int level;
4192 FileMetaData* metadata;
4193 ColumnFamilyData* cfd;
4194 VersionEdit edit;
4195 JobContext job_context(next_job_id_.fetch_add(1), true);
4196 {
4197 InstrumentedMutexLock l(&mutex_);
4198 status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
4199 if (!status.ok()) {
4200 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4201 "DeleteFile %s failed. File not found\n", name.c_str());
4202 job_context.Clean();
4203 return Status::InvalidArgument("File not found");
4204 }
4205 assert(level < cfd->NumberLevels());
4206
4207 // If the file is being compacted no need to delete.
4208 if (metadata->being_compacted) {
4209 ROCKS_LOG_INFO(immutable_db_options_.info_log,
4210 "DeleteFile %s Skipped. File about to be compacted\n",
4211 name.c_str());
4212 job_context.Clean();
4213 return Status::OK();
4214 }
4215
4216 // Only the files in the last level can be deleted externally.
4217 // This is to make sure that any deletion tombstones are not
4218 // lost. Check that the level passed is the last level.
4219 auto* vstorage = cfd->current()->storage_info();
4220 for (int i = level + 1; i < cfd->NumberLevels(); i++) {
4221 if (vstorage->NumLevelFiles(i) != 0) {
4222 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4223 "DeleteFile %s FAILED. File not in last level\n",
4224 name.c_str());
4225 job_context.Clean();
4226 return Status::InvalidArgument("File not in last level");
4227 }
4228 }
4229 // if level == 0, it has to be the oldest file
4230 if (level == 0 &&
4231 vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
4232 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4233 "DeleteFile %s failed ---"
4234 " target file in level 0 must be the oldest.",
4235 name.c_str());
4236 job_context.Clean();
4237 return Status::InvalidArgument("File in level 0, but not oldest");
4238 }
4239 edit.SetColumnFamily(cfd->GetID());
4240 edit.DeleteFile(level, number);
4241 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
4242 &edit, &mutex_, directories_.GetDbDir());
4243 if (status.ok()) {
494da23a
TL
4244 InstallSuperVersionAndScheduleWork(cfd,
4245 &job_context.superversion_contexts[0],
4246 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
4247 }
4248 FindObsoleteFiles(&job_context, false);
4249 } // lock released here
4250
4251 LogFlush(immutable_db_options_.info_log);
4252 // remove files outside the db-lock
4253 if (job_context.HaveSomethingToDelete()) {
4254 // Call PurgeObsoleteFiles() without holding mutex.
4255 PurgeObsoleteFiles(job_context);
4256 }
4257 job_context.Clean();
4258 return status;
4259}
4260
11fdf7f2
TL
4261Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
4262 const RangePtr* ranges, size_t n,
4263 bool include_end) {
1e59de90 4264 Status status = Status::OK();
20effc67 4265 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
7c673cae
FG
4266 ColumnFamilyData* cfd = cfh->cfd();
4267 VersionEdit edit;
11fdf7f2 4268 std::set<FileMetaData*> deleted_files;
7c673cae
FG
4269 JobContext job_context(next_job_id_.fetch_add(1), true);
4270 {
4271 InstrumentedMutexLock l(&mutex_);
4272 Version* input_version = cfd->current();
4273
4274 auto* vstorage = input_version->storage_info();
11fdf7f2
TL
4275 for (size_t r = 0; r < n; r++) {
4276 auto begin = ranges[r].start, end = ranges[r].limit;
4277 for (int i = 1; i < cfd->NumberLevels(); i++) {
4278 if (vstorage->LevelFiles(i).empty() ||
4279 !vstorage->OverlapInLevel(i, begin, end)) {
4280 continue;
4281 }
4282 std::vector<FileMetaData*> level_files;
4283 InternalKey begin_storage, end_storage, *begin_key, *end_key;
4284 if (begin == nullptr) {
4285 begin_key = nullptr;
4286 } else {
4287 begin_storage.SetMinPossibleForUserKey(*begin);
4288 begin_key = &begin_storage;
4289 }
4290 if (end == nullptr) {
4291 end_key = nullptr;
4292 } else {
4293 end_storage.SetMaxPossibleForUserKey(*end);
4294 end_key = &end_storage;
4295 }
7c673cae 4296
11fdf7f2
TL
4297 vstorage->GetCleanInputsWithinInterval(
4298 i, begin_key, end_key, &level_files, -1 /* hint_index */,
4299 nullptr /* file_index */);
4300 FileMetaData* level_file;
4301 for (uint32_t j = 0; j < level_files.size(); j++) {
4302 level_file = level_files[j];
7c673cae
FG
4303 if (level_file->being_compacted) {
4304 continue;
4305 }
11fdf7f2
TL
4306 if (deleted_files.find(level_file) != deleted_files.end()) {
4307 continue;
4308 }
4309 if (!include_end && end != nullptr &&
4310 cfd->user_comparator()->Compare(level_file->largest.user_key(),
4311 *end) == 0) {
4312 continue;
4313 }
7c673cae
FG
4314 edit.SetColumnFamily(cfd->GetID());
4315 edit.DeleteFile(i, level_file->fd.GetNumber());
11fdf7f2 4316 deleted_files.insert(level_file);
7c673cae
FG
4317 level_file->being_compacted = true;
4318 }
1e59de90
TL
4319 vstorage->ComputeCompactionScore(*cfd->ioptions(),
4320 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
4321 }
4322 }
4323 if (edit.GetDeletedFiles().empty()) {
4324 job_context.Clean();
1e59de90 4325 return status;
7c673cae
FG
4326 }
4327 input_version->Ref();
4328 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
4329 &edit, &mutex_, directories_.GetDbDir());
4330 if (status.ok()) {
494da23a
TL
4331 InstallSuperVersionAndScheduleWork(cfd,
4332 &job_context.superversion_contexts[0],
4333 *cfd->GetLatestMutableCFOptions());
7c673cae
FG
4334 }
4335 for (auto* deleted_file : deleted_files) {
4336 deleted_file->being_compacted = false;
4337 }
4338 input_version->Unref();
4339 FindObsoleteFiles(&job_context, false);
4340 } // lock released here
4341
4342 LogFlush(immutable_db_options_.info_log);
4343 // remove files outside the db-lock
4344 if (job_context.HaveSomethingToDelete()) {
4345 // Call PurgeObsoleteFiles() without holding mutex.
4346 PurgeObsoleteFiles(job_context);
4347 }
4348 job_context.Clean();
4349 return status;
4350}
4351
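// Usage sketch (exposition only): callers typically reach the function above
// through the DeleteFilesInRanges() helper declared in rocksdb/convenience.h.
// Only files wholly contained in a range are dropped; boundary files survive,
// so deleted-looking keys can transiently reappear unless the range is also
// covered by DeleteRange() or a subsequent compaction.
static Status PurgeRangeSketch(DB* db, ColumnFamilyHandle* cfh) {
  Slice begin("a");
  Slice end("m");
  RangePtr range(&begin, &end);
  return DeleteFilesInRanges(db, cfh, &range, 1, /*include_end=*/true);
}
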
4352void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
4353 InstrumentedMutexLock l(&mutex_);
4354 versions_->GetLiveFilesMetaData(metadata);
4355}
4356
20effc67
TL
4357Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4358 InstrumentedMutexLock l(&mutex_);
4359 return versions_->GetLiveFilesChecksumInfo(checksum_list);
4360}
4361
11fdf7f2
TL
4362void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
4363 ColumnFamilyMetaData* cf_meta) {
7c673cae 4364 assert(column_family);
20effc67
TL
4365 auto* cfd =
4366 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
7c673cae 4367 auto* sv = GetAndRefSuperVersion(cfd);
f67539c2
TL
4368 {
4369 // Without mutex, Version::GetColumnFamilyMetaData will have data race with
4370 // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
4371 // this may cause regression. An alternative is to make
4372 // FileMetaData::being_compacted atomic, but it will make FileMetaData
4373 // non-copy-able. Another option is to separate these variables from
4374 // original FileMetaData struct, and this requires re-organization of data
4375 // structures. For now, we take the easy approach. If
4376 // DB::GetColumnFamilyMetaData is not called frequently, the regression
4377 // should not be big. We still need to keep an eye on it.
4378 InstrumentedMutexLock l(&mutex_);
4379 sv->current->GetColumnFamilyMetaData(cf_meta);
4380 }
7c673cae
FG
4381 ReturnAndCleanupSuperVersion(cfd, sv);
4382}
4383
1e59de90
TL
4384void DBImpl::GetAllColumnFamilyMetaData(
4385 std::vector<ColumnFamilyMetaData>* metadata) {
4386 InstrumentedMutexLock l(&mutex_);
4387 for (auto cfd : *(versions_->GetColumnFamilySet())) {
4388 {
4389 metadata->emplace_back();
4390 cfd->current()->GetColumnFamilyMetaData(&metadata->back());
4391 }
4392 }
4393}
4394
7c673cae
FG
4395#endif // ROCKSDB_LITE
4396
4397Status DBImpl::CheckConsistency() {
4398 mutex_.AssertHeld();
4399 std::vector<LiveFileMetaData> metadata;
4400 versions_->GetLiveFilesMetaData(&metadata);
f67539c2 4401 TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
7c673cae
FG
4402
4403 std::string corruption_messages;
7c673cae 4404
f67539c2
TL
4405 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
4406 // Instead of calling GetFileSize() for each expected file, call
4407 // GetChildren() for the DB directory and check that all expected files
4408 // are listed, without checking their sizes.
4409 // Since sst files might be in different directories, do it for each
4410 // directory separately.
4411 std::map<std::string, std::vector<std::string>> files_by_directory;
4412 for (const auto& md : metadata) {
4413 // md.name has a leading "/". Remove it.
4414 std::string fname = md.name;
4415 if (!fname.empty() && fname[0] == '/') {
4416 fname = fname.substr(1);
4417 }
4418 files_by_directory[md.db_path].push_back(fname);
7c673cae 4419 }
1e59de90
TL
4420
4421 IOOptions io_opts;
4422 io_opts.do_not_recurse = true;
f67539c2
TL
4423 for (const auto& dir_files : files_by_directory) {
4424 std::string directory = dir_files.first;
4425 std::vector<std::string> existing_files;
1e59de90
TL
4426 Status s = fs_->GetChildren(directory, io_opts, &existing_files,
4427 /*IODebugContext*=*/nullptr);
f67539c2
TL
4428 if (!s.ok()) {
4429 corruption_messages +=
4430 "Can't list files in " + directory + ": " + s.ToString() + "\n";
4431 continue;
4432 }
4433 std::sort(existing_files.begin(), existing_files.end());
4434
4435 for (const std::string& fname : dir_files.second) {
4436 if (!std::binary_search(existing_files.begin(), existing_files.end(),
4437 fname) &&
4438 !std::binary_search(existing_files.begin(), existing_files.end(),
4439 Rocks2LevelTableFileName(fname))) {
4440 corruption_messages +=
4441 "Missing sst file " + fname + " in " + directory + "\n";
4442 }
4443 }
4444 }
4445 } else {
4446 for (const auto& md : metadata) {
4447 // md.name has a leading "/".
4448 std::string file_path = md.db_path + md.name;
4449
4450 uint64_t fsize = 0;
4451 TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
4452 Status s = env_->GetFileSize(file_path, &fsize);
4453 if (!s.ok() &&
4454 env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
4455 s = Status::OK();
4456 }
4457 if (!s.ok()) {
4458 corruption_messages +=
4459 "Can't access " + md.name + ": " + s.ToString() + "\n";
4460 } else if (fsize != md.size) {
4461 corruption_messages += "Sst file size mismatch: " + file_path +
4462 ". Size recorded in manifest " +
1e59de90
TL
4463 std::to_string(md.size) + ", actual size " +
4464 std::to_string(fsize) + "\n";
f67539c2 4465 }
7c673cae
FG
4466 }
4467 }
f67539c2 4468
7c673cae
FG
4469 if (corruption_messages.size() == 0) {
4470 return Status::OK();
4471 } else {
4472 return Status::Corruption(corruption_messages);
4473 }
4474}
4475
4476Status DBImpl::GetDbIdentity(std::string& identity) const {
f67539c2
TL
4477 identity.assign(db_id_);
4478 return Status::OK();
4479}
4480
4481Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
4482 std::string idfilename = IdentityFileName(dbname_);
4483 const FileOptions soptions;
4484
4485 Status s = ReadFileToString(fs_.get(), idfilename, identity);
4486 if (!s.ok()) {
4487 return s;
4488 }
4489
4490 // If the last character is '\n', remove it from identity. (Old
4491 // implementations of Env::GenerateUniqueId() would include a trailing '\n'.)
4492 if (identity->size() > 0 && identity->back() == '\n') {
4493 identity->pop_back();
4494 }
4495 return s;
4496}
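// Usage sketch (illustrative, not from the original source): how a caller
// might read the DB identity through the public API. GetDbIdentity() serves
// the cached db_id_, while GetDbIdentityFromIdentityFile() above re-reads the
// IDENTITY file and trims the legacy trailing newline.
//
//   std::string id;
//   Status s = db->GetDbIdentity(id);  // `db` is an open rocksdb::DB*
//   if (s.ok()) {
//     // `id` now holds the DB's unique identity string
//   }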
4497
4498Status DBImpl::GetDbSessionId(std::string& session_id) const {
4499 session_id.assign(db_session_id_);
4500 return Status::OK();
4501}
4502
4503namespace {
4504SemiStructuredUniqueIdGen* DbSessionIdGen() {
4505 static SemiStructuredUniqueIdGen gen;
4506 return &gen;
4507}
4508} // namespace
4509
4510void DBImpl::TEST_ResetDbSessionIdGen() { DbSessionIdGen()->Reset(); }
4511
4512std::string DBImpl::GenerateDbSessionId(Env*) {
4513 // See SemiStructuredUniqueIdGen for its desirable properties.
4514 auto gen = DbSessionIdGen();
4515
4516 uint64_t lo, hi;
4517 gen->GenerateNext(&hi, &lo);
4518 if (lo == 0) {
4519 // Avoid emitting session ID with lo==0, so that SST unique
4520 // IDs can be more easily ensured non-zero
4521 gen->GenerateNext(&hi, &lo);
4522 assert(lo != 0);
4523 }
4524 return EncodeSessionId(hi, lo);
4525}
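// Illustrative sketch (an assumption about call style, not from the original
// source; the Env* argument is unused by the implementation above). Every
// call yields a distinct session ID built from the generator's (hi, lo) pair,
// with lo guaranteed non-zero:
//
//   std::string a = DBImpl::GenerateDbSessionId(nullptr);
//   std::string b = DBImpl::GenerateDbSessionId(nullptr);
//   assert(a != b);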
4526
4527void DBImpl::SetDbSessionId() {
4528 db_session_id_ = GenerateDbSessionId(env_);
4529 TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
4530}
4531
4532// Default implementation -- returns a not-supported status.
4533Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
4534 const std::string& /*column_family_name*/,
4535 ColumnFamilyHandle** /*handle*/) {
4536 return Status::NotSupported("");
4537}
4538
4539Status DB::CreateColumnFamilies(
4540 const ColumnFamilyOptions& /*cf_options*/,
4541 const std::vector<std::string>& /*column_family_names*/,
4542 std::vector<ColumnFamilyHandle*>* /*handles*/) {
4543 return Status::NotSupported("");
4544}
4545
4546Status DB::CreateColumnFamilies(
4547 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
4548 std::vector<ColumnFamilyHandle*>* /*handles*/) {
4549 return Status::NotSupported("");
4550}
4551
4552Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
4553 return Status::NotSupported("");
4554}
4555
4556Status DB::DropColumnFamilies(
4557 const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
4558 return Status::NotSupported("");
4559}
4560
4561Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
4562 if (DefaultColumnFamily() == column_family) {
4563 return Status::InvalidArgument(
4564 "Cannot destroy the handle returned by DefaultColumnFamily()");
4565 }
4566 delete column_family;
4567 return Status::OK();
4568}
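// Usage sketch (illustrative, not from the original source): handles returned
// by CreateColumnFamily() are released through this API, while the handle
// from DefaultColumnFamily() is owned by the DB and is rejected above.
//
//   ColumnFamilyHandle* cf = nullptr;
//   Status s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf);
//   // ... use cf ...
//   if (s.ok()) {
//     s = db->DestroyColumnFamilyHandle(cf);
//   }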
4569
4570DB::~DB() {}
4571
4572Status DBImpl::Close() {
4573 InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
4574 if (closed_) {
4575 return closing_status_;
4576 }
4577
4578 {
4579 const Status s = MaybeReleaseTimestampedSnapshotsAndCheck();
4580 if (!s.ok()) {
4581 return s;
4582 }
4583 }
4584
4585 closing_status_ = CloseImpl();
4586 closed_ = true;
4587 return closing_status_;
4588}
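// Usage sketch (illustrative, not from the original source): Close() latches
// its result, so a second call returns the cached closing_status_ instead of
// closing twice.
//
//   Status s = db->Close();
//   if (!s.ok()) {
//     // e.g. a timestamped snapshot is still live, or CloseImpl() failed
//   }
//   delete db;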
4589
4590Status DB::ListColumnFamilies(const DBOptions& db_options,
4591 const std::string& name,
4592 std::vector<std::string>* column_families) {
4593 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
4594 return VersionSet::ListColumnFamilies(column_families, name, fs.get());
4595}
4596
4597Snapshot::~Snapshot() {}
4598
4599Status DestroyDB(const std::string& dbname, const Options& options,
4600 const std::vector<ColumnFamilyDescriptor>& column_families) {
4601 ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
4602 Env* env = soptions.env;
4603 std::vector<std::string> filenames;
4604 bool wal_in_db_path = soptions.IsWalDirSameAsDBPath();
4605
4606 // Reset the logger because it holds a handle to the
4607 // log file and prevents cleanup and directory removal
4608 soptions.info_log.reset();
4609 IOOptions io_opts;
4610 // Ignore error in case directory does not exist
4611 soptions.fs
4612 ->GetChildren(dbname, io_opts, &filenames,
4613 /*IODebugContext*=*/nullptr)
4614 .PermitUncheckedError();
4615
4616 FileLock* lock;
4617 const std::string lockname = LockFileName(dbname);
4618 Status result = env->LockFile(lockname, &lock);
4619 if (result.ok()) {
4620 uint64_t number;
4621 FileType type;
4622 InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
4623 for (const auto& fname : filenames) {
4624 if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
4625 type != kDBLockFile) { // Lock file will be deleted at end
4626 Status del;
4627 std::string path_to_delete = dbname + "/" + fname;
4628 if (type == kMetaDatabase) {
4629 del = DestroyDB(path_to_delete, options);
4630 } else if (type == kTableFile || type == kWalFile ||
4631 type == kBlobFile) {
4632 del = DeleteDBFile(
4633 &soptions, path_to_delete, dbname,
4634 /*force_bg=*/false,
4635 /*force_fg=*/(type == kWalFile) ? !wal_in_db_path : false);
4636 } else {
4637 del = env->DeleteFile(path_to_delete);
4638 }
4639 if (!del.ok() && result.ok()) {
4640 result = del;
4641 }
4642 }
4643 }
4644
4645 std::set<std::string> paths;
4646 for (const DbPath& db_path : options.db_paths) {
4647 paths.insert(db_path.path);
4648 }
4649 for (const ColumnFamilyDescriptor& cf : column_families) {
4650 for (const DbPath& cf_path : cf.options.cf_paths) {
4651 paths.insert(cf_path.path);
4652 }
4653 }
4654
4655 for (const auto& path : paths) {
4656 if (soptions.fs
4657 ->GetChildren(path, io_opts, &filenames,
4658 /*IODebugContext*=*/nullptr)
4659 .ok()) {
4660 for (const auto& fname : filenames) {
4661 if (ParseFileName(fname, &number, &type) &&
4662 (type == kTableFile ||
4663 type == kBlobFile)) { // Lock file will be deleted at end
4664 std::string file_path = path + "/" + fname;
4665 Status del = DeleteDBFile(&soptions, file_path, dbname,
4666 /*force_bg=*/false, /*force_fg=*/false);
4667 if (!del.ok() && result.ok()) {
4668 result = del;
4669 }
4670 }
4671 }
4672 // TODO: Should we return an error if we cannot delete the directory?
4673 env->DeleteDir(path).PermitUncheckedError();
4674 }
4675 }
4676
4677 std::vector<std::string> walDirFiles;
4678 std::string archivedir = ArchivalDirectory(dbname);
4679 bool wal_dir_exists = false;
4680 if (!soptions.IsWalDirSameAsDBPath(dbname)) {
4681 wal_dir_exists =
4682 soptions.fs
4683 ->GetChildren(soptions.wal_dir, io_opts, &walDirFiles,
4684 /*IODebugContext*=*/nullptr)
4685 .ok();
4686 archivedir = ArchivalDirectory(soptions.wal_dir);
4687 }
4688
4689 // The archive dir may be inside the wal dir or dbname and should be
4690 // processed and removed before those, otherwise we would have trouble
4691 // removing them
4692 std::vector<std::string> archiveFiles;
4693 if (soptions.fs
4694 ->GetChildren(archivedir, io_opts, &archiveFiles,
4695 /*IODebugContext*=*/nullptr)
4696 .ok()) {
4697 // Delete archival files.
4698 for (const auto& file : archiveFiles) {
4699 if (ParseFileName(file, &number, &type) && type == kWalFile) {
4700 Status del =
4701 DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
4702 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
4703 if (!del.ok() && result.ok()) {
4704 result = del;
4705 }
4706 }
4707 }
4708 // Ignore error in case dir contains other files
4709 env->DeleteDir(archivedir).PermitUncheckedError();
4710 }
4711
4712 // Delete log files in the WAL dir
4713 if (wal_dir_exists) {
4714 for (const auto& file : walDirFiles) {
4715 if (ParseFileName(file, &number, &type) && type == kWalFile) {
4716 Status del =
4717 DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
4718 soptions.wal_dir, /*force_bg=*/false,
4719 /*force_fg=*/!wal_in_db_path);
4720 if (!del.ok() && result.ok()) {
4721 result = del;
4722 }
4723 }
4724 }
4725 // Ignore error in case dir contains other files
4726 env->DeleteDir(soptions.wal_dir).PermitUncheckedError();
4727 }
4728
4729 // Ignore error since state is already gone
4730 env->UnlockFile(lock).PermitUncheckedError();
4731 env->DeleteFile(lockname).PermitUncheckedError();
4732
4733 // sst_file_manager holds a ref to the logger. Make sure the logger is
4734 // gone before trying to remove the directory.
4735 soptions.sst_file_manager.reset();
4736
4737 // Ignore error in case dir contains other files
4738 env->DeleteDir(dbname).PermitUncheckedError();
4740 }
4741 return result;
4742}
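// Usage sketch (illustrative; the path is hypothetical): DestroyDB() walks
// the DB dir, every configured db_path and cf_path, the WAL dir, and the
// archive dir, deleting the files it can attribute to this DB.
//
//   Options options;
//   Status s = DestroyDB("/tmp/mydb", options);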
4743
4744Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
4745 bool need_enter_write_thread) {
4746#ifndef ROCKSDB_LITE
4747 WriteThread::Writer w;
4748 if (need_mutex_lock) {
4749 mutex_.Lock();
4750 } else {
4751 mutex_.AssertHeld();
4752 }
4753 if (need_enter_write_thread) {
4754 write_thread_.EnterUnbatched(&w, &mutex_);
4755 }
4756
4757 std::vector<std::string> cf_names;
4758 std::vector<ColumnFamilyOptions> cf_opts;
4759
4760 // This part requires mutex to protect the column family options
4761 for (auto cfd : *versions_->GetColumnFamilySet()) {
4762 if (cfd->IsDropped()) {
4763 continue;
4764 }
4765 cf_names.push_back(cfd->GetName());
4766 cf_opts.push_back(cfd->GetLatestCFOptions());
4767 }
4768
4769 // Unlock during expensive operations. New writes cannot get here
4770 // because the single write thread ensures all new writes get queued.
4771 DBOptions db_options =
4772 BuildDBOptions(immutable_db_options_, mutable_db_options_);
4773 mutex_.Unlock();
4774
4775 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
4776 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
4777 TEST_SYNC_POINT_CALLBACK("DBImpl::WriteOptionsFile:PersistOptions",
4778 &db_options);
4779
4780 std::string file_name =
4781 TempOptionsFileName(GetName(), versions_->NewFileNumber());
4782 Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
4783 fs_.get());
4784
4785 if (s.ok()) {
4786 s = RenameTempFileToOptionsFile(file_name);
4787 }
4788 // restore lock
4789 if (!need_mutex_lock) {
4790 mutex_.Lock();
4791 }
4792 if (need_enter_write_thread) {
4793 write_thread_.ExitUnbatched(&w);
4794 }
4795 if (!s.ok()) {
4796 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4797 "Unnable to persist options -- %s", s.ToString().c_str());
4798 if (immutable_db_options_.fail_if_options_file_error) {
4799 return Status::IOError("Unable to persist options.",
4800 s.ToString().c_str());
4801 }
4802 }
4803#else
4804 (void)need_mutex_lock;
4805 (void)need_enter_write_thread;
4806#endif // !ROCKSDB_LITE
4807 return Status::OK();
4808}
4809
4810#ifndef ROCKSDB_LITE
4811namespace {
4812void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
4813 const size_t num_files_to_keep,
4814 const std::shared_ptr<Logger>& info_log,
4815 Env* env) {
4816 if (filenames.size() <= num_files_to_keep) {
4817 return;
4818 }
4819 for (auto iter = std::next(filenames.begin(), num_files_to_keep);
4820 iter != filenames.end(); ++iter) {
4821 if (!env->DeleteFile(iter->second).ok()) {
4822 ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
4823 iter->second.c_str());
4824 }
4825 }
4826}
4827} // namespace
4828#endif // !ROCKSDB_LITE
4829
4830Status DBImpl::DeleteObsoleteOptionsFiles() {
4831#ifndef ROCKSDB_LITE
4832 std::vector<std::string> filenames;
4833 // Use an ordered map to keep the filenames sorted from the newest
4834 // to the oldest.
4835 std::map<uint64_t, std::string> options_filenames;
4836 Status s;
4837 IOOptions io_opts;
4838 io_opts.do_not_recurse = true;
4839 s = fs_->GetChildren(GetName(), io_opts, &filenames,
4840 /*IODebugContext*=*/nullptr);
4841 if (!s.ok()) {
4842 return s;
4843 }
4844 for (auto& filename : filenames) {
4845 uint64_t file_number;
4846 FileType type;
4847 if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
4848 options_filenames.insert(
4849 {std::numeric_limits<uint64_t>::max() - file_number,
4850 GetName() + "/" + filename});
4851 }
4852 }
4853
4854 // Keep the latest 2 options files.
4855 const size_t kNumOptionsFilesKept = 2;
4856 DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
4857 immutable_db_options_.info_log, GetEnv());
4858 return Status::OK();
4859#else
4860 return Status::OK();
4861#endif // !ROCKSDB_LITE
4862}
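// Worked example (illustrative file numbers, not from the original source):
// the map key std::numeric_limits<uint64_t>::max() - file_number inverts the
// ordering, so iterating the std::map ascending visits OPTIONS files
// newest-first. With file numbers {5, 7, 9}, the keys sort as
// max-9 < max-7 < max-5, so OPTIONS-000009 and OPTIONS-000007 are kept
// (kNumOptionsFilesKept == 2) and OPTIONS-000005 is deleted.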
4863
4864Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
4865#ifndef ROCKSDB_LITE
4866 Status s;
4867
4868 uint64_t options_file_number = versions_->NewFileNumber();
4869 std::string options_file_name =
4870 OptionsFileName(GetName(), options_file_number);
4871 uint64_t options_file_size = 0;
4872 s = GetEnv()->GetFileSize(file_name, &options_file_size);
4873 if (s.ok()) {
4874 // Retry if the file name happens to conflict with an existing one.
4875 s = GetEnv()->RenameFile(file_name, options_file_name);
4876 std::unique_ptr<FSDirectory> dir_obj;
4877 if (s.ok()) {
4878 s = fs_->NewDirectory(GetName(), IOOptions(), &dir_obj, nullptr);
4879 }
4880 if (s.ok()) {
4881 s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
4882 DirFsyncOptions(options_file_name));
4883 }
4884 if (s.ok()) {
4885 Status temp_s = dir_obj->Close(IOOptions(), nullptr);
4886 // The default Close() could return "NotSupported" and we bypass it
4887 // if it is not implemented. Detailed explanations can be found in
4888 // db/db_impl/db_impl.h
4889 if (!temp_s.ok()) {
4890 if (temp_s.IsNotSupported()) {
4891 temp_s.PermitUncheckedError();
4892 } else {
4893 s = temp_s;
4894 }
4895 }
4896 }
4897 }
4898 if (s.ok()) {
4899 InstrumentedMutexLock l(&mutex_);
4900 versions_->options_file_number_ = options_file_number;
4901 versions_->options_file_size_ = options_file_size;
4902 }
4903
4904 if (0 == disable_delete_obsolete_files_) {
4905 // TODO: Should we check for errors here?
4906 DeleteObsoleteOptionsFiles().PermitUncheckedError();
4907 }
4908 return s;
4909#else
4910 (void)file_name;
4911 return Status::OK();
4912#endif // !ROCKSDB_LITE
4913}
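// Note (illustrative summary, not from the original source): the sequence
// above is the atomic-publish pattern -- write the options to a temporary
// OPTIONS file, rename it to its final OPTIONS-<number> name, then fsync the
// parent directory so the rename itself survives a crash. Readers therefore
// never observe a half-written options file.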
4914
4915#ifdef ROCKSDB_USING_THREAD_STATUS
4916
4917void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4918 if (immutable_db_options_.enable_thread_tracking) {
4919 ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
4920 cfd->ioptions()->env);
4921 }
4922}
4923
4924void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4925 if (immutable_db_options_.enable_thread_tracking) {
4926 ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
4927 }
4928}
4929
4930void DBImpl::EraseThreadStatusDbInfo() const {
4931 if (immutable_db_options_.enable_thread_tracking) {
4932 ThreadStatusUtil::EraseDatabaseInfo(this);
4933 }
4934}
4935
4936#else
4937void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4938
4939void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4940
4941void DBImpl::EraseThreadStatusDbInfo() const {}
4942#endif // ROCKSDB_USING_THREAD_STATUS
4943
4944//
4945// A global method that dumps out the build version
4946void DumpRocksDBBuildVersion(Logger* log) {
4947 ROCKS_LOG_HEADER(log, "RocksDB version: %s\n",
4948 GetRocksVersionAsString().c_str());
4949 const auto& props = GetRocksBuildProperties();
4950 const auto& sha = props.find("rocksdb_build_git_sha");
4951 if (sha != props.end()) {
4952 ROCKS_LOG_HEADER(log, "Git sha %s", sha->second.c_str());
4953 }
4954 const auto date = props.find("rocksdb_build_date");
4955 if (date != props.end()) {
4956 ROCKS_LOG_HEADER(log, "Compile date %s", date->second.c_str());
4957 }
4958}
4959
4960#ifndef ROCKSDB_LITE
4961SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
4962 bool include_history) {
4963 // Find the earliest sequence number that we know we can rely on reading
4964 // from the memtable without needing to check sst files.
4965 SequenceNumber earliest_seq =
4966 sv->imm->GetEarliestSequenceNumber(include_history);
4967 if (earliest_seq == kMaxSequenceNumber) {
4968 earliest_seq = sv->mem->GetEarliestSequenceNumber();
4969 }
4970 assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
4971
4972 return earliest_seq;
4973}
4974
4975Status DBImpl::GetLatestSequenceForKey(
4976 SuperVersion* sv, const Slice& key, bool cache_only,
4977 SequenceNumber lower_bound_seq, SequenceNumber* seq, std::string* timestamp,
4978 bool* found_record_for_key, bool* is_blob_index) {
4979 Status s;
4980 MergeContext merge_context;
4981 SequenceNumber max_covering_tombstone_seq = 0;
4982
4983 ReadOptions read_options;
4984 SequenceNumber current_seq = versions_->LastSequence();
4985
4986 ColumnFamilyData* cfd = sv->cfd;
4987 assert(cfd);
4988 const Comparator* const ucmp = cfd->user_comparator();
4989 assert(ucmp);
4990 size_t ts_sz = ucmp->timestamp_size();
4991 std::string ts_buf;
4992 if (ts_sz > 0) {
4993 assert(timestamp);
4994 ts_buf.assign(ts_sz, '\xff');
4995 } else {
4996 assert(!timestamp);
4997 }
4998 Slice ts(ts_buf);
4999
5000 LookupKey lkey(key, current_seq, ts_sz == 0 ? nullptr : &ts);
5001
5002 *seq = kMaxSequenceNumber;
5003 *found_record_for_key = false;
5004
5005 // Check if there is a record for this key in the latest memtable
5006 sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr, timestamp, &s,
5007 &merge_context, &max_covering_tombstone_seq, seq, read_options,
5008 false /* immutable_memtable */, nullptr /*read_callback*/,
5009 is_blob_index);
5010
5011 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
5012 // unexpected error reading memtable.
5013 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
5014 "Unexpected status returned from MemTable::Get: %s\n",
5015 s.ToString().c_str());
5016
5017 return s;
5018 }
5019 assert(!ts_sz ||
5020 (*seq != kMaxSequenceNumber &&
5021 *timestamp != std::string(ts_sz, '\xff')) ||
5022 (*seq == kMaxSequenceNumber && timestamp->empty()));
5023
5024 TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp);
5025
5026 if (*seq != kMaxSequenceNumber) {
5027 // Found a sequence number, no need to check immutable memtables
5028 *found_record_for_key = true;
5029 return Status::OK();
5030 }
5031
5032 SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
5033 if (lower_bound_in_mem != kMaxSequenceNumber &&
5034 lower_bound_in_mem < lower_bound_seq) {
5035 *found_record_for_key = false;
5036 return Status::OK();
5037 }
5038
5039 // Check if there is a record for this key in the immutable memtables
5040 sv->imm->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr, timestamp, &s,
5041 &merge_context, &max_covering_tombstone_seq, seq, read_options,
5042 nullptr /*read_callback*/, is_blob_index);
5043
5044 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
5045 // unexpected error reading memtable.
5046 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
5047 "Unexpected status returned from MemTableList::Get: %s\n",
5048 s.ToString().c_str());
5049
5050 return s;
5051 }
5052
5053 assert(!ts_sz ||
5054 (*seq != kMaxSequenceNumber &&
5055 *timestamp != std::string(ts_sz, '\xff')) ||
5056 (*seq == kMaxSequenceNumber && timestamp->empty()));
5057
5058 if (*seq != kMaxSequenceNumber) {
5059 // Found a sequence number, no need to check memtable history
5060 *found_record_for_key = true;
5061 return Status::OK();
5062 }
5063
5064 SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
5065 if (lower_bound_in_imm != kMaxSequenceNumber &&
5066 lower_bound_in_imm < lower_bound_seq) {
5067 *found_record_for_key = false;
5068 return Status::OK();
5069 }
5070
5071 // Check if there is a record for this key in the memtable history
5072 sv->imm->GetFromHistory(lkey, /*value=*/nullptr, /*columns=*/nullptr,
5073 timestamp, &s, &merge_context,
5074 &max_covering_tombstone_seq, seq, read_options,
5075 is_blob_index);
5076
5077 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
5078 // unexpected error reading memtable.
5079 ROCKS_LOG_ERROR(
5080 immutable_db_options_.info_log,
5081 "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
5082 s.ToString().c_str());
5083
5084 return s;
5085 }
5086
5087 assert(!ts_sz ||
5088 (*seq != kMaxSequenceNumber &&
5089 *timestamp != std::string(ts_sz, '\xff')) ||
5090 (*seq == kMaxSequenceNumber && timestamp->empty()));
5091
5092 if (*seq != kMaxSequenceNumber) {
5093 // Found a sequence number, no need to check SST files
5094 assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
5095 *found_record_for_key = true;
5096 return Status::OK();
5097 }
5098
5099 // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
5100 // check here to skip the history if possible. But currently the caller
5101 // already does that. Maybe we should move the logic here later.
5102
5103 // TODO(agiardullo): possible optimization: consider checking cached
5104 // SST files if cache_only=true?
5105 if (!cache_only) {
5106 // Check tables
5107 PinnedIteratorsManager pinned_iters_mgr;
5108 sv->current->Get(read_options, lkey, /*value=*/nullptr, /*columns=*/nullptr,
5109 timestamp, &s, &merge_context, &max_covering_tombstone_seq,
5110 &pinned_iters_mgr, nullptr /* value_found */,
5111 found_record_for_key, seq, nullptr /*read_callback*/,
5112 is_blob_index);
5113
5114 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
5115 // unexpected error reading SST files
5116 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
5117 "Unexpected status returned from Version::Get: %s\n",
5118 s.ToString().c_str());
5119 }
5120 }
5121
5122 return s;
5123}
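// Note (illustrative summary, not from the original source): the lookup
// above proceeds from newest to oldest data -- active memtable, immutable
// memtables, memtable history, then (unless cache_only) the current
// version's SST files -- and returns as soon as one level yields a sequence
// number, short-circuiting via lower_bound_seq when older data can no
// longer matter.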
5124
5125Status DBImpl::IngestExternalFile(
5126 ColumnFamilyHandle* column_family,
5127 const std::vector<std::string>& external_files,
5128 const IngestExternalFileOptions& ingestion_options) {
5129 IngestExternalFileArg arg;
5130 arg.column_family = column_family;
5131 arg.external_files = external_files;
5132 arg.options = ingestion_options;
5133 return IngestExternalFiles({arg});
5134}
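// Usage sketch (illustrative; the file path is hypothetical):
//
//   IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // link/move the file instead of copying it
//   Status s = db->IngestExternalFile(db->DefaultColumnFamily(),
//                                     {"/tmp/bulk/000123.sst"}, ifo);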
5135
5136Status DBImpl::IngestExternalFiles(
5137 const std::vector<IngestExternalFileArg>& args) {
5138 if (args.empty()) {
5139 return Status::InvalidArgument("ingestion arg list is empty");
5140 }
5141 {
5142 std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
5143 for (const auto& arg : args) {
5144 if (arg.column_family == nullptr) {
5145 return Status::InvalidArgument("column family handle is null");
5146 } else if (unique_cfhs.count(arg.column_family) > 0) {
5147 return Status::InvalidArgument(
5148 "ingestion args have duplicate column families");
5149 }
5150 unique_cfhs.insert(arg.column_family);
5151 }
5152 }
5153 // Ingest multiple external SST files atomically.
5154 const size_t num_cfs = args.size();
5155 for (size_t i = 0; i != num_cfs; ++i) {
5156 if (args[i].external_files.empty()) {
5157 char err_msg[128] = {0};
5158 snprintf(err_msg, 128, "external_files[%zu] is empty", i);
5159 return Status::InvalidArgument(err_msg);
5160 }
5161 }
5162 for (const auto& arg : args) {
5163 const IngestExternalFileOptions& ingest_opts = arg.options;
5164 if (ingest_opts.ingest_behind &&
5165 !immutable_db_options_.allow_ingest_behind) {
5166 return Status::InvalidArgument(
5167 "can't ingest_behind file in DB with allow_ingest_behind=false");
5168 }
5169 }
5170
5171 // TODO (yanqin) maybe handle the case in which column_families have
5172 // duplicates
5173 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
5174 size_t total = 0;
5175 for (const auto& arg : args) {
5176 total += arg.external_files.size();
5177 }
5178 uint64_t next_file_number = 0;
5179 Status status = ReserveFileNumbersBeforeIngestion(
5180 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
5181 pending_output_elem, &next_file_number);
5182 if (!status.ok()) {
5183 InstrumentedMutexLock l(&mutex_);
5184 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
5185 return status;
5186 }
5187
5188 std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
5189 for (const auto& arg : args) {
5190 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
5191 ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
5192 file_options_, &snapshots_, arg.options,
5193 &directories_, &event_logger_, io_tracer_);
5194 }
5195
5196 // TODO(yanqin) maybe make jobs run in parallel
5197 uint64_t start_file_number = next_file_number;
5198 for (size_t i = 1; i != num_cfs; ++i) {
5199 start_file_number += args[i - 1].external_files.size();
5200 auto* cfd =
5201 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
5202 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
5203 Status es = ingestion_jobs[i].Prepare(
5204 args[i].external_files, args[i].files_checksums,
5205 args[i].files_checksum_func_names, args[i].file_temperature,
5206 start_file_number, super_version);
5207 // capture first error only
5208 if (!es.ok() && status.ok()) {
5209 status = es;
5210 }
5211 CleanupSuperVersion(super_version);
5212 }
5213 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
5214 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
5215 {
5216 auto* cfd =
5217 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
5218 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
5219 Status es = ingestion_jobs[0].Prepare(
5220 args[0].external_files, args[0].files_checksums,
5221 args[0].files_checksum_func_names, args[0].file_temperature,
5222 next_file_number, super_version);
5223 if (!es.ok()) {
5224 status = es;
5225 }
5226 CleanupSuperVersion(super_version);
5227 }
5228 if (!status.ok()) {
5229 for (size_t i = 0; i != num_cfs; ++i) {
5230 ingestion_jobs[i].Cleanup(status);
5231 }
5232 InstrumentedMutexLock l(&mutex_);
5233 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
5234 return status;
5235 }
5236
5237 std::vector<SuperVersionContext> sv_ctxs;
5238 for (size_t i = 0; i != num_cfs; ++i) {
5239 sv_ctxs.emplace_back(true /* create_superversion */);
5240 }
5241 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
5242 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
5243 TEST_SYNC_POINT("DBImpl::AddFile:Start");
5244 {
5245 InstrumentedMutexLock l(&mutex_);
5246 TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
5247
5248 // Stop writes to the DB by entering both write threads
5249 WriteThread::Writer w;
5250 write_thread_.EnterUnbatched(&w, &mutex_);
5251 WriteThread::Writer nonmem_w;
5252 if (two_write_queues_) {
5253 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
5254 }
5255
5256 // When unordered_write is enabled, keys are written to the memtable in an
5257 // unordered way. If the ingestion job checked the memtable key range
5258 // before a key landed in the memtable, the ingestion job could skip the
5259 // necessary memtable flush.
5260 // So wait here to ensure there is no pending write to the memtable.
5261 WaitForPendingWrites();
5262
5263 num_running_ingest_file_ += static_cast<int>(num_cfs);
5264 TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
5265
5266 bool at_least_one_cf_need_flush = false;
5267 std::vector<bool> need_flush(num_cfs, false);
5268 for (size_t i = 0; i != num_cfs; ++i) {
5269 auto* cfd =
5270 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
5271 if (cfd->IsDropped()) {
5272 // TODO (yanqin) investigate whether we should abort ingestion or
5273 // proceed with other non-dropped column families.
5274 status = Status::InvalidArgument(
5275 "cannot ingest an external file into a dropped CF");
5276 break;
5277 }
5278 bool tmp = false;
5279 status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
5280 need_flush[i] = tmp;
5281 at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
5282 if (!status.ok()) {
5283 break;
5284 }
5285 }
5286 TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
5287 &at_least_one_cf_need_flush);
5288
5289 if (status.ok() && at_least_one_cf_need_flush) {
5290 FlushOptions flush_opts;
5291 flush_opts.allow_write_stall = true;
5292 if (immutable_db_options_.atomic_flush) {
5293 autovector<ColumnFamilyData*> cfds_to_flush;
5294 SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
5295 mutex_.Unlock();
5296 status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
5297 FlushReason::kExternalFileIngestion,
5298 true /* entered_write_thread */);
5299 mutex_.Lock();
5300 } else {
5301 for (size_t i = 0; i != num_cfs; ++i) {
5302 if (need_flush[i]) {
5303 mutex_.Unlock();
5304 auto* cfd =
5305 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
5306 ->cfd();
5307 status = FlushMemTable(cfd, flush_opts,
5308 FlushReason::kExternalFileIngestion,
5309 true /* entered_write_thread */);
5310 mutex_.Lock();
5311 if (!status.ok()) {
5312 break;
5313 }
5314 }
5315 }
5316 }
5317 }
5318 // Run ingestion jobs.
5319 if (status.ok()) {
5320 for (size_t i = 0; i != num_cfs; ++i) {
5321 status = ingestion_jobs[i].Run();
5322 if (!status.ok()) {
5323 break;
5324 }
5325 }
5326 }
5327 if (status.ok()) {
5328 autovector<ColumnFamilyData*> cfds_to_commit;
5329 autovector<const MutableCFOptions*> mutable_cf_options_list;
5330 autovector<autovector<VersionEdit*>> edit_lists;
5331 uint32_t num_entries = 0;
5332 for (size_t i = 0; i != num_cfs; ++i) {
5333 auto* cfd =
5334 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
5335 if (cfd->IsDropped()) {
5336 continue;
5337 }
5338 cfds_to_commit.push_back(cfd);
5339 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
5340 autovector<VersionEdit*> edit_list;
5341 edit_list.push_back(ingestion_jobs[i].edit());
5342 edit_lists.push_back(edit_list);
5343 ++num_entries;
5344 }
5345 // Mark the version edits as an atomic group if the number of version
5346 // edits exceeds 1.
5347 if (cfds_to_commit.size() > 1) {
5348 for (auto& edits : edit_lists) {
5349 assert(edits.size() == 1);
5350 edits[0]->MarkAtomicGroup(--num_entries);
5351 }
5352 assert(0 == num_entries);
5353 }
5354 status =
5355 versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
5356 edit_lists, &mutex_, directories_.GetDbDir());
5357 // It is safe to update VersionSet last seqno here after LogAndApply since
5358 // LogAndApply persists last sequence number from VersionEdits,
5359 // which are from file's largest seqno and not from VersionSet.
5360 //
5361 // It is necessary to update last seqno here since LogAndApply releases
5362 // mutex when persisting MANIFEST file, and the snapshots taken during
5363 // that period will not be stable if VersionSet last seqno is updated
5364 // before LogAndApply.
5365 int consumed_seqno_count =
5366 ingestion_jobs[0].ConsumedSequenceNumbersCount();
5367 for (size_t i = 1; i != num_cfs; ++i) {
5368 consumed_seqno_count =
5369 std::max(consumed_seqno_count,
5370 ingestion_jobs[i].ConsumedSequenceNumbersCount());
5371 }
5372 if (consumed_seqno_count > 0) {
5373 const SequenceNumber last_seqno = versions_->LastSequence();
5374 versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
5375 versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
5376 versions_->SetLastSequence(last_seqno + consumed_seqno_count);
5377 }
5378 }
5379
5380 if (status.ok()) {
5381 for (size_t i = 0; i != num_cfs; ++i) {
5382 auto* cfd =
5383 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
5384 if (!cfd->IsDropped()) {
5385 InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
5386 *cfd->GetLatestMutableCFOptions());
5387#ifndef NDEBUG
5388 if (0 == i && num_cfs > 1) {
5389 TEST_SYNC_POINT(
5390 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
5391 TEST_SYNC_POINT(
5392 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
5393 }
5394#endif // !NDEBUG
5395 }
5396 }
5397 } else if (versions_->io_status().IsIOError()) {
5398 // Error while writing to MANIFEST.
5399 // In fact, versions_->io_status() can also be the result of renaming
5400 // CURRENT file. With current code, it's just difficult to tell. So just
5401 // be pessimistic and try write to a new MANIFEST.
5402 // TODO: distinguish between MANIFEST write and CURRENT renaming
5403 const IOStatus& io_s = versions_->io_status();
5404 // Should handle return error?
5405 error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
5406 }
5407
5408 // Resume writes to the DB
5409 if (two_write_queues_) {
5410 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
5411 }
5412 write_thread_.ExitUnbatched(&w);
5413
5414 if (status.ok()) {
5415 for (auto& job : ingestion_jobs) {
5416 job.UpdateStats();
5417 }
5418 }
5419 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
5420 num_running_ingest_file_ -= static_cast<int>(num_cfs);
5421 if (0 == num_running_ingest_file_) {
5422 bg_cv_.SignalAll();
5423 }
5424 TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
5425 }
5426 // mutex_ is unlocked here
5427
5428 // Cleanup
5429 for (size_t i = 0; i != num_cfs; ++i) {
5430 sv_ctxs[i].Clean();
5431 // This may rollback jobs that have completed successfully. This is
5432 // intended for atomicity.
5433 ingestion_jobs[i].Cleanup(status);
5434 }
5435 if (status.ok()) {
5436 for (size_t i = 0; i != num_cfs; ++i) {
5437 auto* cfd =
5438 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
5439 if (!cfd->IsDropped()) {
5440 NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
5441 }
5442 }
5443 }
5444 return status;
5445}
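// Usage sketch (illustrative; handles and paths are hypothetical): an
// all-or-nothing ingestion into two column families.
//
//   IngestExternalFileArg a1, a2;
//   a1.column_family = cf1;
//   a1.external_files = {"/tmp/cf1.sst"};
//   a2.column_family = cf2;
//   a2.external_files = {"/tmp/cf2.sst"};
//   Status s = db->IngestExternalFiles({a1, a2});
//   // On failure, jobs that already completed are rolled back (see the
//   // Cleanup() calls above) so that atomicity is preserved.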
5446
5447Status DBImpl::CreateColumnFamilyWithImport(
5448 const ColumnFamilyOptions& options, const std::string& column_family_name,
5449 const ImportColumnFamilyOptions& import_options,
5450 const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
5451 assert(handle != nullptr);
5452 assert(*handle == nullptr);
5453 std::string cf_comparator_name = options.comparator->Name();
5454 if (cf_comparator_name != metadata.db_comparator_name) {
5455 return Status::InvalidArgument("Comparator name mismatch");
5456 }
5457
5458 // Create column family.
5459 auto status = CreateColumnFamily(options, column_family_name, handle);
5460 if (!status.ok()) {
5461 return status;
5462 }
5463
5464 // Import sst files from metadata.
5465 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
5466 auto cfd = cfh->cfd();
5467 ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
5468 file_options_, import_options,
5469 metadata.files, io_tracer_);
5470
5471 SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
5472 VersionEdit dummy_edit;
5473 uint64_t next_file_number = 0;
5474 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
5475 {
5476 // Lock db mutex
5477 InstrumentedMutexLock l(&mutex_);
5478 if (error_handler_.IsDBStopped()) {
5479 // Don't import files when there is a bg_error
5480 status = error_handler_.GetBGError();
5481 }
5482
5483 // Make sure that bg cleanup won't delete the files that we are importing
5484 pending_output_elem.reset(new std::list<uint64_t>::iterator(
5485 CaptureCurrentFileNumberInPendingOutputs()));
5486
5487 if (status.ok()) {
5488 // If a crash happens after a hard link is established, Recover may
5489 // reuse a file number that has already been assigned to the internal
5490 // file, and this would overwrite the external file. To protect the
5491 // external file, we must make sure the file number is never reused.
5492 next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
5493 auto cf_options = cfd->GetLatestMutableCFOptions();
5494 status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
5495 directories_.GetDbDir());
5496 if (status.ok()) {
5497 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
5498 }
5499 }
5500 }
5501 dummy_sv_ctx.Clean();
5502
5503 if (status.ok()) {
5504 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
5505 status = import_job.Prepare(next_file_number, sv);
5506 CleanupSuperVersion(sv);
5507 }
5508
5509 if (status.ok()) {
5510 SuperVersionContext sv_context(true /*create_superversion*/);
5511 {
5512 // Lock db mutex
5513 InstrumentedMutexLock l(&mutex_);
5514
5515 // Stop writes to the DB by entering both write threads
5516 WriteThread::Writer w;
5517 write_thread_.EnterUnbatched(&w, &mutex_);
5518 WriteThread::Writer nonmem_w;
5519 if (two_write_queues_) {
5520 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
5521 }
5522
5523 num_running_ingest_file_++;
5524 assert(!cfd->IsDropped());
5525 status = import_job.Run();
5526
5527 // Install job edit [Mutex will be unlocked here]
5528 if (status.ok()) {
5529 auto cf_options = cfd->GetLatestMutableCFOptions();
5530 status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
5531 &mutex_, directories_.GetDbDir());
5532 if (status.ok()) {
5533 InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
5534 }
5535 }
5536
5537 // Resume writes to the DB
5538 if (two_write_queues_) {
5539 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
5540 }
5541 write_thread_.ExitUnbatched(&w);
5542
5543 num_running_ingest_file_--;
5544 if (num_running_ingest_file_ == 0) {
5545 bg_cv_.SignalAll();
5546 }
5547 }
5548 // mutex_ is unlocked here
5549
5550 sv_context.Clean();
5551 }
5552
5553 {
5554 InstrumentedMutexLock l(&mutex_);
5555 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
5556 }
5557
5558 import_job.Cleanup(status);
5559 if (!status.ok()) {
5560 Status temp_s = DropColumnFamily(*handle);
5561 if (!temp_s.ok()) {
5562 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
5563 "DropColumnFamily failed with error %s",
5564 temp_s.ToString().c_str());
5565 }
5566 // Always returns Status::OK()
5567 temp_s = DestroyColumnFamilyHandle(*handle);
5568 assert(temp_s.ok());
5569 *handle = nullptr;
5570 }
5571 return status;
5572}
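// Usage sketch (illustrative): the metadata is typically produced by
// Checkpoint::ExportColumnFamily() on a source DB; assuming `metadata` came
// from such an export:
//
//   ColumnFamilyHandle* imported = nullptr;
//   Status s = db->CreateColumnFamilyWithImport(
//       ColumnFamilyOptions(), "imported_cf", ImportColumnFamilyOptions(),
//       *metadata, &imported);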
5573
5574Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
5575 return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
5576}
5577
5578Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
5579 return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
5580}
5581
5582Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
5583 bool use_file_checksum) {
5584 // `bytes_read` stat is enabled based on compile-time support and cannot
5585 // be dynamically toggled. So we do not need to worry about `PerfLevel`
5586 // here, unlike many other `IOStatsContext` / `PerfContext` stats.
5587 uint64_t prev_bytes_read = IOSTATS(bytes_read);
5588
5589 Status s;
5590
5591 if (use_file_checksum) {
5592 FileChecksumGenFactory* const file_checksum_gen_factory =
5593 immutable_db_options_.file_checksum_gen_factory.get();
5594 if (!file_checksum_gen_factory) {
5595 s = Status::InvalidArgument(
5596 "Cannot verify file checksum if options.file_checksum_gen_factory is "
5597 "null");
5598 return s;
5599 }
5600 }
5601
5602 // TODO: simplify using GetRefedColumnFamilySet?
5603 std::vector<ColumnFamilyData*> cfd_list;
5604 {
5605 InstrumentedMutexLock l(&mutex_);
5606 for (auto cfd : *versions_->GetColumnFamilySet()) {
5607 if (!cfd->IsDropped() && cfd->initialized()) {
5608 cfd->Ref();
5609 cfd_list.push_back(cfd);
5610 }
5611 }
5612 }
5613 std::vector<SuperVersion*> sv_list;
5614 for (auto cfd : cfd_list) {
f67539c2 5615 sv_list.push_back(cfd->GetReferencedSuperVersion(this));
11fdf7f2 5616 }
20effc67 5617
11fdf7f2
TL
5618 for (auto& sv : sv_list) {
5619 VersionStorageInfo* vstorage = sv->current->storage_info();
5620 ColumnFamilyData* cfd = sv->current->cfd();
5621 Options opts;
5622 if (!use_file_checksum) {
5623 InstrumentedMutexLock l(&mutex_);
5624 opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
5625 cfd->GetLatestCFOptions());
5626 }
5627 for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
5628 for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
5629 j++) {
5630 const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
5631 const auto& fd = fd_with_krange.fd;
5632 const FileMetaData* fmeta = fd_with_krange.file_metadata;
5633 assert(fmeta);
5634 std::string fname = TableFileName(cfd->ioptions()->cf_paths,
5635 fd.GetNumber(), fd.GetPathId());
5636 if (use_file_checksum) {
5637 s = VerifyFullFileChecksum(fmeta->file_checksum,
5638 fmeta->file_checksum_func_name, fname,
5639 read_options);
5640 } else {
5641 s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(
5642 opts, file_options_, read_options, fname, fd.largest_seqno);
5643 }
5644 RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
5645 IOSTATS(bytes_read) - prev_bytes_read);
5646 prev_bytes_read = IOSTATS(bytes_read);
5647 }
5648 }
5649
5650 if (s.ok() && use_file_checksum) {
5651 const auto& blob_files = vstorage->GetBlobFiles();
5652 for (const auto& meta : blob_files) {
5653 assert(meta);
5654
5655 const uint64_t blob_file_number = meta->GetBlobFileNumber();
5656
5657 const std::string blob_file_name = BlobFileName(
5658 cfd->ioptions()->cf_paths.front().path, blob_file_number);
5659 s = VerifyFullFileChecksum(meta->GetChecksumValue(),
5660 meta->GetChecksumMethod(), blob_file_name,
5661 read_options);
5662 RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
5663 IOSTATS(bytes_read) - prev_bytes_read);
5664 prev_bytes_read = IOSTATS(bytes_read);
5665 if (!s.ok()) {
5666 break;
5667 }
5668 }
5669 }
5670 if (!s.ok()) {
5671 break;
5672 }
5673 }
5674
5675 bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
5676 {
5677 InstrumentedMutexLock l(&mutex_);
5678 for (auto sv : sv_list) {
5679 if (sv && sv->Unref()) {
5680 sv->Cleanup();
5681 if (defer_purge) {
5682 AddSuperVersionsToFreeQueue(sv);
5683 } else {
5684 delete sv;
5685 }
5686 }
5687 }
5688 if (defer_purge) {
5689 SchedulePurge();
5690 }
5691 for (auto cfd : cfd_list) {
5692 cfd->UnrefAndTryDelete();
5693 }
5694 }
5695 RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
5696 IOSTATS(bytes_read) - prev_bytes_read);
5697 return s;
5698}
5699
5700Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
5701 const std::string& func_name_expected,
5702 const std::string& fname,
5703 const ReadOptions& read_options) {
5704 Status s;
5705 if (file_checksum_expected == kUnknownFileChecksum) {
5706 return s;
5707 }
5708 std::string file_checksum;
5709 std::string func_name;
5710 s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
5711 fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
5712 func_name_expected, &file_checksum, &func_name,
5713 read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
5714 io_tracer_, immutable_db_options_.rate_limiter.get(),
5715 read_options.rate_limiter_priority);
5716 if (s.ok()) {
5717 assert(func_name_expected == func_name);
5718 if (file_checksum != file_checksum_expected) {
5719 std::ostringstream oss;
5720 oss << fname << " file checksum mismatch, ";
5721 oss << "expecting "
5722 << Slice(file_checksum_expected).ToString(/*hex=*/true);
5723 oss << ", but actual " << Slice(file_checksum).ToString(/*hex=*/true);
5724 s = Status::Corruption(oss.str());
5725 TEST_SYNC_POINT_CALLBACK("DBImpl::VerifyFullFileChecksum:mismatch", &s);
5726 }
5727 }
5728 return s;
5729}
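// Usage sketch (illustrative): the two public entry points above differ in
// what they verify.
//
//   Status s1 = db->VerifyChecksum(ReadOptions());       // per-block checksums
//   Status s2 = db->VerifyFileChecksums(ReadOptions());  // whole-file checksums;
//       // requires options.file_checksum_gen_factory to be configured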
5730
5731void DBImpl::NotifyOnExternalFileIngested(
5732 ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
5733 if (immutable_db_options_.listeners.empty()) {
5734 return;
5735 }
5736
5737 for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
5738 ExternalFileIngestionInfo info;
5739 info.cf_name = cfd->GetName();
5740 info.external_file_path = f.external_file_path;
5741 info.internal_file_path = f.internal_file_path;
5742 info.global_seqno = f.assigned_seqno;
5743 info.table_properties = f.table_properties;
5744 for (auto listener : immutable_db_options_.listeners) {
5745 listener->OnExternalFileIngested(this, info);
5746 }
5747 }
5748}
5749
5750void DBImpl::WaitForIngestFile() {
5751 mutex_.AssertHeld();
5752 while (num_running_ingest_file_ > 0) {
5753 bg_cv_.Wait();
5754 }
5755}
5756
5757Status DBImpl::StartTrace(const TraceOptions& trace_options,
5758 std::unique_ptr<TraceWriter>&& trace_writer) {
5759 InstrumentedMutexLock lock(&trace_mutex_);
5760 tracer_.reset(new Tracer(immutable_db_options_.clock, trace_options,
5761 std::move(trace_writer)));
5762 return Status::OK();
5763}
5764
5765Status DBImpl::EndTrace() {
5766 InstrumentedMutexLock lock(&trace_mutex_);
5767 Status s;
5768 if (tracer_ != nullptr) {
5769 s = tracer_->Close();
5770 tracer_.reset();
5771 } else {
5772 s = Status::IOError("No trace file to close");
5773 }
5774 return s;
5775}
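// Usage sketch (illustrative; assumes the NewFileTraceWriter() helper and a
// hypothetical trace path):
//
//   std::unique_ptr<TraceWriter> trace_writer;
//   Status s = NewFileTraceWriter(env, EnvOptions(), "/tmp/db.trace",
//                                 &trace_writer);
//   if (s.ok()) {
//     s = db->StartTrace(TraceOptions(), std::move(trace_writer));
//   }
//   // ... run the workload to capture ...
//   if (s.ok()) {
//     s = db->EndTrace();
//   }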
5776
5777Status DBImpl::NewDefaultReplayer(
5778 const std::vector<ColumnFamilyHandle*>& handles,
5779 std::unique_ptr<TraceReader>&& reader,
5780 std::unique_ptr<Replayer>* replayer) {
5781 replayer->reset(new ReplayerImpl(this, handles, std::move(reader)));
5782 return Status::OK();
5783}
5784
5785Status DBImpl::StartBlockCacheTrace(
5786 const TraceOptions& trace_options,
5787 std::unique_ptr<TraceWriter>&& trace_writer) {
5788 BlockCacheTraceOptions block_trace_opts;
5789 block_trace_opts.sampling_frequency = trace_options.sampling_frequency;
5790
5791 BlockCacheTraceWriterOptions trace_writer_opt;
5792 trace_writer_opt.max_trace_file_size = trace_options.max_trace_file_size;
5793
5794 std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
5795 NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
5796 std::move(trace_writer));
5797
5798 return block_cache_tracer_.StartTrace(block_trace_opts,
5799 std::move(block_cache_trace_writer));
5800}
5801
5802Status DBImpl::StartBlockCacheTrace(
5803 const BlockCacheTraceOptions& trace_options,
5804 std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) {
5805 return block_cache_tracer_.StartTrace(trace_options, std::move(trace_writer));
5806}
5807
5808Status DBImpl::EndBlockCacheTrace() {
5809 block_cache_tracer_.EndTrace();
5810 return Status::OK();
5811}
5812
5813Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
5814 const Slice& lower_bound,
5815 const Slice upper_bound) {
5816 Status s;
5817 if (tracer_) {
5818 InstrumentedMutexLock lock(&trace_mutex_);
5819 if (tracer_) {
5820 s = tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound);
5821 }
5822 }
5823 return s;
5824}
5825
5826Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
5827 const Slice& lower_bound,
5828 const Slice upper_bound) {
5829 Status s;
5830 if (tracer_) {
5831 InstrumentedMutexLock lock(&trace_mutex_);
5832 if (tracer_) {
5833 s = tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound);
5834 }
5835 }
5836 return s;
5837}
5838
5839Status DBImpl::ReserveFileNumbersBeforeIngestion(
5840 ColumnFamilyData* cfd, uint64_t num,
5841 std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
5842 uint64_t* next_file_number) {
5843 Status s;
5844 SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
5845 assert(nullptr != next_file_number);
5846 InstrumentedMutexLock l(&mutex_);
5847 if (error_handler_.IsDBStopped()) {
5848 // Do not ingest files when there is a bg_error
5849 return error_handler_.GetBGError();
5850 }
5851 pending_output_elem.reset(new std::list<uint64_t>::iterator(
5852 CaptureCurrentFileNumberInPendingOutputs()));
5853 *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
5854 auto cf_options = cfd->GetLatestMutableCFOptions();
5855 VersionEdit dummy_edit;
5856 // If a crash happens after a hard link is established, Recover may
5857 // reuse a file number that has already been assigned to the internal
5858 // file, and this would overwrite the external file. To protect the
5859 // external file, we must make sure the file number is never reused.
5860 s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
5861 directories_.GetDbDir());
5862 if (s.ok()) {
5863 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
5864 }
5865 dummy_sv_ctx.Clean();
5866 return s;
5867}
5868
5869Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
5870 if (mutable_db_options_.max_open_files == -1) {
5871 uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
5872 for (auto cfd : *versions_->GetColumnFamilySet()) {
5873 if (!cfd->IsDropped()) {
5874 uint64_t ctime;
5875 {
5876 SuperVersion* sv = GetAndRefSuperVersion(cfd);
5877 Version* version = sv->current;
5878 version->GetCreationTimeOfOldestFile(&ctime);
5879 ReturnAndCleanupSuperVersion(cfd, sv);
5880 }
5881
5882 if (ctime < oldest_time) {
5883 oldest_time = ctime;
5884 }
5885 if (oldest_time == 0) {
5886 break;
5887 }
5888 }
5889 }
5890 *creation_time = oldest_time;
5891 return Status::OK();
5892 } else {
5893 return Status::NotSupported("This API only works if max_open_files = -1");
5894 }
5895}
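// Usage sketch (illustrative): only supported when max_open_files == -1,
// since file metadata (and thus creation times) is then kept loaded for
// every live SST file.
//
//   uint64_t t = 0;
//   Status s = db->GetCreationTimeOfOldestFile(&t);
//   if (s.IsNotSupported()) {
//     // max_open_files != -1
//   }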
5896
5897void DBImpl::RecordSeqnoToTimeMapping() {
5898 // Get the time first, then the sequence number, so that the actual time
5899 // of the seqno is <= the recorded unix_time
5900 int64_t unix_time = 0;
5901 immutable_db_options_.clock->GetCurrentTime(&unix_time)
5902 .PermitUncheckedError(); // Ignore error
5903 SequenceNumber seqno = GetLatestSequenceNumber();
5904 bool appended = false;
5905 {
5906 InstrumentedMutexLock l(&mutex_);
5907 appended = seqno_time_mapping_.Append(seqno, unix_time);
5908 }
5909 if (!appended) {
5910 ROCKS_LOG_WARN(immutable_db_options_.info_log,
5911 "Failed to insert sequence number to time entry: %" PRIu64
5912 " -> %" PRIu64,
5913 seqno, unix_time);
5914 }
5915}
5916#endif // ROCKSDB_LITE
5917
5918} // namespace ROCKSDB_NAMESPACE