1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
11 #include <condition_variable>
18 #include <unordered_map>
22 #include "db/db_iter.h"
23 #include "rocksdb/compaction_filter.h"
24 #include "rocksdb/db.h"
25 #include "rocksdb/listener.h"
26 #include "rocksdb/options.h"
27 #include "rocksdb/statistics.h"
28 #include "rocksdb/wal_filter.h"
29 #include "util/mutexlock.h"
30 #include "util/timer_queue.h"
31 #include "utilities/blob_db/blob_db.h"
32 #include "utilities/blob_db/blob_file.h"
33 #include "utilities/blob_db/blob_log_format.h"
34 #include "utilities/blob_db/blob_log_reader.h"
35 #include "utilities/blob_db/blob_log_writer.h"
// Forward declarations so this header can name these types without including
// their definitions. In the visible declarations, ColumnFamilyHandle appears
// only as a pointer (e.g. Get()) and BlobCompactionContext only as a pointer
// (GetCompactionContext()); ColumnFamilyData is not referenced in the visible
// portion of this header.
40 class ColumnFamilyHandle
;
41 class ColumnFamilyData
;
46 struct BlobCompactionContext
;
50 // This implements the callback from the WAL which ensures that the
51 // blob record is present in the blob log. If fsync/fdatasync is not
52 // happening on every write, there is a possibility that keys in the
53 // blob log lag behind the keys written to the WAL.
54 // TODO(yiwu): implement the WAL filter.
55 class BlobReconcileWalFilter
: public WalFilter
{
// WAL-replay hook: overrides WalFilter::LogRecordFound and is invoked for
// each record read from a WAL file.
//   log_number / log_file_name - identify the WAL file being replayed.
//   batch         - the write batch recovered from the record.
//   new_batch     - output slot for a replacement batch.
//   batch_changed - output flag telling the caller whether new_batch is used.
// Returns a WalFilter::WalProcessingOption telling recovery how to proceed.
// NOTE(review): the parameter roles follow the rocksdb::WalFilter interface
// contract (rocksdb/wal_filter.h), not this header. The class-level TODO says
// the reconciliation logic is not implemented yet; confirm the actual return
// value in the .cc definition.
57 virtual WalFilter::WalProcessingOption
LogRecordFound(
58 unsigned long long log_number
, const std::string
& log_file_name
,
59 const WriteBatch
& batch
, WriteBatch
* new_batch
,
60 bool* batch_changed
) override
;
// Identifier reported for this WAL filter; always the string literal below.
62 virtual const char* Name() const override
{ return "BlobDBWalReconciler"; }
65 // Comparator to sort "TTL" aware Blob files based on the lower value of
67 struct BlobFileComparatorTTL
{
// Ordering predicate for TTL blob files. It is the comparator of the
// std::set open_ttl_files_, so it must be a strict weak ordering: return true
// iff lhs sorts before rhs.
// NOTE(review): the comment describing the exact sort key is truncated here
// ("based on the lower value of ..."); presumably it orders by the lower
// bound of each file's expiration range. Confirm in the .cc definition.
68 bool operator()(const std::shared_ptr
<BlobFile
>& lhs
,
69 const std::shared_ptr
<BlobFile
>& rhs
) const;
72 struct BlobFileComparator
{
// Ordering predicate for (non-TTL) blob files: returns true iff lhs sorts
// before rhs. It is not used as a container comparator in the visible part of
// this header.
// NOTE(review): sort key not visible here; confirm in the .cc definition.
73 bool operator()(const std::shared_ptr
<BlobFile
>& lhs
,
69 const std::shared_ptr
<BlobFile
>& rhs
) const;
// Statistics counters; each one starts at zero through its default member
// initializer. The enclosing struct header is not visible here.
// NOTE(review): presumably filled in while garbage-collecting a blob file
// (see GCFileAndUpdateLSM). The num_keys_* fields count keys and the bytes_*
// fields count bytes for the same overwritten/expired/relocated categories.
// Confirm against the GC implementation.
78 uint64_t blob_count
= 0;
79 uint64_t num_keys_overwritten
= 0;
80 uint64_t num_keys_expired
= 0;
81 uint64_t num_keys_relocated
= 0;
82 uint64_t bytes_overwritten
= 0;
83 uint64_t bytes_expired
= 0;
84 uint64_t bytes_relocated
= 0;
88 * The implementation class for BlobDB. This manages the value
89 * part in TTL aware sequentially written files. These files are
92 class BlobDBImpl
: public BlobDB
{
93 friend class BlobFile
;
94 friend class BlobDBIterator
;
97 // Period of the deletion check, in milliseconds (2 * 1000 = 2 seconds).
98 static constexpr uint32_t kDeleteCheckPeriodMillisecs
= 2 * 1000;
100 // Percentage of blob files to garbage-collect in each GC check period.
101 static constexpr uint32_t kGCFilePercentage
= 100;
// Period of the GC check, in milliseconds (60 * 1000 = one minute).
104 static constexpr uint32_t kGCCheckPeriodMillisecs
= 60 * 1000;
// Period of the sanity check, in milliseconds (20 * 60 * 1000 = 20 minutes).
107 static constexpr uint32_t kSanityCheckPeriodMillisecs
= 20 * 60 * 1000;
109 // How many random-access open files we can tolerate. Presumably compared
// against open_file_count_ by ReclaimOpenFiles(); confirm in the .cc file.
110 static constexpr uint32_t kOpenFilesTrigger
= 100;
112 // How often to schedule reclaiming of open files, in milliseconds.
113 static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs
= 1 * 1000;
115 // How often to schedule deletion of obsolete files, in milliseconds.
116 static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs
= 10 * 1000;
118 // How often to schedule eviction of expired files, in milliseconds.
119 static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs
= 10 * 1000;
121 // When the oldest file should be evicted: on reaching this fraction
122 // (0.9, i.e. 90%) of blob_dir_size.
123 static constexpr double kEvictOldestFileAtSize
= 0.9;
126 Status
Put(const WriteOptions
& options
, const Slice
& key
,
127 const Slice
& value
) override
;
130 Status
Get(const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
131 const Slice
& key
, PinnableSlice
* value
) override
;
133 Status
Get(const ReadOptions
& read_options
, ColumnFamilyHandle
* column_family
,
134 const Slice
& key
, PinnableSlice
* value
,
135 uint64_t* expiration
) override
;
137 using BlobDB::NewIterator
;
138 virtual Iterator
* NewIterator(const ReadOptions
& read_options
) override
;
140 using BlobDB::NewIterators
;
141 virtual Status
NewIterators(
142 const ReadOptions
& /*read_options*/,
143 const std::vector
<ColumnFamilyHandle
*>& /*column_families*/,
144 std::vector
<Iterator
*>* /*iterators*/) override
{
145 return Status::NotSupported("Not implemented");
148 using BlobDB::MultiGet
;
149 virtual std::vector
<Status
> MultiGet(
150 const ReadOptions
& read_options
,
151 const std::vector
<Slice
>& keys
,
152 std::vector
<std::string
>* values
) override
;
154 virtual Status
Write(const WriteOptions
& opts
, WriteBatch
* updates
) override
;
156 virtual Status
Close() override
;
158 using BlobDB::PutWithTTL
;
159 Status
PutWithTTL(const WriteOptions
& options
, const Slice
& key
,
160 const Slice
& value
, uint64_t ttl
) override
;
162 using BlobDB::PutUntil
;
163 Status
PutUntil(const WriteOptions
& options
, const Slice
& key
,
164 const Slice
& value
, uint64_t expiration
) override
;
166 BlobDBOptions
GetBlobDBOptions() const override
;
168 BlobDBImpl(const std::string
& dbname
, const BlobDBOptions
& bdb_options
,
169 const DBOptions
& db_options
,
170 const ColumnFamilyOptions
& cf_options
);
172 virtual Status
DisableFileDeletions() override
;
174 virtual Status
EnableFileDeletions(bool force
) override
;
176 virtual Status
GetLiveFiles(std::vector
<std::string
>&,
177 uint64_t* manifest_file_size
,
178 bool flush_memtable
= true) override
;
179 virtual void GetLiveFilesMetaData(std::vector
<LiveFileMetaData
>*) override
;
183 Status
Open(std::vector
<ColumnFamilyHandle
*>* handles
);
185 Status
SyncBlobFiles() override
;
187 void UpdateLiveSSTSize();
189 void GetCompactionContext(BlobCompactionContext
* context
);
192 Status
TEST_GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
193 PinnableSlice
* value
);
195 std::vector
<std::shared_ptr
<BlobFile
>> TEST_GetBlobFiles() const;
197 std::vector
<std::shared_ptr
<BlobFile
>> TEST_GetObsoleteFiles() const;
199 Status
TEST_CloseBlobFile(std::shared_ptr
<BlobFile
>& bfile
);
201 void TEST_ObsoleteBlobFile(std::shared_ptr
<BlobFile
>& blob_file
,
202 SequenceNumber obsolete_seq
= 0,
203 bool update_size
= true);
205 Status
TEST_GCFileAndUpdateLSM(std::shared_ptr
<BlobFile
>& bfile
,
210 void TEST_EvictExpiredFiles();
212 void TEST_DeleteObsoleteFiles();
214 uint64_t TEST_live_sst_size();
218 class GarbageCollectionWriteCallback
;
221 // Create a snapshot if there isn't one in read options.
222 // Return true if a snapshot is created.
223 bool SetSnapshotIfNeeded(ReadOptions
* read_options
);
225 Status
GetImpl(const ReadOptions
& read_options
,
226 ColumnFamilyHandle
* column_family
, const Slice
& key
,
227 PinnableSlice
* value
, uint64_t* expiration
= nullptr);
229 Status
GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
230 PinnableSlice
* value
, uint64_t* expiration
= nullptr);
232 Slice
GetCompressedSlice(const Slice
& raw
,
233 std::string
* compression_output
) const;
235 // Close a file by appending a footer, and remove the file from the open files list.
236 Status
CloseBlobFile(std::shared_ptr
<BlobFile
> bfile
, bool need_lock
= true);
238 // Close a file if its size exceeds blob_file_size
239 Status
CloseBlobFileIfNeeded(std::shared_ptr
<BlobFile
>& bfile
);
241 // Mark file as obsolete and move the file to obsolete file list.
243 // REQUIRED: hold write lock of mutex_ or during DB open.
244 void ObsoleteBlobFile(std::shared_ptr
<BlobFile
> blob_file
,
245 SequenceNumber obsolete_seq
, bool update_size
);
247 Status
PutBlobValue(const WriteOptions
& options
, const Slice
& key
,
248 const Slice
& value
, uint64_t expiration
,
251 Status
AppendBlob(const std::shared_ptr
<BlobFile
>& bfile
,
252 const std::string
& headerbuf
, const Slice
& key
,
253 const Slice
& value
, uint64_t expiration
,
254 std::string
* index_entry
);
256 // find an existing blob log file based on the expiration unix epoch
257 // if such a file does not exist, return nullptr
258 std::shared_ptr
<BlobFile
> SelectBlobFileTTL(uint64_t expiration
);
260 // find an existing blob log file to append the value to
261 std::shared_ptr
<BlobFile
> SelectBlobFile();
263 std::shared_ptr
<BlobFile
> FindBlobFileLocked(uint64_t expiration
) const;
265 // periodic sanity check. Bunch of checks
266 std::pair
<bool, int64_t> SanityCheck(bool aborted
);
268 // delete files which have been garbage collected and marked
269 // obsolete. Check whether any snapshots exist which refer to
271 std::pair
<bool, int64_t> DeleteObsoleteFiles(bool aborted
);
273 // Major task to garbage collect expired and deleted blobs
274 std::pair
<bool, int64_t> RunGC(bool aborted
);
276 // periodically check if open blob files and their TTLs have expired
277 // if expired, close the sequential writer and make the file immutable
278 std::pair
<bool, int64_t> EvictExpiredFiles(bool aborted
);
280 // if the number of open files approaches the ulimit, this
281 // task will close random readers, which are kept around for
283 std::pair
<bool, int64_t> ReclaimOpenFiles(bool aborted
);
285 std::pair
<bool, int64_t> RemoveTimerQ(TimerQueue
* tq
, bool aborted
);
287 // Adds the background tasks to the timer queue
288 void StartBackgroundTasks();
290 // add a new Blob File
291 std::shared_ptr
<BlobFile
> NewBlobFile(const std::string
& reason
);
293 // collect all the blob log files from the blob directory
294 Status
GetAllBlobFiles(std::set
<uint64_t>* file_numbers
);
296 // Open all blob files found in blob_dir.
297 Status
OpenAllBlobFiles();
299 Status
GetBlobFileReader(const std::shared_ptr
<BlobFile
>& blob_file
,
300 std::shared_ptr
<RandomAccessFileReader
>* reader
);
302 // hold write mutex on file and call.
303 // Close the above Random Access reader
304 void CloseRandomAccessLocked(const std::shared_ptr
<BlobFile
>& bfile
);
306 // hold write mutex on file and call
307 // creates a sequential (append) writer for this blobfile
308 Status
CreateWriterLocked(const std::shared_ptr
<BlobFile
>& bfile
);
310 // returns a Writer object for the file. If writer is not
311 // already present, creates one. Needs Write Mutex to be held
312 std::shared_ptr
<Writer
> CheckOrCreateWriterLocked(
313 const std::shared_ptr
<BlobFile
>& bfile
);
315 // Iterate through keys and values on Blob and write into
316 // separate file the remaining blobs and delete/update pointers
318 Status
GCFileAndUpdateLSM(const std::shared_ptr
<BlobFile
>& bfptr
,
321 // checks if there is no snapshot which is referencing the
323 bool VisibleToActiveSnapshot(const std::shared_ptr
<BlobFile
>& file
);
324 bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr
<BlobFile
>& bfile
);
326 void CopyBlobFiles(std::vector
<std::shared_ptr
<BlobFile
>>* bfiles_copy
);
// Current time in whole seconds: env_->NowMicros() integer-divided by
// 1000000, so any sub-second part is truncated.
328 uint64_t EpochNow() { return env_
->NowMicros() / 1000000; }
330 // Check if inserting a new blob of blob_size bytes will make the DB grow
331 // out of space. If FIFO eviction is enabled (presumably
// bdb_options_.is_fifo; there is no is_fifo parameter), FIFO eviction is
// triggered to make room for the
332 // new blob. If force_evict = true, FIFO eviction will evict blob files
333 // even if eviction will not make enough room for the new blob.
// Returns a Status describing whether the blob can be written.
334 Status
CheckSizeAndEvictBlobFiles(uint64_t blob_size
,
335 bool force_evict
= false);
337 // name of the database directory
344 // the options that govern the behavior of Blob Storage
345 BlobDBOptions bdb_options_
;
346 DBOptions db_options_
;
347 ColumnFamilyOptions cf_options_
;
348 EnvOptions env_options_
;
350 // Raw pointer to the statistics object. db_options_ holds ownership via a shared_ptr.
351 Statistics
* statistics_
;
353 // by default this is "blob_dir" under dbname_
354 // but can be configured
355 std::string blob_dir_
;
357 // pointer to directory
358 std::unique_ptr
<Directory
> dir_ent_
;
360 // Read-write mutex which protects all the data structures.
361 // HEAVILY TRAFFICKED
// Declared mutable so const member functions can still lock it.
362 mutable port::RWMutex mutex_
;
364 // Writers have to hold write_mutex_ before writing.
365 mutable port::Mutex write_mutex_
;
367 // counter for blob file number
368 std::atomic
<uint64_t> next_file_number_
;
370 // in-memory metadata of all the blob files
371 std::map
<uint64_t, std::shared_ptr
<BlobFile
>> blob_files_
;
373 // epoch or version of the open files.
374 std::atomic
<uint64_t> epoch_of_
;
376 // opened non-TTL blob file.
377 std::shared_ptr
<BlobFile
> open_non_ttl_file_
;
379 // all the blob files which are currently being appended to based
380 // on variety of incoming TTL's
381 std::set
<std::shared_ptr
<BlobFile
>, BlobFileComparatorTTL
> open_ttl_files_
;
383 // Flag to check whether Close() has been called on this DB
386 // timer based queue to execute tasks
389 // number of files opened for random access/GET
390 // counter is used to monitor and close excess RA files.
391 std::atomic
<uint32_t> open_file_count_
;
393 // Total size of all live blob files (i.e. exclude obsolete files).
394 std::atomic
<uint64_t> total_blob_size_
;
396 // total size of SST files.
397 std::atomic
<uint64_t> live_sst_size_
;
399 // Latest FIFO eviction timestamp
// NOTE(review): the field is named *_seq_, which suggests a sequence number
// rather than a wall-clock timestamp; confirm against its writers.
401 // REQUIRES: access with mutex_ lock held.
402 uint64_t fifo_eviction_seq_
;
404 // The expiration up to which the latest FIFO eviction evicts.
406 // REQUIRES: access with mutex_ lock held.
407 uint64_t evict_expiration_up_to_
;
409 std::list
<std::shared_ptr
<BlobFile
>> obsolete_files_
;
411 // DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
412 // on this mutex to avoid contention.
414 // DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, but note
415 // the difference: mutex_ only needs to be held when accessing the
416 // data structures, whereas delete_file_mutex_ needs to be held the whole time
417 // during DeleteObsoleteFiles so it does not run simultaneously with
418 // DisableFileDeletions.
420 // If both mutex_ and delete_file_mutex_ need to be held, it is advised
421 // to acquire delete_file_mutex_ first to avoid deadlock.
422 mutable port::Mutex delete_file_mutex_
;
424 // Each call of DisableFileDeletions increases disable_file_deletions_
425 // by 1. EnableFileDeletions either decreases the count by 1 or resets
426 // it to zero, depending on the force flag.
// File deletion is presumably allowed only while this count is zero; confirm
// in DeleteObsoleteFiles.
428 // REQUIRES: access with delete_file_mutex_ held.
429 int disable_file_deletions_
= 0;
431 uint32_t debug_level_
;
434 } // namespace blob_db
435 } // namespace rocksdb
436 #endif // ROCKSDB_LITE