1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
11 #include <condition_variable>
18 #include <unordered_map>
22 #include "db/db_iter.h"
23 #include "rocksdb/compaction_filter.h"
24 #include "rocksdb/db.h"
25 #include "rocksdb/listener.h"
26 #include "rocksdb/options.h"
27 #include "rocksdb/statistics.h"
28 #include "rocksdb/wal_filter.h"
29 #include "util/mutexlock.h"
30 #include "util/timer_queue.h"
31 #include "utilities/blob_db/blob_db.h"
32 #include "utilities/blob_db/blob_file.h"
33 #include "utilities/blob_db/blob_log_format.h"
34 #include "utilities/blob_db/blob_log_reader.h"
35 #include "utilities/blob_db/blob_log_writer.h"
// Forward declarations: these types are defined elsewhere; declaring them here
// lets this header name them (e.g. ColumnFamilyHandle* and
// BlobCompactionContext* parameters below) without their full definitions.
40 class ColumnFamilyHandle
;
41 class ColumnFamilyData
;
46 struct BlobCompactionContext
;
50 // Comparator to sort "TTL" aware Blob files based on the lower value of
52 struct BlobFileComparatorTTL
{
// Returns true if lhs orders before rhs. Only declared here; the definition
// lives out of line. Because it is the ordering of the std::set
// open_ttl_files_, it must implement a strict weak ordering.
53 bool operator()(const std::shared_ptr
<BlobFile
>& lhs
,
54 const std::shared_ptr
<BlobFile
>& rhs
) const;
// Ordering functor over blob files that is not TTL-specific. The ordering key is
// not visible in this header (presumably the blob file number — confirm
// against the out-of-line definition).
57 struct BlobFileComparator
{
// Returns true if lhs orders before rhs; defined out of line.
58 bool operator()(const std::shared_ptr
<BlobFile
>& lhs
,
59 const std::shared_ptr
<BlobFile
>& rhs
) const;
// Counters, all zero-initialized. The num_keys_* and bytes_* fields come in
// pairs for the same three categories (overwritten / expired / relocated);
// presumably they are accumulated while garbage-collecting a blob file —
// TODO confirm against the GC implementation.
63 uint64_t blob_count
= 0;
64 uint64_t num_keys_overwritten
= 0;
65 uint64_t num_keys_expired
= 0;
66 uint64_t num_keys_relocated
= 0;
67 uint64_t bytes_overwritten
= 0;
68 uint64_t bytes_expired
= 0;
69 uint64_t bytes_relocated
= 0;
73 * The implementation class for BlobDB. This manages the value
74 * part in TTL aware sequentially written files. These files are
77 class BlobDBImpl
: public BlobDB
{
// BlobFile and BlobDBIterator are granted access to this class's private
// members (locks, file lists, helpers declared below).
78 friend class BlobFile
;
79 friend class BlobDBIterator
;
// Background-task tuning knobs. Every *Millisecs constant is a period in
// milliseconds, written as <count> * 1000 so the unit conversion stays visible.

// Period of the deletion check (2 s).
static constexpr uint32_t kDeleteCheckPeriodMillisecs{2 * 1000};

// Percentage of files that GC considers in each check period.
static constexpr uint32_t kGCFilePercentage{100};

// Period of the GC check (60 s).
static constexpr uint32_t kGCCheckPeriodMillisecs{60 * 1000};

// Period of the sanity check (20 min).
static constexpr uint32_t kSanityCheckPeriodMillisecs{20 * 60 * 1000};

// Number of random-access open files tolerated before readers get reclaimed.
static constexpr uint32_t kOpenFilesTrigger{100};

// Period of the open-file reclaim task (1 s).
static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs{1 * 1000};

// Period of the obsolete-file deletion task (10 s).
static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs{10 * 1000};

// Period of the expired-file eviction task (10 s).
static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs{10 * 1000};

// The oldest file becomes eligible for eviction once usage reaches this
// fraction of blob_dir_size (0.9 == 90%).
static constexpr double kEvictOldestFileAtSize{0.9};
// Single-key write; overrides the base-class Put.
111 Status
Put(const WriteOptions
& options
, const Slice
& key
,
112 const Slice
& value
) override
;
// Point lookup of `key` in `column_family`; the result is delivered through
// *value.
115 Status
Get(const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
116 const Slice
& key
, PinnableSlice
* value
) override
;
// Same lookup, additionally reporting the entry's expiration through
// *expiration.
118 Status
Get(const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
119 const Slice
& key
, PinnableSlice
* value
,
120 uint64_t* expiration
) override
;
// Bring the base-class NewIterator overloads into scope so the override below
// does not hide them.
122 using BlobDB::NewIterator
;
123 virtual Iterator
* NewIterator(const ReadOptions
& read_options
) override
;
// Multi-column-family iterators are not supported: this override ignores all
// of its arguments and always returns Status::NotSupported. The using-
// declaration keeps the base-class NewIterators overloads visible.
125 using BlobDB::NewIterators
;
126 virtual Status
NewIterators(
127 const ReadOptions
& /*read_options*/,
128 const std::vector
<ColumnFamilyHandle
*>& /*column_families*/,
129 std::vector
<Iterator
*>* /*iterators*/) override
{
130 return Status::NotSupported("Not implemented");
// Bring the base-class MultiGet overloads into scope so the override below
// does not hide them. The override looks up `keys`, fills *values and returns
// a std::vector<Status> (presumably one entry per key, aligned with `keys` —
// confirm).
133 using BlobDB::MultiGet
;
134 virtual std::vector
<Status
> MultiGet(
135 const ReadOptions
& read_options
,
136 const std::vector
<Slice
>& keys
,
137 std::vector
<std::string
>* values
) override
;
// Applies the WriteBatch `updates`.
139 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
// Overrides the base-class Close().
141 virtual Status
Close() override
;
// Writes key/value with a time-to-live `ttl`. The unit is not stated here
// (presumably seconds, matching EpochNow() — confirm).
143 using BlobDB::PutWithTTL
;
144 Status
PutWithTTL(const WriteOptions
& options
, const Slice
& key
,
145 const Slice
& value
, uint64_t ttl
) override
;
// Writes key/value that expires at the absolute time `expiration`
// (a unix-epoch value, presumably in seconds like EpochNow() — confirm).
147 using BlobDB::PutUntil
;
148 Status
PutUntil(const WriteOptions
& options
, const Slice
& key
,
149 const Slice
& value
, uint64_t expiration
) override
;
// Returns the blob-specific options (presumably a copy of bdb_options_ —
// confirm).
151 BlobDBOptions
GetBlobDBOptions() const override
;
// Constructor for the database named `dbname`. The three option sets
// presumably seed bdb_options_, db_options_ and cf_options_ — confirm.
// Opening the DB is a separate step (see Open()).
153 BlobDBImpl(const std::string
& dbname
, const BlobDBOptions
& bdb_options
,
154 const DBOptions
& db_options
,
155 const ColumnFamilyOptions
& cf_options
);
// Increments the disable_file_deletions_ counter (see its declaration).
157 virtual Status
DisableFileDeletions() override
;
// Decrements disable_file_deletions_ by one, or resets it to zero depending
// on `force` (presumably force == true resets — confirm).
159 virtual Status
EnableFileDeletions(bool force
) override
;
// Lists live files; flush_memtable defaults to true.
161 virtual Status
GetLiveFiles(std::vector
<std::string
>&,
162 uint64_t* manifest_file_size
,
163 bool flush_memtable
= true) override
;
164 virtual void GetLiveFilesMetaData(std::vector
<LiveFileMetaData
>*) override
;
// Opens the database; column family handles are presumably returned through
// *handles — confirm.
168 Status
Open(std::vector
<ColumnFamilyHandle
*>* handles
);
// Syncs blob files to storage (override of the BlobDB hook).
170 Status
SyncBlobFiles() override
;
// Presumably refreshes live_sst_size_ from the current SST files — confirm.
172 void UpdateLiveSSTSize();
// Fills *context for use during compaction. BlobCompactionContext is only
// forward-declared in this header.
174 void GetCompactionContext(BlobCompactionContext
* context
);
// ---- Test-only hooks (TEST_ prefix); not for production callers. ----
// These presumably forward to the private helpers of similar names
// (GetBlobValue, CloseBlobFile, ObsoleteBlobFile, GCFileAndUpdateLSM,
// EvictExpiredFiles, DeleteObsoleteFiles) — confirm in the .cc file.
177 Status
TEST_GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
178 PinnableSlice
* value
);
// Copies of the live and obsolete blob file lists (returned by value).
180 std::vector
<std::shared_ptr
<BlobFile
>> TEST_GetBlobFiles() const;
182 std::vector
<std::shared_ptr
<BlobFile
>> TEST_GetObsoleteFiles() const;
184 Status
TEST_CloseBlobFile(std::shared_ptr
<BlobFile
>& bfile
);
// Defaults: obsolete_seq = 0, update_size = true.
186 void TEST_ObsoleteBlobFile(std::shared_ptr
<BlobFile
>& blob_file
,
187 SequenceNumber obsolete_seq
= 0,
188 bool update_size
= true);
190 Status
TEST_GCFileAndUpdateLSM(std::shared_ptr
<BlobFile
>& bfile
,
195 void TEST_EvictExpiredFiles();
197 void TEST_DeleteObsoleteFiles();
199 uint64_t TEST_live_sst_size();
// Returns a reference to blob_dir_.
201 const std::string
& TEST_blob_dir() const { return blob_dir_
; }
// Nested helper class; only declared here.
205 class GarbageCollectionWriteCallback
;
208 // Create a snapshot if there isn't one in read options.
209 // Return true if a snapshot is created.
210 bool SetSnapshotIfNeeded(ReadOptions
* read_options
);
// Shared lookup presumably used by the public Get overloads — confirm.
// `expiration` is optional (defaults to nullptr).
212 Status
GetImpl(const ReadOptions
& read_options
,
213 ColumnFamilyHandle
* column_family
, const Slice
& key
,
214 PinnableSlice
* value
, uint64_t* expiration
= nullptr);
// Resolves `index_entry` (the kind of entry AppendBlob produces through its
// *index_entry output) into the blob value for `key`, written to *value.
// `expiration` is optional (defaults to nullptr).
216 Status
GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
217 PinnableSlice
* value
, uint64_t* expiration
= nullptr);
// Returns a Slice over the compressed form of `raw`, presumably backed by
// *compression_output — confirm.
219 Slice
GetCompressedSlice(const Slice
& raw
,
220 std::string
* compression_output
) const;
222 // Close a file by appending a footer, and remove the file from the open files list.
// `need_lock` defaults to true; presumably callers that already hold mutex_
// pass false — confirm.
223 Status
CloseBlobFile(std::shared_ptr
<BlobFile
> bfile
, bool need_lock
= true);
225 // Close a file if its size exceeds blob_file_size
226 Status
CloseBlobFileIfNeeded(std::shared_ptr
<BlobFile
>& bfile
);
228 // Mark file as obsolete and move the file to obsolete file list.
230 // REQUIRED: hold write lock of mutex_ or during DB open.
// NOTE(review): `update_size` presumably controls whether total_blob_size_
// (size of live files only) is reduced — confirm.
231 void ObsoleteBlobFile(std::shared_ptr
<BlobFile
> blob_file
,
232 SequenceNumber obsolete_seq
, bool update_size
);
// Write-path helpers.
// PutBlobValue: presumably the shared write path behind Put, PutWithTTL and
//   PutUntil — confirm.
// AppendBlob: appends headerbuf/key/value to `bfile` and reports the
//   resulting blob index through *index_entry (the same kind of entry that
//   GetBlobValue consumes).
234 Status
PutBlobValue(const WriteOptions
& options
, const Slice
& key
,
235 const Slice
& value
, uint64_t expiration
,
238 Status
AppendBlob(const std::shared_ptr
<BlobFile
>& bfile
,
239 const std::string
& headerbuf
, const Slice
& key
,
240 const Slice
& value
, uint64_t expiration
,
241 std::string
* index_entry
);
243 // find an existing blob log file based on the expiration unix epoch
244 // if such a file does not exist, return nullptr
// NOTE(review): this function returns a Status and reports the file through
// *blob_file, so "return nullptr" presumably refers to *blob_file — confirm
// the actual contract.
245 Status
SelectBlobFileTTL(uint64_t expiration
,
246 std::shared_ptr
<BlobFile
>* blob_file
);
248 // find an existing blob log file to append the value to
249 Status
SelectBlobFile(std::shared_ptr
<BlobFile
>* blob_file
);
// Looks up an open TTL blob file for `expiration`. The "Locked" suffix
// suggests the caller must already hold mutex_ — confirm.
251 std::shared_ptr
<BlobFile
> FindBlobFileLocked(uint64_t expiration
) const;
// Background task bodies. Each takes an `aborted` flag and returns
// std::pair<bool, int64_t>. The meaning of the pair comes from the TimerQueue
// scheduling contract, which is not visible here — confirm before relying on it.
253 // periodic sanity check. Bunch of checks
254 std::pair
<bool, int64_t> SanityCheck(bool aborted
);
256 // delete files which have been garbage collected and marked
257 // obsolete. Check whether any snapshots exist which refer to
259 std::pair
<bool, int64_t> DeleteObsoleteFiles(bool aborted
);
261 // Major task to garbage collect expired and deleted blobs
262 std::pair
<bool, int64_t> RunGC(bool aborted
);
264 // periodically check whether open blob files and their TTLs have expired;
265 // if expired, close the sequential writer and make the file immutable
266 std::pair
<bool, int64_t> EvictExpiredFiles(bool aborted
);
268 // if the number of open files approaches the ulimit, this
269 // task will close random readers, which are kept around for
271 std::pair
<bool, int64_t> ReclaimOpenFiles(bool aborted
);
// Presumably detaches/shuts down the timer queue `tq` — confirm.
273 std::pair
<bool, int64_t> RemoveTimerQ(TimerQueue
* tq
, bool aborted
);
275 // Adds the background tasks to the timer queue
276 void StartBackgroundTasks();
278 // add a new Blob File; `reason` is presumably used for logging — confirm.
279 std::shared_ptr
<BlobFile
> NewBlobFile(const std::string
& reason
);
281 // collect the numbers of all blob log files in the blob directory into *file_numbers
282 Status
GetAllBlobFiles(std::set
<uint64_t>* file_numbers
);
284 // Open all blob files found in blob_dir.
285 Status
OpenAllBlobFiles();
// Provides a random-access reader for `blob_file` through *reader
// (presumably opened on demand — confirm).
287 Status
GetBlobFileReader(const std::shared_ptr
<BlobFile
>& blob_file
,
288 std::shared_ptr
<RandomAccessFileReader
>* reader
);
290 // Caller must hold the write mutex on the file.
291 // Closes the random-access reader obtained via GetBlobFileReader().
292 void CloseRandomAccessLocked(const std::shared_ptr
<BlobFile
>& bfile
);
294 // Caller must hold the write mutex on the file.
295 // Creates a sequential (append) writer for this blob file.
296 Status
CreateWriterLocked(const std::shared_ptr
<BlobFile
>& bfile
);
298 // Provides the Writer object for the file through *writer. If a writer is not
299 // already present, creates one. Needs the write mutex to be held.
300 Status
CheckOrCreateWriterLocked(const std::shared_ptr
<BlobFile
>& blob_file
,
301 std::shared_ptr
<Writer
>* writer
);
303 // Iterate through the keys and values in the blob file, write the remaining
304 // blobs into a separate file, and delete/update the pointers
306 Status
GCFileAndUpdateLSM(const std::shared_ptr
<BlobFile
>& bfptr
,
309 // checks if there is no snapshot which is referencing the
311 bool VisibleToActiveSnapshot(const std::shared_ptr
<BlobFile
>& file
);
// Presumably returns true when `bfile` may be deleted with respect to live
// snapshots; the "Locked" suffix suggests mutex_ must already be held — confirm.
312 bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr
<BlobFile
>& bfile
);
// Copies the current blob file list into *bfiles_copy (presumably from
// blob_files_, so callers can iterate without holding mutex_ — confirm).
314 void CopyBlobFiles(std::vector
<std::shared_ptr
<BlobFile
>>* bfiles_copy
);
316 uint64_t EpochNow() { return env_
->NowMicros() / 1000000; }
318 // Check if inserting a new blob will make DB grow out of space.
319 // If is_fifo = true, FIFO eviction will be triggered to make room for the
320 // new blob. If force_evict = true, FIFO eviction will evict blob files
321 // even if eviction will not make enough room for the new blob.
// NOTE(review): is_fifo is not a parameter of this function; it presumably
// refers to a BlobDBOptions setting held in bdb_options_ — confirm.
// force_evict defaults to false.
322 Status
CheckSizeAndEvictBlobFiles(uint64_t blob_size
,
323 bool force_evict
= false);
325 // name of the database directory
332 // the options that govern the behavior of Blob Storage
333 BlobDBOptions bdb_options_
;
// DB-wide and column-family options, plus environment options (presumably
// copies of / derived from the constructor arguments — confirm).
334 DBOptions db_options_
;
335 ColumnFamilyOptions cf_options_
;
336 EnvOptions env_options_
;
338 // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
// (so statistics_ itself is non-owning.)
340 Statistics
* statistics_
;
342 // by default this is "blob_dir" under dbname_
343 // but can be configured
344 std::string blob_dir_
;
346 // pointer to directory
347 std::unique_ptr
<Directory
> dir_ent_
;
349 // Read Write Mutex, which protects all the data structures
350 // HEAVILY TRAFFICKED
351 mutable port::RWMutex mutex_
;
353 // Writers have to hold write_mutex_ before writing.
354 mutable port::Mutex write_mutex_
;
356 // counter for blob file number
357 std::atomic
<uint64_t> next_file_number_
;
359 // In-memory metadata of all the blob files, keyed by a uint64_t (presumably the blob file number — confirm).
360 std::map
<uint64_t, std::shared_ptr
<BlobFile
>> blob_files_
;
362 // epoch or version of the open files.
363 std::atomic
<uint64_t> epoch_of_
;
365 // opened non-TTL blob file.
366 std::shared_ptr
<BlobFile
> open_non_ttl_file_
;
368 // all the blob files which are currently being appended to based
369 // on the variety of incoming TTLs; ordered by BlobFileComparatorTTL.
370 std::set
<std::shared_ptr
<BlobFile
>, BlobFileComparatorTTL
> open_ttl_files_
;
372 // Flag to check whether Close() has been called on this DB
375 // timer based queue to execute tasks
378 // number of files opened for random access/GET
379 // counter is used to monitor and close excess RA files.
380 std::atomic
<uint32_t> open_file_count_
;
382 // Total size of all live blob files (i.e. exclude obsolete files).
383 std::atomic
<uint64_t> total_blob_size_
;
385 // total size of SST files (presumably refreshed by UpdateLiveSSTSize()).
386 std::atomic
<uint64_t> live_sst_size_
;
388 // Latest FIFO eviction timestamp
// NOTE(review): the member is named *_seq_, so it may hold a sequence number
// rather than a timestamp — confirm.
390 // REQUIRES: access with mutex_ lock held.
391 uint64_t fifo_eviction_seq_
;
393 // The expiration up to which latest FIFO eviction evicts.
395 // REQUIRES: access with mutex_ lock held.
396 uint64_t evict_expiration_up_to_
;
// Blob files marked obsolete (see ObsoleteBlobFile), awaiting deletion by
// DeleteObsoleteFiles.
398 std::list
<std::shared_ptr
<BlobFile
>> obsolete_files_
;
400 // DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
401 // on delete_file_mutex_ so that they do not run concurrently.
403 // While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
404 // the difference: mutex_ only needs to be held while accessing the
405 // data structures, whereas delete_file_mutex_ needs to be held the whole time
406 // during DeleteObsoleteFiles to avoid being run simultaneously with
407 // DisableFileDeletions.
409 // If both mutex_ and delete_file_mutex_ need to be held, it is advised
410 // to hold delete_file_mutex_ first to avoid deadlock.
411 mutable port::Mutex delete_file_mutex_
;
413 // Each call of DisableFileDeletions will increase disable_file_deletions_
414 // by 1. EnableFileDeletions will either decrease the count by 1 or reset
415 // it to zero, depending on the force flag.
417 // REQUIRES: access with delete_file_mutex_ held.
418 int disable_file_deletions_
= 0;
// Debug verbosity (presumably; confirm). It has no default initializer here,
// so it must be set by the constructor.
420 uint32_t debug_level_
;
423 } // namespace blob_db
424 } // namespace rocksdb
425 #endif // ROCKSDB_LITE