1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10
11 #include <atomic>
12 #include <deque>
13 #include <functional>
14 #include <limits>
15 #include <list>
16 #include <map>
17 #include <set>
18 #include <string>
19 #include <utility>
20 #include <vector>
21
22 #include "db/column_family.h"
23 #include "db/compaction/compaction_job.h"
24 #include "db/dbformat.h"
25 #include "db/error_handler.h"
26 #include "db/event_helpers.h"
27 #include "db/external_sst_file_ingestion_job.h"
28 #include "db/flush_job.h"
29 #include "db/flush_scheduler.h"
30 #include "db/import_column_family_job.h"
31 #include "db/internal_stats.h"
32 #include "db/log_writer.h"
33 #include "db/logs_with_prep_tracker.h"
34 #include "db/memtable_list.h"
35 #include "db/pre_release_callback.h"
36 #include "db/range_del_aggregator.h"
37 #include "db/read_callback.h"
38 #include "db/snapshot_checker.h"
39 #include "db/snapshot_impl.h"
40 #include "db/trim_history_scheduler.h"
41 #include "db/version_edit.h"
42 #include "db/wal_manager.h"
43 #include "db/write_controller.h"
44 #include "db/write_thread.h"
45 #include "logging/event_logger.h"
46 #include "monitoring/instrumented_mutex.h"
47 #include "options/db_options.h"
48 #include "port/port.h"
49 #include "rocksdb/db.h"
50 #include "rocksdb/env.h"
51 #include "rocksdb/memtablerep.h"
52 #include "rocksdb/status.h"
53 #include "rocksdb/trace_reader_writer.h"
54 #include "rocksdb/transaction_log.h"
55 #include "rocksdb/write_buffer_manager.h"
56 #include "table/scoped_arena_iterator.h"
57 #include "trace_replay/block_cache_tracer.h"
58 #include "trace_replay/trace_replay.h"
59 #include "util/autovector.h"
60 #include "util/hash.h"
61 #include "util/repeatable_thread.h"
62 #include "util/stop_watch.h"
63 #include "util/thread_local.h"
64
65 namespace ROCKSDB_NAMESPACE {
66
67 class Arena;
68 class ArenaWrappedDBIter;
69 class InMemoryStatsHistoryIterator;
70 class MemTable;
71 class PersistentStatsHistoryIterator;
72 class TableCache;
73 class TaskLimiterToken;
74 class Version;
75 class VersionEdit;
76 class VersionSet;
77 class WriteCallback;
78 struct JobContext;
79 struct ExternalSstFileInfo;
80 struct MemTableInfo;
81
82 // Class to maintain directories for all database paths other than the main one.
83 class Directories {
84 public:
85 Status SetDirectories(Env* env, const std::string& dbname,
86 const std::string& wal_dir,
87 const std::vector<DbPath>& data_paths);
88
89 Directory* GetDataDir(size_t path_id) const {
90 assert(path_id < data_dirs_.size());
91 Directory* ret_dir = data_dirs_[path_id].get();
92 if (ret_dir == nullptr) {
93 // Should use db_dir_
94 return db_dir_.get();
95 }
96 return ret_dir;
97 }
98
99 Directory* GetWalDir() {
100 if (wal_dir_) {
101 return wal_dir_.get();
102 }
103 return db_dir_.get();
104 }
105
106 Directory* GetDbDir() { return db_dir_.get(); }
107
108 private:
109 std::unique_ptr<Directory> db_dir_;
110 std::vector<std::unique_ptr<Directory>> data_dirs_;
111 std::unique_ptr<Directory> wal_dir_;
112 };
113
114 // While DB is the public interface of RocksDB, DBImpl is the actual
115 // class implementing it. It's the entrance of the core RocksDB engine.
116 // All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
117 // DBImpl internally.
118 // Other than functions implementing the DB interface, some public
119 // functions are there for other internal components to call. For
120 // example, TransactionDB directly calls DBImpl::WriteImpl() and
121 // BlobDB directly calls DBImpl::GetImpl(). Some other functions
122 // are for sub-components to call. For example, ColumnFamilyHandleImpl
123 // calls DBImpl::FindObsoleteFiles().
124 //
125 // Since it's a very large class, the definition of the functions is
126 // divided into several db_impl_*.cc files, besides db_impl.cc.
127 class DBImpl : public DB {
128 public:
129 DBImpl(const DBOptions& options, const std::string& dbname,
130 const bool seq_per_batch = false, const bool batch_per_txn = true);
131 // No copying allowed
132 DBImpl(const DBImpl&) = delete;
133 void operator=(const DBImpl&) = delete;
134
135 virtual ~DBImpl();
136
137 // ---- Implementations of the DB interface ----
138
139 using DB::Resume;
140 virtual Status Resume() override;
141
142 using DB::Put;
143 virtual Status Put(const WriteOptions& options,
144 ColumnFamilyHandle* column_family, const Slice& key,
145 const Slice& value) override;
146 using DB::Merge;
147 virtual Status Merge(const WriteOptions& options,
148 ColumnFamilyHandle* column_family, const Slice& key,
149 const Slice& value) override;
150 using DB::Delete;
151 virtual Status Delete(const WriteOptions& options,
152 ColumnFamilyHandle* column_family,
153 const Slice& key) override;
154 using DB::SingleDelete;
155 virtual Status SingleDelete(const WriteOptions& options,
156 ColumnFamilyHandle* column_family,
157 const Slice& key) override;
158 using DB::Write;
159 virtual Status Write(const WriteOptions& options,
160 WriteBatch* updates) override;
161
162 using DB::Get;
163 virtual Status Get(const ReadOptions& options,
164 ColumnFamilyHandle* column_family, const Slice& key,
165 PinnableSlice* value) override;
166
167 using DB::GetMergeOperands;
168 Status GetMergeOperands(const ReadOptions& options,
169 ColumnFamilyHandle* column_family, const Slice& key,
170 PinnableSlice* merge_operands,
171 GetMergeOperandsOptions* get_merge_operands_options,
172 int* number_of_operands) override {
173 GetImplOptions get_impl_options;
174 get_impl_options.column_family = column_family;
175 get_impl_options.merge_operands = merge_operands;
176 get_impl_options.get_merge_operands_options = get_merge_operands_options;
177 get_impl_options.number_of_operands = number_of_operands;
178 get_impl_options.get_value = false;
179 return GetImpl(options, key, get_impl_options);
180 }
181
182 using DB::MultiGet;
183 virtual std::vector<Status> MultiGet(
184 const ReadOptions& options,
185 const std::vector<ColumnFamilyHandle*>& column_family,
186 const std::vector<Slice>& keys,
187 std::vector<std::string>* values) override;
188
189 // This MultiGet is a batched version, which may be faster than calling Get
190 // multiple times, especially if the keys have some spatial locality that
191 // enables them to be queried in the same SST files/set of files. The larger
192 // the batch size, the more scope for batching and performance improvement.
193 // The values and statuses parameters are arrays with number of elements
194 // equal to keys.size(). This allows the storage for those to be allocated
195 // by the caller on the stack for small batches (see the usage sketch below).
196 virtual void MultiGet(const ReadOptions& options,
197 ColumnFamilyHandle* column_family,
198 const size_t num_keys, const Slice* keys,
199 PinnableSlice* values, Status* statuses,
200 const bool sorted_input = false) override;
201
202 virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
203 ColumnFamilyHandle** column_families, const Slice* keys,
204 PinnableSlice* values, Status* statuses,
205 const bool sorted_input = false) override;
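// A minimal usage sketch for the batched MultiGet above. Illustrative only;
// assumes an already-opened DB and example keys "k1".."k3" (not part of this
// header):
//   std::array<Slice, 3> mget_keys{{"k1", "k2", "k3"}};
//   std::array<PinnableSlice, 3> mget_values;
//   std::array<Status, 3> mget_statuses;
//   db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), mget_keys.size(),
//                mget_keys.data(), mget_values.data(), mget_statuses.data());
//   // mget_statuses[i].ok() means mget_keys[i] was found and mget_values[i]
//   // holds its value.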
206
207 virtual void MultiGetWithCallback(
208 const ReadOptions& options, ColumnFamilyHandle* column_family,
209 ReadCallback* callback,
210 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
211
212 virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
213 const std::string& column_family,
214 ColumnFamilyHandle** handle) override;
215 virtual Status CreateColumnFamilies(
216 const ColumnFamilyOptions& cf_options,
217 const std::vector<std::string>& column_family_names,
218 std::vector<ColumnFamilyHandle*>* handles) override;
219 virtual Status CreateColumnFamilies(
220 const std::vector<ColumnFamilyDescriptor>& column_families,
221 std::vector<ColumnFamilyHandle*>* handles) override;
222 virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
223 virtual Status DropColumnFamilies(
224 const std::vector<ColumnFamilyHandle*>& column_families) override;
225
226 // Returns false if the key definitely does not exist in the database and
227 // true if it may. If value_found is not passed in as null, then the value
228 // is returned if it was found in memory. On return, *value_found is set to
229 // true if the value was found, and false otherwise.
230 using DB::KeyMayExist;
231 virtual bool KeyMayExist(const ReadOptions& options,
232 ColumnFamilyHandle* column_family, const Slice& key,
233 std::string* value,
234 bool* value_found = nullptr) override;
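// A hedged usage sketch for KeyMayExist. Illustrative only; remember that a
// true return value only means the key may exist:
//   std::string maybe_value;
//   bool value_found = false;
//   if (db->KeyMayExist(ReadOptions(), db->DefaultColumnFamily(), Slice("k1"),
//                       &maybe_value, &value_found) &&
//       value_found) {
//     // maybe_value holds the value that was found in memory.
//   }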
235
236 using DB::NewIterator;
237 virtual Iterator* NewIterator(const ReadOptions& options,
238 ColumnFamilyHandle* column_family) override;
239 virtual Status NewIterators(
240 const ReadOptions& options,
241 const std::vector<ColumnFamilyHandle*>& column_families,
242 std::vector<Iterator*>* iterators) override;
243
244 virtual const Snapshot* GetSnapshot() override;
245 virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
246 using DB::GetProperty;
247 virtual bool GetProperty(ColumnFamilyHandle* column_family,
248 const Slice& property, std::string* value) override;
249 using DB::GetMapProperty;
250 virtual bool GetMapProperty(
251 ColumnFamilyHandle* column_family, const Slice& property,
252 std::map<std::string, std::string>* value) override;
253 using DB::GetIntProperty;
254 virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
255 const Slice& property, uint64_t* value) override;
256 using DB::GetAggregatedIntProperty;
257 virtual bool GetAggregatedIntProperty(const Slice& property,
258 uint64_t* aggregated_value) override;
259 using DB::GetApproximateSizes;
260 virtual Status GetApproximateSizes(const SizeApproximationOptions& options,
261 ColumnFamilyHandle* column_family,
262 const Range* range, int n,
263 uint64_t* sizes) override;
264 using DB::GetApproximateMemTableStats;
265 virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
266 const Range& range,
267 uint64_t* const count,
268 uint64_t* const size) override;
269 using DB::CompactRange;
270 virtual Status CompactRange(const CompactRangeOptions& options,
271 ColumnFamilyHandle* column_family,
272 const Slice* begin, const Slice* end) override;
273
274 using DB::CompactFiles;
275 virtual Status CompactFiles(
276 const CompactionOptions& compact_options,
277 ColumnFamilyHandle* column_family,
278 const std::vector<std::string>& input_file_names, const int output_level,
279 const int output_path_id = -1,
280 std::vector<std::string>* const output_file_names = nullptr,
281 CompactionJobInfo* compaction_job_info = nullptr) override;
282
283 virtual Status PauseBackgroundWork() override;
284 virtual Status ContinueBackgroundWork() override;
285
286 virtual Status EnableAutoCompaction(
287 const std::vector<ColumnFamilyHandle*>& column_family_handles) override;
288
289 virtual void EnableManualCompaction() override;
290 virtual void DisableManualCompaction() override;
291
292 using DB::SetOptions;
293 Status SetOptions(
294 ColumnFamilyHandle* column_family,
295 const std::unordered_map<std::string, std::string>& options_map) override;
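// Illustrative sketch of a dynamic option change via SetOptions. The option
// names and values below are examples only:
//   db->SetOptions(db->DefaultColumnFamily(),
//                  {{"write_buffer_size", "131072"},
//                   {"level0_file_num_compaction_trigger", "4"}});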
296
297 virtual Status SetDBOptions(
298 const std::unordered_map<std::string, std::string>& options_map) override;
299
300 using DB::NumberLevels;
301 virtual int NumberLevels(ColumnFamilyHandle* column_family) override;
302 using DB::MaxMemCompactionLevel;
303 virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) override;
304 using DB::Level0StopWriteTrigger;
305 virtual int Level0StopWriteTrigger(
306 ColumnFamilyHandle* column_family) override;
307 virtual const std::string& GetName() const override;
308 virtual Env* GetEnv() const override;
309 virtual FileSystem* GetFileSystem() const override;
310 using DB::GetOptions;
311 virtual Options GetOptions(ColumnFamilyHandle* column_family) const override;
312 using DB::GetDBOptions;
313 virtual DBOptions GetDBOptions() const override;
314 using DB::Flush;
315 virtual Status Flush(const FlushOptions& options,
316 ColumnFamilyHandle* column_family) override;
317 virtual Status Flush(
318 const FlushOptions& options,
319 const std::vector<ColumnFamilyHandle*>& column_families) override;
320 virtual Status FlushWAL(bool sync) override;
321 bool TEST_WALBufferIsEmpty(bool lock = true);
322 virtual Status SyncWAL() override;
323 virtual Status LockWAL() override;
324 virtual Status UnlockWAL() override;
325
326 virtual SequenceNumber GetLatestSequenceNumber() const override;
327
328 virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
329
330 virtual Status GetDbIdentity(std::string& identity) const override;
331
332 virtual Status GetDbIdentityFromIdentityFile(std::string* identity) const;
333
334 ColumnFamilyHandle* DefaultColumnFamily() const override;
335
336 ColumnFamilyHandle* PersistentStatsColumnFamily() const;
337
338 virtual Status Close() override;
339
340 Status GetStatsHistory(
341 uint64_t start_time, uint64_t end_time,
342 std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
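// Illustrative sketch of iterating stats history, assuming stats persistence
// has been enabled via DBOptions (error handling elided):
//   std::unique_ptr<StatsHistoryIterator> stats_iter;
//   Status s = db->GetStatsHistory(
//       0 /* start_time */, std::numeric_limits<uint64_t>::max() /* end_time */,
//       &stats_iter);
//   for (; s.ok() && stats_iter->Valid(); stats_iter->Next()) {
//     uint64_t time = stats_iter->GetStatsTime();
//     const std::map<std::string, uint64_t>& stats = stats_iter->GetStatsMap();
//     // ... use time and stats ...
//   }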
343
344 #ifndef ROCKSDB_LITE
345 using DB::ResetStats;
346 virtual Status ResetStats() override;
347 virtual Status DisableFileDeletions() override;
348 virtual Status EnableFileDeletions(bool force) override;
349 virtual int IsFileDeletionsEnabled() const;
350 // All the returned filenames start with "/"
351 virtual Status GetLiveFiles(std::vector<std::string>&,
352 uint64_t* manifest_file_size,
353 bool flush_memtable = true) override;
354 virtual Status GetSortedWalFiles(VectorLogPtr& files) override;
355 virtual Status GetCurrentWalFile(
356 std::unique_ptr<LogFile>* current_log_file) override;
357 virtual Status GetCreationTimeOfOldestFile(
358 uint64_t* creation_time) override;
359
360 virtual Status GetUpdatesSince(
361 SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
362 const TransactionLogIterator::ReadOptions& read_options =
363 TransactionLogIterator::ReadOptions()) override;
364 virtual Status DeleteFile(std::string name) override;
365 Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
366 const RangePtr* ranges, size_t n,
367 bool include_end = true);
368
369 virtual void GetLiveFilesMetaData(
370 std::vector<LiveFileMetaData>* metadata) override;
371
372 // Obtains the meta data of the specified column family of the DB.
373 // Status::NotFound() will be returned if the current DB does not have
374 // any column family matching the specified name.
375 // TODO(yhchiang): output parameter is placed in the end in this codebase.
376 virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
377 ColumnFamilyMetaData* metadata) override;
378
379 Status SuggestCompactRange(ColumnFamilyHandle* column_family,
380 const Slice* begin, const Slice* end) override;
381
382 Status PromoteL0(ColumnFamilyHandle* column_family,
383 int target_level) override;
384
385 using DB::IngestExternalFile;
386 virtual Status IngestExternalFile(
387 ColumnFamilyHandle* column_family,
388 const std::vector<std::string>& external_files,
389 const IngestExternalFileOptions& ingestion_options) override;
390
391 using DB::IngestExternalFiles;
392 virtual Status IngestExternalFiles(
393 const std::vector<IngestExternalFileArg>& args) override;
394
395 using DB::CreateColumnFamilyWithImport;
396 virtual Status CreateColumnFamilyWithImport(
397 const ColumnFamilyOptions& options, const std::string& column_family_name,
398 const ImportColumnFamilyOptions& import_options,
399 const ExportImportFilesMetaData& metadata,
400 ColumnFamilyHandle** handle) override;
401
402 using DB::VerifyChecksum;
403 virtual Status VerifyChecksum(const ReadOptions& /*read_options*/) override;
404
405 using DB::StartTrace;
406 virtual Status StartTrace(
407 const TraceOptions& options,
408 std::unique_ptr<TraceWriter>&& trace_writer) override;
409
410 using DB::EndTrace;
411 virtual Status EndTrace() override;
412
413 using DB::StartBlockCacheTrace;
414 Status StartBlockCacheTrace(
415 const TraceOptions& options,
416 std::unique_ptr<TraceWriter>&& trace_writer) override;
417
418 using DB::EndBlockCacheTrace;
419 Status EndBlockCacheTrace() override;
420
421 using DB::GetPropertiesOfAllTables;
422 virtual Status GetPropertiesOfAllTables(
423 ColumnFamilyHandle* column_family,
424 TablePropertiesCollection* props) override;
425 virtual Status GetPropertiesOfTablesInRange(
426 ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
427 TablePropertiesCollection* props) override;
428
429 #endif // ROCKSDB_LITE
430
431 // ---- End of implementations of the DB interface ----
432
433 struct GetImplOptions {
434 ColumnFamilyHandle* column_family = nullptr;
435 PinnableSlice* value = nullptr;
436 bool* value_found = nullptr;
437 ReadCallback* callback = nullptr;
438 bool* is_blob_index = nullptr;
439 // If true, return the value associated with key via the value pointer;
440 // otherwise return all merge operands for key via the merge_operands pointer.
441 bool get_value = true;
442 // Pointer to an array of size
443 // get_merge_operands_options.expected_max_number_of_operands allocated by
444 // user
445 PinnableSlice* merge_operands = nullptr;
446 GetMergeOperandsOptions* get_merge_operands_options = nullptr;
447 int* number_of_operands = nullptr;
448 };
449
450 // Function that Get and KeyMayExist call with no_io true or false.
451 // Note: 'value_found' from KeyMayExist propagates here.
452 // This function is also called by GetMergeOperands.
453 // If get_impl_options.get_value = true, get the value associated with key
454 // via get_impl_options.value.
455 // If get_impl_options.get_value = false, get the merge operands associated
456 // with key via get_impl_options.merge_operands.
457 Status GetImpl(const ReadOptions& options, const Slice& key,
458 GetImplOptions get_impl_options);
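// A minimal sketch of how a point lookup configures GetImplOptions; this
// roughly mirrors what Get() does internally (illustrative only):
//   GetImplOptions get_impl_options;
//   get_impl_options.column_family = db->DefaultColumnFamily();
//   PinnableSlice pinnable_val;
//   get_impl_options.value = &pinnable_val;
//   Status s = db->GetImpl(ReadOptions(), Slice("k1"), get_impl_options);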
459
460 ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
461 ColumnFamilyData* cfd,
462 SequenceNumber snapshot,
463 ReadCallback* read_callback,
464 bool allow_blob = false,
465 bool allow_refresh = true);
466
467 virtual SequenceNumber GetLastPublishedSequence() const {
468 if (last_seq_same_as_publish_seq_) {
469 return versions_->LastSequence();
470 } else {
471 return versions_->LastPublishedSequence();
472 }
473 }
474
475 // REQUIRES: joined the main write queue if two_write_queues is disabled, and
476 // the second write queue otherwise.
477 virtual void SetLastPublishedSequence(SequenceNumber seq);
478 // Returns LastSequence in last_seq_same_as_publish_seq_
479 // mode and LastAllocatedSequence otherwise. This is useful when visibility
480 // depends also on data written to the WAL but not to the memtable.
481 SequenceNumber TEST_GetLastVisibleSequence() const;
482
483 #ifndef ROCKSDB_LITE
484 // Similar to Write() but will call the callback once on the single write
485 // thread to determine whether it is safe to perform the write.
486 virtual Status WriteWithCallback(const WriteOptions& write_options,
487 WriteBatch* my_batch,
488 WriteCallback* callback);
489
490 // Returns the sequence number that is guaranteed to be smaller than or equal
491 // to the sequence number of any key that could be inserted into the current
492 // memtables. It can then be assumed that any write with a larger (or equal)
493 // sequence number will be present in this memtable or a later memtable.
494 //
495 // If the earliest sequence number could not be determined,
496 // kMaxSequenceNumber will be returned.
497 //
498 // If include_history=true, will also search Memtables in MemTableList
499 // History.
500 SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv,
501 bool include_history);
502
503 // For a given key, check to see if there are any records for this key
504 // in the memtables, including memtable history. If cache_only is false,
505 // SST files will also be checked.
506 //
507 // If a key is found, *found_record_for_key will be set to true and
508 // *seq will be set to the stored sequence number for the latest
509 // operation on this key or kMaxSequenceNumber if unknown.
510 // If no key is found, *found_record_for_key will be set to false.
511 //
512 // Note: If cache_only=false, it is possible for *seq to be set to 0 if
513 // the sequence number has been cleared from the record. If the caller is
514 // holding an active db snapshot, we know the missing sequence must be less
515 // than the snapshot's sequence number (sequence numbers are only cleared
516 // when there are no earlier active snapshots).
517 //
518 // If NotFound is returned and found_record_for_key is set to false, then no
519 // record for this key was found. If the caller is holding an active db
520 // snapshot, we know that no key could have existed after this snapshot
521 // (since we do not compact keys that have an earlier snapshot).
522 //
523 // Only records newer than or at `lower_bound_seq` are guaranteed to be
524 // returned. Memtables and files may not be checked if they only contain data
525 // older than `lower_bound_seq`.
526 //
527 // Returns OK or NotFound on success,
528 // other status on unexpected error.
529 // TODO(andrewkr): this API need to be aware of range deletion operations
530 Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
531 bool cache_only,
532 SequenceNumber lower_bound_seq,
533 SequenceNumber* seq,
534 bool* found_record_for_key,
535 bool* is_blob_index = nullptr);
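// A hedged sketch of checking for a recent write to a key, similar to what
// write-conflict checking does (illustrative; error handling elided and cfd
// is a previously obtained ColumnFamilyData*):
//   SuperVersion* sv = db->GetAndRefSuperVersion(cfd);
//   SequenceNumber seq = kMaxSequenceNumber;
//   bool found_record_for_key = false;
//   Status s = db->GetLatestSequenceForKey(sv, Slice("k1"),
//                                          /*cache_only=*/true,
//                                          /*lower_bound_seq=*/0, &seq,
//                                          &found_record_for_key);
//   db->ReturnAndCleanupSuperVersion(cfd, sv);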
536
537 Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
538 Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
539 #endif // ROCKSDB_LITE
540
541 // Similar to GetSnapshot(), but also lets the db know that this snapshot
542 // will be used for transaction write-conflict checking. The DB can then
543 // make sure not to compact any keys that would prevent a write-conflict from
544 // being detected.
545 const Snapshot* GetSnapshotForWriteConflictBoundary();
546
547 // Checks if all live files exist on the file system and that their file
548 // sizes match our in-memory records.
549 virtual Status CheckConsistency();
550
551 // max_file_num_to_ignore allows bottom level compaction to filter out newly
552 // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
553 // disable the filtering
554 Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
555 int output_level,
556 const CompactRangeOptions& compact_range_options,
557 const Slice* begin, const Slice* end,
558 bool exclusive, bool disallow_trivial_move,
559 uint64_t max_file_num_to_ignore);
560
561 // Return an internal iterator over the current state of the database.
562 // The keys of this iterator are internal keys (see format.h).
563 // The returned iterator should be deleted when no longer needed.
564 InternalIterator* NewInternalIterator(
565 Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
566 ColumnFamilyHandle* column_family = nullptr);
567
568 LogsWithPrepTracker* logs_with_prep_tracker() {
569 return &logs_with_prep_tracker_;
570 }
571
572 struct BGJobLimits {
573 int max_flushes;
574 int max_compactions;
575 };
576 // Returns maximum background flushes and compactions allowed to be scheduled
577 BGJobLimits GetBGJobLimits() const;
578 // Need a static version that can be called during SanitizeOptions().
579 static BGJobLimits GetBGJobLimits(int max_background_flushes,
580 int max_background_compactions,
581 int max_background_jobs,
582 bool parallelize_compactions);
583
584 // move logs pending closing from job_context to the DB queue and
585 // schedule a purge
586 void ScheduleBgLogWriterClose(JobContext* job_context);
587
588 uint64_t MinLogNumberToKeep();
589
590 // Returns the lower bound file number for SSTs that won't be deleted, even if
591 // they're obsolete. This lower bound is used internally to prevent newly
592 // created flush/compaction output files from being deleted before they're
593 // installed. This technique avoids the need for tracking the exact numbers of
594 // files pending creation, although it prevents more files than necessary from
595 // being deleted.
596 uint64_t MinObsoleteSstNumberToKeep();
597
598 // Returns the list of live files in 'live' and the list
599 // of all files in the filesystem in 'candidate_files'.
600 // If force == false and the last call was less than
601 // db_options_.delete_obsolete_files_period_micros microseconds ago,
602 // it will not fill up the job_context
603 void FindObsoleteFiles(JobContext* job_context, bool force,
604 bool no_full_scan = false);
605
606 // Diffs the files listed in filenames against the live files, and possibly
607 // removes those that do not belong to any live file. Also removes all the
608 // files in sst_delete_files and log_delete_files.
609 // It is not necessary to hold the mutex when invoking this method.
610 // If FindObsoleteFiles() was run, we need to also run
611 // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true.
612 void PurgeObsoleteFiles(JobContext& background_context,
613 bool schedule_only = false);
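// A simplified sketch of pairing FindObsoleteFiles() with PurgeObsoleteFiles()
// as described above (illustrative; mutex handling and error checks elided):
//   JobContext job_context(/*job_id=*/0);
//   db->FindObsoleteFiles(&job_context, /*force=*/false);
//   if (job_context.HaveSomethingToDelete()) {
//     db->PurgeObsoleteFiles(job_context);
//   }
//   job_context.Clean();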
614
615 // Schedule a background job to actually delete obsolete files.
616 void SchedulePurge();
617
618 const SnapshotList& snapshots() const { return snapshots_; }
619
620 // Load the list of snapshots that are no newer than `max_seq` into
621 // `snap_vector`, in ascending order.
622 // `oldest_write_conflict_snapshot` is filled with the oldest snapshot
623 // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true.
624 void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
625 SequenceNumber* oldest_write_conflict_snapshot,
626 const SequenceNumber& max_seq) const {
627 InstrumentedMutexLock l(mutex());
628 snapshots().GetAll(snap_vector, oldest_write_conflict_snapshot, max_seq);
629 }
630
631 const ImmutableDBOptions& immutable_db_options() const {
632 return immutable_db_options_;
633 }
634
635 // Cancel all background jobs, including flush, compaction, background
636 // purging, stats dumping threads, etc. If `wait` = true, wait for the
637 // running jobs to abort or finish before returning. Otherwise, only
638 // sends the signals.
639 void CancelAllBackgroundWork(bool wait);
640
641 // Find Super version and reference it. Based on options, it might return
642 // the thread local cached one.
643 // Call ReturnAndCleanupSuperVersion() when it is no longer needed.
644 SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
645
646 // Similar to the previous function but looks up based on a column family id.
647 // nullptr will be returned if this column family no longer exists.
648 // REQUIRED: this function should only be called on the write thread or if the
649 // mutex is held.
650 SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
651
652 // Un-reference the super version and clean it up if it is the last reference.
653 void CleanupSuperVersion(SuperVersion* sv);
654
655 // Un-reference the super version and return it to thread local cache if
656 // needed. If it is the last reference of the super version, clean it up
657 // after un-referencing it.
658 void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv);
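// A minimal sketch of the acquire/use/release pattern for super versions
// (illustrative; cfd is a previously obtained ColumnFamilyData*):
//   SuperVersion* sv = db->GetAndRefSuperVersion(cfd);
//   // ... read from sv->mem, sv->imm, and sv->current ...
//   db->ReturnAndCleanupSuperVersion(cfd, sv);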
659
660 // Similar to the previous function but looks up based on a column family id.
661 // nullptr will be returned if this column family no longer exists.
662 // REQUIRED: this function should only be called on the write thread.
663 void ReturnAndCleanupSuperVersion(uint32_t column_family_id, SuperVersion* sv);
664
665 // REQUIRED: this function should only be called on the write thread or if the
666 // mutex is held. Return value only valid until next call to this function or
667 // mutex is released.
668 ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
669
670 // Same as above, but should be called without mutex held and not on write thread.
671 std::unique_ptr<ColumnFamilyHandle> GetColumnFamilyHandleUnlocked(
672 uint32_t column_family_id);
673
674 // Returns the number of currently running flushes.
675 // REQUIREMENT: mutex_ must be held when calling this function.
676 int num_running_flushes() {
677 mutex_.AssertHeld();
678 return num_running_flushes_;
679 }
680
681 // Returns the number of currently running compactions.
682 // REQUIREMENT: mutex_ must be held when calling this function.
683 int num_running_compactions() {
684 mutex_.AssertHeld();
685 return num_running_compactions_;
686 }
687
688 const WriteController& write_controller() { return write_controller_; }
689
690 InternalIterator* NewInternalIterator(
691 const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version,
692 Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence);
693
694 // Hollow transaction shells used for recovery.
695 // These will then be passed to TransactionDB so that
696 // locks can be reacquired before writing can resume.
697 struct RecoveredTransaction {
698 std::string name_;
699 bool unprepared_;
700
701 struct BatchInfo {
702 uint64_t log_number_;
703 // TODO(lth): For unprepared, the memory usage here can be big for
704 // unprepared transactions. This is only useful for rollbacks, and we
705 // can in theory just keep keyset for that.
706 WriteBatch* batch_;
707 // Number of sub-batches. A new sub-batch is created if the txn attempts to
708 // insert a duplicate (key, seq) into the memtable. This is currently used in
709 // WritePreparedTxn/WriteUnpreparedTxn.
710 size_t batch_cnt_;
711 };
712
713 // This maps the seq of the first key in the batch to BatchInfo, which
714 // contains WriteBatch and other information relevant to the batch.
715 //
716 // For WriteUnprepared, batches_ can have size greater than 1, but for
717 // other write policies, it must be of size 1.
718 std::map<SequenceNumber, BatchInfo> batches_;
719
720 explicit RecoveredTransaction(const uint64_t log, const std::string& name,
721 WriteBatch* batch, SequenceNumber seq,
722 size_t batch_cnt, bool unprepared)
723 : name_(name), unprepared_(unprepared) {
724 batches_[seq] = {log, batch, batch_cnt};
725 }
726
727 ~RecoveredTransaction() {
728 for (auto& it : batches_) {
729 delete it.second.batch_;
730 }
731 }
732
733 void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
734 size_t batch_cnt, bool unprepared) {
735 assert(batches_.count(seq) == 0);
736 batches_[seq] = {log_number, batch, batch_cnt};
737 // Prior state must be unprepared, since the prepare batch must be the
738 // last batch.
739 assert(unprepared_);
740 unprepared_ = unprepared;
741 }
742 };
743
744 bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
745
746 std::unordered_map<std::string, RecoveredTransaction*>
747 recovered_transactions() {
748 return recovered_transactions_;
749 }
750
751 RecoveredTransaction* GetRecoveredTransaction(const std::string& name) {
752 auto it = recovered_transactions_.find(name);
753 if (it == recovered_transactions_.end()) {
754 return nullptr;
755 } else {
756 return it->second;
757 }
758 }
759
760 void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
761 WriteBatch* batch, SequenceNumber seq,
762 size_t batch_cnt, bool unprepared_batch) {
763 // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
764 // times for every unprepared batch encountered during recovery.
765 //
766 // If the transaction is prepared, then the last call to
767 // InsertRecoveredTransaction will have unprepared_batch = false.
768 auto rtxn = recovered_transactions_.find(name);
769 if (rtxn == recovered_transactions_.end()) {
770 recovered_transactions_[name] = new RecoveredTransaction(
771 log, name, batch, seq, batch_cnt, unprepared_batch);
772 } else {
773 rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
774 }
775 logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
776 }
777
778 void DeleteRecoveredTransaction(const std::string& name) {
779 auto it = recovered_transactions_.find(name);
780 assert(it != recovered_transactions_.end());
781 auto* trx = it->second;
782 recovered_transactions_.erase(it);
783 for (const auto& info : trx->batches_) {
784 logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
785 info.second.log_number_);
786 }
787 delete trx;
788 }
789
790 void DeleteAllRecoveredTransactions() {
791 for (auto it = recovered_transactions_.begin();
792 it != recovered_transactions_.end(); ++it) {
793 delete it->second;
794 }
795 recovered_transactions_.clear();
796 }
797
798 void AddToLogsToFreeQueue(log::Writer* log_writer) {
799 logs_to_free_queue_.push_back(log_writer);
800 }
801
802 void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
803 superversions_to_free_queue_.push_back(sv);
804 }
805
806 void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
807
808 // Fill JobContext with snapshot information needed by flush and compaction.
809 void GetSnapshotContext(JobContext* job_context,
810 std::vector<SequenceNumber>* snapshot_seqs,
811 SequenceNumber* earliest_write_conflict_snapshot,
812 SnapshotChecker** snapshot_checker);
813
814 // Not thread-safe.
815 void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
816
817 InstrumentedMutex* mutex() const { return &mutex_; }
818
819 // Initialize a brand new DB. The DB directory is expected to be empty before
820 // calling it.
821 Status NewDB();
822
823 // This is to be used only by internal rocksdb classes.
824 static Status Open(const DBOptions& db_options, const std::string& name,
825 const std::vector<ColumnFamilyDescriptor>& column_families,
826 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
827 const bool seq_per_batch, const bool batch_per_txn);
828
829 static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
830 std::unique_ptr<Directory>* directory);
831
832 // find stats map from stats_history_ with smallest timestamp in
833 // the range of [start_time, end_time)
834 bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
835 uint64_t* new_time,
836 std::map<std::string, uint64_t>* stats_map);
837
838 // Print information of all tombstones of all iterators to the std::string
839 // This is only used by ldb. The output might be capped. Tombstones
840 // printed out are not guaranteed to be in any order.
841 Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
842 int max_entries_to_print,
843 std::string* out_str);
844
845 #ifndef NDEBUG
846 // Compact any files in the named level that overlap [*begin, *end]
847 Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
848 ColumnFamilyHandle* column_family = nullptr,
849 bool disallow_trivial_move = false);
850
851 void TEST_SwitchWAL();
852
853 bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
854
855 bool TEST_IsLogGettingFlushed() {
856 return alive_log_files_.begin()->getting_flushed;
857 }
858
859 Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
860
861 // Force current memtable contents to be flushed.
862 Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
863 ColumnFamilyHandle* cfh = nullptr);
864
865 Status TEST_FlushMemTable(ColumnFamilyData* cfd,
866 const FlushOptions& flush_opts);
867
868 // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
869 // is because in certain cases, we can flush column families, wait for the
870 // flush to complete, but delete the column family handle before the wait
871 // finishes. For example in CompactRange.
872 Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
873 const FlushOptions& flush_opts);
874
875 // Wait for memtable flush
876 Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
877
878 // Wait for any compaction
879 // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this
880 // is only for the special test of CancelledCompactions
881 Status TEST_WaitForCompact(bool waitUnscheduled = false);
882
883 // Return the maximum overlapping data (in bytes) at next level for any
884 // file at a level >= 1.
885 int64_t TEST_MaxNextLevelOverlappingBytes(
886 ColumnFamilyHandle* column_family = nullptr);
887
888 // Return the current manifest file no.
889 uint64_t TEST_Current_Manifest_FileNo();
890
891 // Returns the number that'll be assigned to the next file that's created.
892 uint64_t TEST_Current_Next_FileNo();
893
894 // get total level0 file size. Only for testing.
895 uint64_t TEST_GetLevel0TotalSize();
896
897 void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
898 std::vector<std::vector<FileMetaData>>* metadata);
899
900 void TEST_LockMutex();
901
902 void TEST_UnlockMutex();
903
904 // REQUIRES: mutex locked
905 void* TEST_BeginWrite();
906
907 // REQUIRES: mutex locked
908 // pass the pointer that you got from TEST_BeginWrite()
909 void TEST_EndWrite(void* w);
910
911 uint64_t TEST_MaxTotalInMemoryState() const {
912 return max_total_in_memory_state_;
913 }
914
915 size_t TEST_LogsToFreeSize();
916
917 uint64_t TEST_LogfileNumber();
918
919 uint64_t TEST_total_log_size() const { return total_log_size_; }
920
921 // Returns column family name to ImmutableCFOptions map.
922 Status TEST_GetAllImmutableCFOptions(
923 std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
924
925 // Return the latest MutableCFOptions of a column family
926 Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
927 MutableCFOptions* mutable_cf_options);
928
929 Cache* TEST_table_cache() { return table_cache_.get(); }
930
931 WriteController& TEST_write_controler() { return write_controller_; }
932
933 uint64_t TEST_FindMinLogContainingOutstandingPrep();
934 uint64_t TEST_FindMinPrepLogReferencedByMemTable();
935 size_t TEST_PreparedSectionCompletedSize();
936 size_t TEST_LogsWithPrepSize();
937
938 int TEST_BGCompactionsAllowed() const;
939 int TEST_BGFlushesAllowed() const;
940 size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
941 void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
942 void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
943 bool TEST_IsPersistentStatsEnabled() const;
944 size_t TEST_EstimateInMemoryStatsHistorySize() const;
945 #endif // NDEBUG
946
947 protected:
948 const std::string dbname_;
949 std::string db_id_;
950 std::unique_ptr<VersionSet> versions_;
951 // Flag to check whether we allocated and own the info log file
952 bool own_info_log_;
953 const DBOptions initial_db_options_;
954 Env* const env_;
955 std::shared_ptr<FileSystem> fs_;
956 const ImmutableDBOptions immutable_db_options_;
957 MutableDBOptions mutable_db_options_;
958 Statistics* stats_;
959 std::unordered_map<std::string, RecoveredTransaction*>
960 recovered_transactions_;
961 std::unique_ptr<Tracer> tracer_;
962 InstrumentedMutex trace_mutex_;
963 BlockCacheTracer block_cache_tracer_;
964
965 // State below is protected by mutex_
966 // With two_write_queues enabled, some of the variables that are accessed during
967 // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
968 // logs_, logfile_number_. Refer to the definition of each variable below for
969 // more description.
970 mutable InstrumentedMutex mutex_;
971
972 ColumnFamilyHandleImpl* default_cf_handle_;
973 InternalStats* default_cf_internal_stats_;
974
975 // Only used for dynamically adjusting max_total_wal_size. It is a sum of
976 // [write_buffer_size * max_write_buffer_number] over all column families
977 uint64_t max_total_in_memory_state_;
978 // If true, we have only one (default) column family. We use this to optimize
979 // some code-paths
980 bool single_column_family_mode_;
981
982 // The options to access storage files
983 const FileOptions file_options_;
984
985 // Additional options for compaction and flush
986 FileOptions file_options_for_compaction_;
987
988 std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
989
990 // Increase the sequence number after writing each batch, whether memtable is
991 // disabled for that or not. Otherwise the sequence number is increased after
992 // writing each key into memtable. This implies that when disable_memtable is
993 // set, the seq is not increased at all.
994 //
995 // Default: false
996 const bool seq_per_batch_;
997 // This determines during recovery whether we expect one writebatch per
998 // recovered transaction, or potentially multiple writebatches per
999 // transaction. For WriteUnprepared, this is set to false, since multiple
1000 // batches can exist per transaction.
1001 //
1002 // Default: true
1003 const bool batch_per_txn_;
1004
1005 // Persist options to the options file.
1006 // Except in DB::Open(), WriteOptionsFile can only be called when:
1007 // 1. If need_mutex_lock = false, the DB mutex is already held.
1008 // 2. If need_enter_write_thread = false, the write thread has already been entered.
1009 Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
1010
1011 // The following two functions can only be called when:
1012 // 1. WriteThread::Writer::EnterUnbatched() is used.
1013 // 2. db_mutex is NOT held
1014 Status RenameTempFileToOptionsFile(const std::string& file_name);
1015 Status DeleteObsoleteOptionsFiles();
1016
1017 void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
1018 const MutableCFOptions& mutable_cf_options,
1019 int job_id);
1020
1021 void NotifyOnFlushCompleted(
1022 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
1023 std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);
1024
1025 void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1026 const Status& st,
1027 const CompactionJobStats& job_stats, int job_id);
1028
1029 void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c,
1030 const Status& st,
1031 const CompactionJobStats& job_stats,
1032 int job_id);
1033 void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
1034 const MemTableInfo& mem_table_info);
1035
1036 #ifndef ROCKSDB_LITE
1037 void NotifyOnExternalFileIngested(
1038 ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
1039 #endif // !ROCKSDB_LITE
1040
1041 void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1042
1043 void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1044
1045 void EraseThreadStatusDbInfo() const;
1046
1047 // If disable_memtable is set the application logic must guarantee that the
1048 // batch will still be skipped from the memtable during recovery. An exception
1049 // to this is seq_per_batch_ mode, in which, since each batch already takes one
1050 // seq, it is ok for the batch to write to memtable during recovery as long as
1051 // it only takes one sequence number: i.e., no duplicate keys.
1052 // In WriteCommitted it is guaranteed since disable_memtable is used for the
1053 // prepare batch, which will be written to the memtable later during the commit,
1054 // and in WritePrepared it is guaranteed since it will be used only for WAL
1055 // markers which will never be written to memtable. If the commit marker is
1056 // accompanied with CommitTimeWriteBatch that is not written to memtable as
1057 // long as it has no duplicate keys, it does not violate the one-seq-per-batch
1058 // policy.
1059 // batch_cnt is expected to be non-zero in seq_per_batch mode and
1060 // indicates the number of sub-batches. A sub-batch is a subset of the write
1061 // batch that does not have duplicate keys.
1062 Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
1063 WriteCallback* callback = nullptr,
1064 uint64_t* log_used = nullptr, uint64_t log_ref = 0,
1065 bool disable_memtable = false, uint64_t* seq_used = nullptr,
1066 size_t batch_cnt = 0,
1067 PreReleaseCallback* pre_release_callback = nullptr);
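// A minimal sketch of a direct WriteImpl() call, as wrappers such as
// TransactionDB do (illustrative; all optional arguments left at their
// defaults):
//   WriteBatch batch;
//   batch.Put("key", "value");
//   Status s = db->WriteImpl(WriteOptions(), &batch);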
1068
1069 Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
1070 WriteCallback* callback = nullptr,
1071 uint64_t* log_used = nullptr, uint64_t log_ref = 0,
1072 bool disable_memtable = false,
1073 uint64_t* seq_used = nullptr);
1074
1075 // Write only to memtables without joining any write queue
1076 Status UnorderedWriteMemtable(const WriteOptions& write_options,
1077 WriteBatch* my_batch, WriteCallback* callback,
1078 uint64_t log_ref, SequenceNumber seq,
1079 const size_t sub_batch_cnt);
1080
1081 // Whether the batch needs to be assigned an order
1082 enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder };
1083 // Whether it requires publishing last sequence or not
1084 enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq };
1085
1086 // Join the write_thread to write the batch only to the WAL. It is the
1087 // responsibility of the caller to also write the write batch to the memtable
1088 // if it is required.
1089 //
1090 // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder
1091 // indicating the number of sub-batches in my_batch. A sub-batch is a subset
1092 // of the write batch that does not have duplicate keys. When seq_per_batch is
1093 // not set, each key is a separate sub-batch. Otherwise each duplicate key
1094 // marks the start of a new sub-batch.
1095 Status WriteImplWALOnly(
1096 WriteThread* write_thread, const WriteOptions& options,
1097 WriteBatch* updates, WriteCallback* callback, uint64_t* log_used,
1098 const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
1099 PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
1100 const PublishLastSeq publish_last_seq, const bool disable_memtable);
1101
1102 // Write cached_recoverable_state_ to the memtable if it is not empty.
1103 // The writer must be the leader in write_thread_ and holding mutex_.
1104 Status WriteRecoverableState();
1105
1106 // Actual implementation of Close()
1107 Status CloseImpl();
1108
1109 // Recover the descriptor from persistent storage. May do a significant
1110 // amount of work to recover recently logged updates. Any changes to
1111 // be made to the descriptor are added to *edit.
1112 // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
1113 // skipped.
1114 virtual Status Recover(
1115 const std::vector<ColumnFamilyDescriptor>& column_families,
1116 bool read_only = false, bool error_if_log_file_exist = false,
1117 bool error_if_data_exists_in_logs = false,
1118 uint64_t* recovered_seq = nullptr);
1119
1120 virtual bool OwnTablesAndLogs() const { return true; }
1121
1122 private:
1123 friend class DB;
1124 friend class ErrorHandler;
1125 friend class InternalStats;
1126 friend class PessimisticTransaction;
1127 friend class TransactionBaseImpl;
1128 friend class WriteCommittedTxn;
1129 friend class WritePreparedTxn;
1130 friend class WritePreparedTxnDB;
1131 friend class WriteBatchWithIndex;
1132 friend class WriteUnpreparedTxnDB;
1133 friend class WriteUnpreparedTxn;
1134
1135 #ifndef ROCKSDB_LITE
1136 friend class ForwardIterator;
1137 #endif
1138 friend struct SuperVersion;
1139 friend class CompactedDBImpl;
1140 friend class DBTest_ConcurrentFlushWAL_Test;
1141 friend class DBTest_MixedSlowdownOptionsStop_Test;
1142 friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test;
1143 friend class DBCompactionTest_CompactionDuringShutdown_Test;
1144 friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test;
1145 #ifndef NDEBUG
1146 friend class DBTest2_ReadCallbackTest_Test;
1147 friend class WriteCallbackTest_WriteWithCallbackTest_Test;
1148 friend class XFTransactionWriteHandler;
1149 friend class DBBlobIndexTest;
1150 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
1151 #endif
1152
1153 struct CompactionState;
1154 struct PrepickedCompaction;
1155 struct PurgeFileInfo;
1156
1157 struct WriteContext {
1158 SuperVersionContext superversion_context;
1159 autovector<MemTable*> memtables_to_free_;
1160
1161 explicit WriteContext(bool create_superversion = false)
1162 : superversion_context(create_superversion) {}
1163
1164 ~WriteContext() {
1165 superversion_context.Clean();
1166 for (auto& m : memtables_to_free_) {
1167 delete m;
1168 }
1169 }
1170 };
1171
1172 struct LogFileNumberSize {
1173 explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
1174 void AddSize(uint64_t new_size) { size += new_size; }
1175 uint64_t number;
1176 uint64_t size = 0;
1177 bool getting_flushed = false;
1178 };
1179
1180 struct LogWriterNumber {
1181 // pass ownership of _writer
1182 LogWriterNumber(uint64_t _number, log::Writer* _writer)
1183 : number(_number), writer(_writer) {}
1184
1185 log::Writer* ReleaseWriter() {
1186 auto* w = writer;
1187 writer = nullptr;
1188 return w;
1189 }
1190 Status ClearWriter() {
1191 Status s = writer->WriteBuffer();
1192 delete writer;
1193 writer = nullptr;
1194 return s;
1195 }
1196
1197 uint64_t number;
1198 // Visual Studio doesn't support deque elements being noncopyable, so we
1199 // cannot hold the writer in a std::unique_ptr member here.
1200 log::Writer* writer; // own
1201 // true for some prefix of logs_
1202 bool getting_synced = false;
1203 };
1204
1205 // PurgeFileInfo is a structure to hold information of files to be deleted in
1206 // purge_files_
1207 struct PurgeFileInfo {
1208 std::string fname;
1209 std::string dir_to_sync;
1210 FileType type;
1211 uint64_t number;
1212 int job_id;
1213 PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
1214 int jid)
1215 : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
1216 };
1217
1218 // Argument required by background flush thread.
1219 struct BGFlushArg {
1220 BGFlushArg()
1221 : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {}
1222 BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
1223 SuperVersionContext* superversion_context)
1224 : cfd_(cfd),
1225 max_memtable_id_(max_memtable_id),
1226 superversion_context_(superversion_context) {}
1227
1228 // Column family to flush.
1229 ColumnFamilyData* cfd_;
1230 // Maximum ID of memtable to flush. In this column family, memtables with
1231 // IDs smaller than this value must be flushed before this flush completes.
1232 uint64_t max_memtable_id_;
1233 // Pointer to a SuperVersionContext object. After flush completes, RocksDB
1234 // installs a new superversion for the column family. This operation
1235 // requires a SuperVersionContext object (currently embedded in JobContext).
1236 SuperVersionContext* superversion_context_;
1237 };
1238
1239 // Argument passed to flush thread.
1240 struct FlushThreadArg {
1241 DBImpl* db_;
1242
1243 Env::Priority thread_pri_;
1244 };
1245
1246 // Information for a manual compaction
1247 struct ManualCompactionState {
1248 ColumnFamilyData* cfd;
1249 int input_level;
1250 int output_level;
1251 uint32_t output_path_id;
1252 Status status;
1253 bool done;
1254 bool in_progress; // compaction request being processed?
1255 bool incomplete; // only part of requested range compacted
1256 bool exclusive; // current behavior of only one manual
1257 bool disallow_trivial_move; // Force actual compaction to run
1258 const InternalKey* begin; // nullptr means beginning of key range
1259 const InternalKey* end; // nullptr means end of key range
1260 InternalKey* manual_end; // how far we are compacting
1261 InternalKey tmp_storage; // Used to keep track of compaction progress
1262 InternalKey tmp_storage1; // Used to keep track of compaction progress
1263 };
1264 struct PrepickedCompaction {
1265 // background compaction takes ownership of `compaction`.
1266 Compaction* compaction;
1267 // caller retains ownership of `manual_compaction_state` as it is reused
1268 // across background compactions.
1269 ManualCompactionState* manual_compaction_state; // nullptr if non-manual
1270 // task limiter token is requested during compaction picking.
1271 std::unique_ptr<TaskLimiterToken> task_token;
1272 };
1273
1274 struct CompactionArg {
1275 // caller retains ownership of `db`.
1276 DBImpl* db;
1277 // background compaction takes ownership of `prepicked_compaction`.
1278 PrepickedCompaction* prepicked_compaction;
1279 };
1280
1281 // Initialize the built-in column family for persistent stats. Depending on
1282 // whether on-disk persistent stats have been enabled before, it may either
1283 // create a new column family and column family handle or just a column family
1284 // handle.
1285 // Required: DB mutex held
1286 Status InitPersistStatsColumnFamily();
1287
1288 // Persistent Stats column family has two format version keys which are used
1289 // for compatibility check. Write format version if it's created for the
1290 // first time, read format version and check compatibility if recovering
1291 // from disk. This function requires DB mutex held at entrance but may
1292 // release and re-acquire DB mutex in the process.
1293 // Required: DB mutex held
1294 Status PersistentStatsProcessFormatVersion();
1295
1296 Status ResumeImpl();
1297
1298 void MaybeIgnoreError(Status* s) const;
1299
1300 const Status CreateArchivalDirectory();
1301
1302 Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
1303 const std::string& cf_name,
1304 ColumnFamilyHandle** handle);
1305
1306 Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
1307
1308 // Delete any unneeded files and stale in-memory entries.
1309 void DeleteObsoleteFiles();
1310 // Delete obsolete files and log status and information of file deletion
1311 void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
1312 const std::string& path_to_sync, FileType type,
1313 uint64_t number);
1314
1315 // Background process needs to call
1316 // auto x = CaptureCurrentFileNumberInPendingOutputs()
1317 // auto file_num = versions_->NewFileNumber();
1318 // <do something>
1319 // ReleaseFileNumberFromPendingOutputs(x)
1320 // This will protect any file with number `file_num` or greater from being
1321 // deleted while <do something> is running.
1322 // -----------
1323 // This function will capture current file number and append it to
1324 // pending_outputs_. This will prevent any background process from deleting any
1325 // file created after this point.
1326 std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs();
1327 // This function should be called with the result of
1328 // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file
1329 // created between the calls CaptureCurrentFileNumberInPendingOutputs() and
1330 // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
1331 // and blocked by any other pending_outputs_ calls)
1332 void ReleaseFileNumberFromPendingOutputs(
1333 std::unique_ptr<std::list<uint64_t>::iterator>& v);
1334
1335 Status SyncClosedLogs(JobContext* job_context);
1336
1337 // Flush the in-memory write buffer to storage. Switches to a new
1338 // log-file/memtable and writes a new descriptor iff successful. Then
1339 // installs a new super version for the column family.
1340 Status FlushMemTableToOutputFile(
1341 ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
1342 bool* madeProgress, JobContext* job_context,
1343 SuperVersionContext* superversion_context,
1344 std::vector<SequenceNumber>& snapshot_seqs,
1345 SequenceNumber earliest_write_conflict_snapshot,
1346 SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
1347 Env::Priority thread_pri);
1348
1349 // Flush the memtables of (multiple) column families to multiple files on
1350 // persistent storage.
1351 Status FlushMemTablesToOutputFiles(
1352 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
1353 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
1354
1355 Status AtomicFlushMemTablesToOutputFiles(
1356 const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
1357 JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
1358
1359 // REQUIRES: log_numbers are sorted in ascending order
1360 // corrupted_log_found is set to true if we recover from a corrupted log file.
1361 Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
1362 SequenceNumber* next_sequence, bool read_only,
1363 bool* corrupted_log_found);
1364
1365 // The following two methods are used to flush a memtable to
1366 // storage. The first one is used at database recovery time (when the
1367 // database is opened) and is heavyweight because it holds the mutex
1368 // for the entire period. The second method, WriteLevel0Table, supports
1369 // flushing memtables to storage concurrently.
1370 Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1371 MemTable* mem, VersionEdit* edit);
1372
1373 // Restore alive_log_files_ and total_log_size_ after recovery.
1374 // It needs to run only when there's no flush during recovery
1375 // (e.g. avoid_flush_during_recovery=true). May also trigger flush
1376 // in case total_log_size > max_total_wal_size.
1377 Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
1378
1379 // num_bytes: for slowdown case, delay time is calculated based on
1380 // `num_bytes` going through.
1381 Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);
1382
1383 Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
1384 WriteBatch* my_batch);
1385
1386 // REQUIRES: mutex locked and in write thread.
1387 Status ScheduleFlushes(WriteContext* context);
1388
1389 void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
1390
1391 Status TrimMemtableHistory(WriteContext* context);
1392
1393 Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
1394
1395 void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
1396
1397 // Force current memtable contents to be flushed.
1398 Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
1399 FlushReason flush_reason, bool writes_stopped = false);
1400
1401 Status AtomicFlushMemTables(
1402 const autovector<ColumnFamilyData*>& column_family_datas,
1403 const FlushOptions& options, FlushReason flush_reason,
1404 bool writes_stopped = false);
1405
1406 // Wait until flushing this column family won't stall writes
1407 Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
1408 bool* flush_needed);
1409
1410 // Wait for a memtable to be flushed.
1411 // If flush_memtable_id is non-null, wait until the memtable with that ID
1412 // gets flushed. Otherwise, wait until the column family doesn't have any
1413 // memtable pending flush.
1414 // resuming_from_bg_err indicates whether the caller is attempting to resume
1415 // from background error.
1416 Status WaitForFlushMemTable(ColumnFamilyData* cfd,
1417 const uint64_t* flush_memtable_id = nullptr,
1418 bool resuming_from_bg_err = false) {
1419 return WaitForFlushMemTables({cfd}, {flush_memtable_id},
1420 resuming_from_bg_err);
1421 }
1422 // Wait for memtables to be flushed for multiple column families.
1423 Status WaitForFlushMemTables(
1424 const autovector<ColumnFamilyData*>& cfds,
1425 const autovector<const uint64_t*>& flush_memtable_ids,
1426 bool resuming_from_bg_err);
1427
1428 inline void WaitForPendingWrites() {
1429 mutex_.AssertHeld();
1430 TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock");
1431 // If pipelined write is enabled, wait for all pending memtable
1432 // writers.
1433 if (immutable_db_options_.enable_pipelined_write) {
1434 // Memtable writers may call DB::Get when max_successive_merges > 0, which
1435 // may lock the mutex. Unlock the mutex here to avoid deadlock.
1436 mutex_.Unlock();
1437 write_thread_.WaitForMemTableWriters();
1438 mutex_.Lock();
1439 }
1440
1441 if (!immutable_db_options_.unordered_write) {
1442 // Then the writes are finished before the next write group starts
1443 return;
1444 }
1445
1446 // Wait for the ones who already wrote to the WAL to finish their
1447 // memtable write.
1448 if (pending_memtable_writes_.load() != 0) {
1449 std::unique_lock<std::mutex> guard(switch_mutex_);
1450 switch_cv_.wait(guard,
1451 [&] { return pending_memtable_writes_.load() == 0; });
1452 }
1453 }
1454
1455 // REQUIRES: mutex locked and in write thread.
1456 void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
1457
1458 // REQUIRES: mutex locked and in write thread.
1459 Status SwitchWAL(WriteContext* write_context);
1460
1461 // REQUIRES: mutex locked and in write thread.
1462 Status HandleWriteBufferFull(WriteContext* write_context);
1463
1464 // REQUIRES: mutex locked
1465 Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
1466 WriteContext* write_context);
1467
1468 WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
1469 WriteBatch* tmp_batch, size_t* write_with_wal,
1470 WriteBatch** to_be_cached_state);
1471
1472 Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
1473 uint64_t* log_used, uint64_t* log_size);
1474
1475 Status WriteToWAL(const WriteThread::WriteGroup& write_group,
1476 log::Writer* log_writer, uint64_t* log_used,
1477 bool need_log_sync, bool need_log_dir_sync,
1478 SequenceNumber sequence);
1479
1480 Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
1481 uint64_t* log_used, SequenceNumber* last_sequence,
1482 size_t seq_inc);
1483
1484 // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
1485 void WriteStatusCheck(const Status& status);
1486
1487 // Used by WriteImpl to update bg_error_ in case of memtable insert error.
1488 void MemTableInsertStatusCheck(const Status& memtable_insert_status);
1489
1490 #ifndef ROCKSDB_LITE
1491
1492 Status CompactFilesImpl(const CompactionOptions& compact_options,
1493 ColumnFamilyData* cfd, Version* version,
1494 const std::vector<std::string>& input_file_names,
1495 std::vector<std::string>* const output_file_names,
1496 const int output_level, int output_path_id,
1497 JobContext* job_context, LogBuffer* log_buffer,
1498 CompactionJobInfo* compaction_job_info);
1499
1500 // Wait for current IngestExternalFile() calls to finish.
1501 // REQUIRES: mutex_ held
1502 void WaitForIngestFile();
1503
1504 #else
1505 // IngestExternalFile is not supported in ROCKSDB_LITE so this function
1506 // will be a no-op
1507 void WaitForIngestFile() {}
1508 #endif // ROCKSDB_LITE
1509
1510 ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
1511
1512 void MaybeScheduleFlushOrCompaction();
1513
1514 // A flush request specifies the column families to flush as well as the
1515 // largest memtable id to persist for each column family. Once all the
1516 // memtables whose IDs are smaller than or equal to this per-column-family
1517 // value have been flushed, the request is considered to have completed its
1518 // work for that column family. After completing the work for all column
1519 // families in this request, this flush is considered complete.
1520 typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
1521
1522 void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1523 FlushRequest* req);
1524
1525 void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
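// Illustrative sketch (assumption, not from the original source): cfd1 and
// cfd2 are hypothetical ColumnFamilyData pointers. A request to flush their
// memtables up to IDs 7 and 9 respectively could be scheduled roughly like
// this, typically with the DB mutex held:
//
//   FlushRequest req = {{cfd1, 7}, {cfd2, 9}};
//   SchedulePendingFlush(req, FlushReason::kManualFlush);
//   MaybeScheduleFlushOrCompaction();
//
// GenerateFlushRequest() builds the same structure from each column family's
// current memtables.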
1526
1527 void SchedulePendingCompaction(ColumnFamilyData* cfd);
1528 void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
1529 FileType type, uint64_t number, int job_id);
1530 static void BGWorkCompaction(void* arg);
1531 // Runs a pre-chosen universal compaction involving bottom level in a
1532 // separate, bottom-pri thread pool.
1533 static void BGWorkBottomCompaction(void* arg);
1534 static void BGWorkFlush(void* arg);
1535 static void BGWorkPurge(void* arg);
1536 static void UnscheduleCompactionCallback(void* arg);
1537 static void UnscheduleFlushCallback(void* arg);
1538 void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
1539 Env::Priority thread_pri);
1540 void BackgroundCallFlush(Env::Priority thread_pri);
1541 void BackgroundCallPurge();
1542 Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
1543 LogBuffer* log_buffer,
1544 PrepickedCompaction* prepicked_compaction,
1545 Env::Priority thread_pri);
1546 Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
1547 LogBuffer* log_buffer, FlushReason* reason,
1548 Env::Priority thread_pri);
1549
1550 bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
1551 const std::vector<CompactionInputFiles>& inputs,
1552 bool* sfm_bookkeeping, LogBuffer* log_buffer);
1553
1554 // Request a compaction task token from the compaction thread limiter.
1555 // It always succeeds if force = true or the limiter is disabled.
1556 bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
1557 std::unique_ptr<TaskLimiterToken>* token,
1558 LogBuffer* log_buffer);
1559
1560 // Schedule background tasks
1561 void StartTimedTasks();
1562
1563 void PrintStatistics();
1564
1565 size_t EstimateInMemoryStatsHistorySize() const;
1566
1567 // persist stats to column family "_persistent_stats"
1568 void PersistStats();
1569
1570 // dump rocksdb.stats to LOG
1571 void DumpStats();
1572
1573 // Return the minimum empty level that could hold the total data in the
1574 // input level. Return the input level if no such level can be found.
1575 int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
1576 const MutableCFOptions& mutable_cf_options,
1577 int level);
1578
1579 // Move the files in the input level to the target level.
1580 // If target_level < 0, automatically calculate the minimum level that could
1581 // hold the data set.
1582 Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
1583
1584 // helper functions for adding and removing from flush & compaction queues
1585 void AddToCompactionQueue(ColumnFamilyData* cfd);
1586 ColumnFamilyData* PopFirstFromCompactionQueue();
1587 FlushRequest PopFirstFromFlushQueue();
1588
1589 // Pick the first unthrottled compaction with task token from queue.
1590 ColumnFamilyData* PickCompactionFromQueue(
1591 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
1592
1593 // helper function to call after some of the logs_ were synced
1594 void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
1595
1596 SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
1597 bool lock = true);
1598
1599 uint64_t GetMaxTotalWalSize() const;
1600
1601 Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
1602
1603 Status CloseHelper();
1604
1605 void WaitForBackgroundWork();
1606
1607 // Background threads call this function, which is just a wrapper around
1608 // the InstallSuperVersion() function. Background threads carry
1609 // sv_context which can have new_superversion already
1610 // allocated.
1611 // All ColumnFamily state changes go through this function. Here we analyze
1612 // the new state and we schedule background work if we detect that the new
1613 // state needs flush or compaction.
1614 void InstallSuperVersionAndScheduleWork(
1615 ColumnFamilyData* cfd, SuperVersionContext* sv_context,
1616 const MutableCFOptions& mutable_cf_options);
1617
1618 bool GetIntPropertyInternal(ColumnFamilyData* cfd,
1619 const DBPropertyInfo& property_info,
1620 bool is_locked, uint64_t* value);
1621 bool GetPropertyHandleOptionsStatistics(std::string* value);
1622
1623 bool HasPendingManualCompaction();
1624 bool HasExclusiveManualCompaction();
1625 void AddManualCompaction(ManualCompactionState* m);
1626 void RemoveManualCompaction(ManualCompactionState* m);
1627 bool ShouldntRunManualCompaction(ManualCompactionState* m);
1628 bool HaveManualCompaction(ColumnFamilyData* cfd);
1629 bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
1630 #ifndef ROCKSDB_LITE
1631 void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
1632 const Status& st,
1633 const CompactionJobStats& compaction_job_stats,
1634 const int job_id, const Version* current,
1635 CompactionJobInfo* compaction_job_info) const;
1636 // Reserve the next 'num' file numbers for to-be-ingested external SST files,
1637 // and return the current file_number in 'next_file_number'.
1638 // Write a version edit to the MANIFEST.
1639 Status ReserveFileNumbersBeforeIngestion(
1640 ColumnFamilyData* cfd, uint64_t num,
1641 std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
1642 uint64_t* next_file_number);
1643 #endif //! ROCKSDB_LITE
1644
1645 bool ShouldPurge(uint64_t file_number) const;
1646 void MarkAsGrabbedForPurge(uint64_t file_number);
1647
1648 size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
1649 Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
1650
1651 Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1652 size_t preallocate_block_size, log::Writer** new_log);
1653
1654 // Validate self-consistency of DB options
1655 static Status ValidateOptions(const DBOptions& db_options);
1656 // Validate self-consistency of DB options and its consistency with cf options
1657 static Status ValidateOptions(
1658 const DBOptions& db_options,
1659 const std::vector<ColumnFamilyDescriptor>& column_families);
1660
1661 // Utility function to do some debug validation and sort the given vector
1662 // of MultiGet keys
1663 void PrepareMultiGetKeys(
1664 const size_t num_keys, bool sorted,
1665 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);
1666
1667 // A structure to hold the information required to process MultiGet of keys
1668 // belonging to one column family. For a multi column family MultiGet, there
1669 // will be a container of these objects.
1670 struct MultiGetColumnFamilyData {
1671 ColumnFamilyHandle* cf;
1672 ColumnFamilyData* cfd;
1673
1674 // For the batched MultiGet which relies on sorted keys, start specifies
1675 // the index of the first key belonging to this column family in the sorted
1676 // list.
1677 size_t start;
1678
1679 // For the batched MultiGet case, num_keys specifies the number of keys
1680 // belonging to this column family in the sorted list
1681 size_t num_keys;
1682
1683 // SuperVersion for the column family obtained in a manner that ensures a
1684 // consistent view across all column families in the DB
1685 SuperVersion* super_version;
1686 MultiGetColumnFamilyData(ColumnFamilyHandle* column_family,
1687 SuperVersion* sv)
1688 : cf(column_family),
1689 cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
1690 start(0),
1691 num_keys(0),
1692 super_version(sv) {}
1693
1694 MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first,
1695 size_t count, SuperVersion* sv)
1696 : cf(column_family),
1697 cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
1698 start(first),
1699 num_keys(count),
1700 super_version(sv) {}
1701
1702 MultiGetColumnFamilyData() = default;
1703 };
1704
1705 // A common function to obtain a consistent snapshot, which can be implicit
1706 // if the user doesn't specify a snapshot in read_options, across
1707 // multiple column families for MultiGet. It will attempt to get an implicit
1708 // snapshot without acquiring the db_mutex, but will give up after a few
1709 // tries and acquire the mutex if a memtable flush happens. The template
1710 // allows both the batched and non-batched MultiGet to call this with
1711 // either an std::unordered_map or autovector of column families.
1712 //
1713 // If callback is non-null, the callback is refreshed with the snapshot
1714 // sequence number
1715 //
1716 // A return value of true indicates that the SuperVersions were obtained
1717 // from the ColumnFamilyData, whereas false indicates they are thread
1718 // local
1719 template <class T>
1720 bool MultiCFSnapshot(
1721 const ReadOptions& read_options, ReadCallback* callback,
1722 std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
1723 iter_deref_func,
1724 T* cf_list, SequenceNumber* snapshot);
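// Illustrative sketch (assumption, not from the original source): a batched
// MultiGet caller could collect per-CF state in an autovector and hand
// MultiCFSnapshot a lambda that dereferences its iterator (names below are
// hypothetical):
//
//   autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE> mgd;
//   // ... one MultiGetColumnFamilyData entry per column family ...
//   std::function<MultiGetColumnFamilyData*(decltype(mgd)::iterator&)>
//       iter_deref = [](decltype(mgd)::iterator& it) { return &*it; };
//   SequenceNumber snapshot;
//   bool sv_from_cfd =
//       MultiCFSnapshot(read_options, /*callback=*/nullptr, iter_deref, &mgd,
//                       &snapshot);
//   // true: SuperVersions were obtained from the ColumnFamilyData and must be
//   // cleaned up accordingly; false: they are thread local.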
1725
1726 // The actual implementation of the batched MultiGet. The caller is expected
1727 // to have acquired the SuperVersion and to pass in a snapshot sequence number
1728 // in order to construct the LookupKeys. The start_key and num_keys specify
1729 // the range of keys in the sorted_keys vector for a single column family.
1730 void MultiGetImpl(
1731 const ReadOptions& read_options, size_t start_key, size_t num_keys,
1732 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
1733 SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
1734 bool* is_blob_index);
1735
1736 // table_cache_ provides its own synchronization
1737 std::shared_ptr<Cache> table_cache_;
1738
1739 // Lock over the persistent DB state. Non-nullptr iff successfully acquired.
1740 FileLock* db_lock_;
1741
1742 // In addition to mutex_, stats_history_mutex_ protects writes to stats_history_
1743 InstrumentedMutex stats_history_mutex_;
1744 // In addition to mutex_, log_write_mutex_ protects writes to logs_ and
1745 // logfile_number_. With two_write_queues it also protects alive_log_files_
1746 // and log_empty_. Refer to the definition of each variable below for more
1747 // details.
1748 // Note: to avoid deadlock, if both log_write_mutex_ and mutex_ need to be
1749 // acquired, acquire mutex_ first and then log_write_mutex_.
1750 InstrumentedMutex log_write_mutex_;
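// Illustrative sketch (assumption, not from the original source): the lock
// ordering documented above, as it could appear at a call site that updates
// logs_ under both mutexes (the actual update is elided):
//
//   mutex_.Lock();
//   log_write_mutex_.Lock();
//   // ... modify logs_ / logfile_number_ ...
//   log_write_mutex_.Unlock();
//   mutex_.Unlock();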
1751
1752 std::atomic<bool> shutting_down_;
1753 std::atomic<bool> manual_compaction_paused_;
1754 // This condition variable is signaled on these conditions:
1755 // * whenever bg_compaction_scheduled_ goes down to 0
1756 // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
1757 // made any progress
1758 // * whenever a compaction made any progress
1759 // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
1760 // (i.e. whenever a flush is done, even if it didn't make any progress)
1761 // * whenever there is an error in background purge, flush or compaction
1762 // * whenever num_running_ingest_file_ goes to 0.
1763 // * whenever pending_purge_obsolete_files_ goes to 0.
1764 // * whenever disable_delete_obsolete_files_ goes to 0.
1765 // * whenever SetOptions successfully updates options.
1766 // * whenever a column family is dropped.
1767 InstrumentedCondVar bg_cv_;
1768 // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
1769 // must be under either mutex_ or log_write_mutex_. Since after ::Open,
1770 // logfile_number_ is currently updated only in write_thread_, it can be read
1771 // from the same write_thread_ without any locks.
1772 uint64_t logfile_number_;
1773 std::deque<uint64_t>
1774 log_recycle_files_; // a list of log files that we can recycle
1775 bool log_dir_synced_;
1776 // Without two_write_queues, reads and writes to log_empty_ are protected by
1777 // mutex_. Since it is currently updated/read only in write_thread_, it can be
1778 // accessed from the same write_thread_ without any locks. With
1779 // two_write_queues, where it can be updated by different threads, reads and
1780 // writes are protected by log_write_mutex_ instead. This is to avoid an
1781 // expensive mutex_ lock during WAL writes, which update log_empty_.
1782 bool log_empty_;
1783
1784 ColumnFamilyHandleImpl* persist_stats_cf_handle_;
1785
1786 bool persistent_stats_cfd_exists_ = true;
1787
1788 // Without two_write_queues, reads and writes to alive_log_files_ are
1789 // protected by mutex_. However, since back() is never popped and push_back()
1790 // is done only from write_thread_, the same thread can access the item
1791 // referred to by back() without mutex_. With two_write_queues_, writes
1792 // are protected by locking both mutex_ and log_write_mutex_, and reads must
1793 // be under either mutex_ or log_write_mutex_.
1794 std::deque<LogFileNumberSize> alive_log_files_;
1795 // Log files that aren't fully synced, and the current log file.
1796 // Synchronization:
1797 // - push_back() is done from write_thread_ with locked mutex_ and
1798 // log_write_mutex_
1799 // - pop_front() is done from any thread with locked mutex_ and
1800 // log_write_mutex_
1801 // - reads are done with either locked mutex_ or log_write_mutex_
1802 // - back() and items with getting_synced=true are not popped,
1803 // - The same thread that sets getting_synced=true will reset it.
1804 // - it follows that the object referred by back() can be safely read from
1805 // the write_thread_ without using mutex
1806 // - it follows that the items with getting_synced=true can be safely read
1807 // from the same thread that has set getting_synced=true
1808 std::deque<LogWriterNumber> logs_;
1809 // Signaled when getting_synced becomes false for some of the logs_.
1810 InstrumentedCondVar log_sync_cv_;
1811 // This is the app-level state that is written to the WAL but will be used
1812 // only during recovery. Using this feature avoids writing the state to the
1813 // memtable on normal writes and hence improves throughput. Each new
1814 // write of the state will replace the previous state entirely even if the
1815 // keys in the two consecutive states do not overlap.
1816 // It is protected by log_write_mutex_ when two_write_queues_ is enabled.
1817 // Otherwise only the head of write_thread_ can access it.
1818 WriteBatch cached_recoverable_state_;
1819 std::atomic<bool> cached_recoverable_state_empty_ = {true};
1820 std::atomic<uint64_t> total_log_size_;
1821
1822 // If this is non-empty, we need to delete these log files in background
1823 // threads. Protected by db mutex.
1824 autovector<log::Writer*> logs_to_free_;
1825
1826 bool is_snapshot_supported_;
1827
1828 std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_;
1829
1830 std::map<std::string, uint64_t> stats_slice_;
1831
1832 bool stats_slice_initialized_ = false;
1833
1834 Directories directories_;
1835
1836 WriteBufferManager* write_buffer_manager_;
1837
1838 WriteThread write_thread_;
1839 WriteBatch tmp_batch_;
1840 // The write thread for writers that have no memtable write. This will be used
1841 // in 2PC to batch the prepares separately from the serial commit.
1842 WriteThread nonmem_write_thread_;
1843
1844 WriteController write_controller_;
1845
1846 // Size of the last batch group. In slowdown mode, the next write needs to
1847 // sleep if it uses up the quota.
1848 // Note: This is to protect memtable and compaction. If the batch only writes
1849 // to the WAL, its size need not be included in this.
1850 uint64_t last_batch_group_size_;
1851
1852 FlushScheduler flush_scheduler_;
1853
1854 TrimHistoryScheduler trim_history_scheduler_;
1855
1856 SnapshotList snapshots_;
1857
1858 // For each background job, pending_outputs_ keeps the current file number at
1859 // the time that background job started.
1860 // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has a
1861 // number bigger than any of the file numbers in pending_outputs_. Since file
1862 // numbers grow monotonically, this also means that pending_outputs_ is always
1863 // sorted. After a background job is done executing, its file number is
1864 // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean
1865 // it up.
1866 // State is protected with db mutex.
1867 std::list<uint64_t> pending_outputs_;
1868
1869 // flush_queue_ and compaction_queue_ hold column families that we need to
1870 // flush and compact, respectively.
1871 // A column family is inserted into flush_queue_ when it satisfies condition
1872 // cfd->imm()->IsFlushPending()
1873 // A column family is inserted into compaction_queue_ when it satisfied
1874 // condition cfd->NeedsCompaction()
1875 // Column families in this list are all Ref()-erenced
1876 // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will
1877 // do RAII on ColumnFamilyData
1878 // Column families are in this queue when they need to be flushed or
1879 // compacted. Consumers of these queues are flush and compaction threads. When
1880 // column family is put on this queue, we increase unscheduled_flushes_ and
1881 // unscheduled_compactions_. When these variables are bigger than zero, that
1882 // means we need to schedule background threads for flush and compaction.
1883 // Once the background threads are scheduled, we decrease unscheduled_flushes_
1884 // and unscheduled_compactions_. That way we keep track of number of
1885 // compaction and flush threads we need to schedule. This scheduling is done
1886 // in MaybeScheduleFlushOrCompaction()
1887 // invariant(column family present in flush_queue_ <==>
1888 // ColumnFamilyData::pending_flush_ == true)
1889 std::deque<FlushRequest> flush_queue_;
1890 // invariant(column family present in compaction_queue_ <==>
1891 // ColumnFamilyData::pending_compaction_ == true)
1892 std::deque<ColumnFamilyData*> compaction_queue_;
1893
1894 // A map to store file numbers and filenames of the files to be purged
1895 std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;
1896
1897 // A set to store the file numbers that have been assigned to a certain
1898 // JobContext. The current implementation tracks SST files only.
1899 std::unordered_set<uint64_t> files_grabbed_for_purge_;
1900
1901 // A queue to store log writers to close
1902 std::deque<log::Writer*> logs_to_free_queue_;
1903 std::deque<SuperVersion*> superversions_to_free_queue_;
1904 int unscheduled_flushes_;
1905 int unscheduled_compactions_;
1906
1907 // count how many background compactions are running or have been scheduled in
1908 // the BOTTOM pool
1909 int bg_bottom_compaction_scheduled_;
1910
1911 // count how many background compactions are running or have been scheduled
1912 int bg_compaction_scheduled_;
1913
1914 // stores the number of compactions that are currently running
1915 int num_running_compactions_;
1916
1917 // number of background memtable flush jobs, submitted to the HIGH pool
1918 int bg_flush_scheduled_;
1919
1920 // stores the number of flushes that are currently running
1921 int num_running_flushes_;
1922
1923 // number of background obsolete file purge jobs, submitted to the HIGH pool
1924 int bg_purge_scheduled_;
1925
1926 std::deque<ManualCompactionState*> manual_compaction_dequeue_;
1927
1928 // shall we disable deletion of obsolete files
1929 // if 0, the deletion is enabled.
1930 // if non-zero, files will not be deleted
1931 // This enables two different threads to call
1932 // EnableFileDeletions() and DisableFileDeletions()
1933 // without any synchronization
1934 int disable_delete_obsolete_files_;
1935
1936 // Number of times FindObsoleteFiles has found deletable files and the
1937 // corresponding call to PurgeObsoleteFiles has not yet finished.
1938 int pending_purge_obsolete_files_;
1939
1940 // last time when DeleteObsoleteFiles with full scan was executed. Originally
1941 // initialized with startup time.
1942 uint64_t delete_obsolete_files_last_run_;
1943
1944 // last time stats were dumped to LOG
1945 std::atomic<uint64_t> last_stats_dump_time_microsec_;
1946
1947 // A thread that wants to switch the memtable can wait on this cv until the
1948 // pending writes to the memtable finish.
1949 std::condition_variable switch_cv_;
1950 // The mutex used by switch_cv_. mutex_ should be acquired beforehand.
1951 std::mutex switch_mutex_;
1952 // Number of threads intending to write to memtable
1953 std::atomic<size_t> pending_memtable_writes_ = {};
1954
1955 // Each flush or compaction gets its own job id. This counter makes sure
1956 // they're unique
1957 std::atomic<int> next_job_id_;
1958
1959 // A flag indicating whether the current rocksdb database has any
1960 // data that is not yet persisted into either WAL or SST file.
1961 // Used when disableWAL is true.
1962 std::atomic<bool> has_unpersisted_data_;
1963
1964 // Set if an attempt was made to flush all column families that
1965 // the oldest log depends on but uncommitted data in the oldest
1966 // log prevents the log from being released.
1967 // We must attempt to free the dependent memtables again
1968 // at a later time after the transaction in the oldest
1969 // log is fully committed.
1970 bool unable_to_release_oldest_log_;
1971
1972 static const int KEEP_LOG_FILE_NUM = 1000;
1973 // MSVC version 1800 still does not have constexpr for ::max()
1974 static const uint64_t kNoTimeOut = port::kMaxUint64;
1975
1976 std::string db_absolute_path_;
1977
1978 // Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
1979 // calls.
1980 // REQUIRES: mutex held
1981 int num_running_ingest_file_;
1982
1983 #ifndef ROCKSDB_LITE
1984 WalManager wal_manager_;
1985 #endif // ROCKSDB_LITE
1986
1987 // Unified interface for logging events
1988 EventLogger event_logger_;
1989
1990 // A value of > 0 temporarily disables scheduling of background work
1991 int bg_work_paused_;
1992
1993 // A value of > 0 temporarily disables scheduling of background compaction
1994 int bg_compaction_paused_;
1995
1996 // Guard against multiple concurrent refitting
1997 bool refitting_level_;
1998
1999 // Indicate DB was opened successfully
2000 bool opened_successfully_;
2001
2002 // The min threshold to trigger bottommost compaction for removing
2003 // garbage, among all column families.
2004 SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2005
2006 LogsWithPrepTracker logs_with_prep_tracker_;
2007
2008 // Callback for compaction to check if a key is visible to a snapshot.
2009 // REQUIRES: mutex held
2010 std::unique_ptr<SnapshotChecker> snapshot_checker_;
2011
2012 // Callback for when the cached_recoverable_state_ is written to memtable
2013 // Only to be set during initialization
2014 std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
2015
2016 // handle for scheduling stats dumping at fixed intervals
2017 // REQUIRES: mutex locked
2018 std::unique_ptr<ROCKSDB_NAMESPACE::RepeatableThread> thread_dump_stats_;
2019
2020 // handle for scheduling stats snapshotting at fixed intervals
2021 // REQUIRES: mutex locked
2022 std::unique_ptr<ROCKSDB_NAMESPACE::RepeatableThread> thread_persist_stats_;
2023
2024 // When set, we use a separate queue for writes that don't write to the memtable.
2025 // In 2PC these are the writes at Prepare phase.
2026 const bool two_write_queues_;
2027 const bool manual_wal_flush_;
2028
2029 // If set, LastSequence also indicates the last published sequence visible
2030 // to the readers. Otherwise LastPublishedSequence should be used.
2031 const bool last_seq_same_as_publish_seq_;
2032 // It indicates that a customized gc algorithm must be used for
2033 // flush/compaction and if it is not provided via SnapshotChecker, we should
2034 // disable gc to be safe.
2035 const bool use_custom_gc_;
2036 // Flag to indicate that the DB instance shutdown has been initiated. This is
2037 // different from the shutting_down_ atomic in that it is set at the beginning
2038 // of the shutdown sequence, specifically in order to prevent any background
2039 // error recovery from going on in parallel. The latter, shutting_down_,
2040 // is set a little later during the shutdown, after scheduling memtable
2041 // flushes.
2042 std::atomic<bool> shutdown_initiated_;
2043 // Flag to indicate whether sst_file_manager object was allocated in
2044 // DB::Open() or passed to us
2045 bool own_sfm_;
2046
2047 // Clients must periodically call SetPreserveDeletesSequenceNumber()
2048 // to advance this seqnum. Default value is 0 which means ALL deletes are
2049 // preserved. Note that this has no effect if DBOptions.preserve_deletes
2050 // is set to false.
2051 std::atomic<SequenceNumber> preserve_deletes_seqnum_;
2052 const bool preserve_deletes_;
2053
2054 // Flag to check whether Close() has been called on this DB
2055 bool closed_;
2056
2057 ErrorHandler error_handler_;
2058
2059 // Condition variable to coordinate installation of atomic flush results.
2060 // With atomic flush, each bg thread installs the result of flushing multiple
2061 // column families, and different threads can flush different column
2062 // families. It's difficult to rely on one thread to perform batch
2063 // installation for all threads. This is different from the non-atomic flush
2064 // case.
2065 // atomic_flush_install_cv_ makes sure that threads install atomic flush
2066 // results sequentially. Flush results of memtables with lower IDs get
2067 // installed to MANIFEST first.
2068 InstrumentedCondVar atomic_flush_install_cv_;
2069
2070 bool wal_in_db_path_;
2071 };
2072
2073 extern Options SanitizeOptions(const std::string& db, const Options& src);
2074
2075 extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
2076
2077 extern CompressionType GetCompressionFlush(
2078 const ImmutableCFOptions& ioptions,
2079 const MutableCFOptions& mutable_cf_options);
2080
2081 // Return the earliest log file to keep after the memtable flush is
2082 // finalized.
2083 // `cfd_to_flush` is the column family whose memtable (specified in
2084 // `memtables_to_flush`) will be flushed and thus will not depend on any WAL
2085 // file.
2086 // The function is only applicable to 2pc mode.
2087 extern uint64_t PrecomputeMinLogNumberToKeep(
2088 VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
2089 autovector<VersionEdit*> edit_list,
2090 const autovector<MemTable*>& memtables_to_flush,
2091 LogsWithPrepTracker* prep_tracker);
2092
2093 // `cfd_to_flush` is the column family whose memtable will be flushed and thus
2094 // will not depend on any WAL file. nullptr means no memtable is being flushed.
2095 // The function is only applicable to 2pc mode.
2096 extern uint64_t FindMinPrepLogReferencedByMemTable(
2097 VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
2098 const autovector<MemTable*>& memtables_to_flush);
2099
2100 // Fix user-supplied options to be reasonable
2101 template <class T, class V>
2102 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
2103 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
2104 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
2105 }
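// Illustrative usage (assumption, not from the original source): clamp a
// user-supplied value into a sane range before using it.
//
//   int max_open_files = 5;
//   ClipToRange(&max_open_files, 20, 1000000);  // max_open_files becomes 20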
2106
2107 } // namespace ROCKSDB_NAMESPACE