]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_db_impl.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_impl.h
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
5
6 #pragma once
7
8 #ifndef ROCKSDB_LITE
9
#include <atomic>
#include <condition_variable>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/db_iter.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/wal_filter.h"
#include "util/mutexlock.h"
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/blob_db/blob_file.h"
#include "utilities/blob_db/blob_log_format.h"
#include "utilities/blob_db/blob_log_reader.h"
#include "utilities/blob_db/blob_log_writer.h"
36
37 namespace rocksdb {
38
// Forward declarations of core rocksdb types so this header only needs
// pointers/handles to them rather than their full definitions.
// NOTE(review): ColumnFamilyData and FlushJobInfo are not referenced anywhere
// else in this header; they are kept in case includers rely on them.
class ColumnFamilyData;
class ColumnFamilyHandle;
class DBImpl;
struct FlushJobInfo;
43
44 namespace blob_db {
45
// Forward declarations for blob_db types used by pointer or shared_ptr below.
class BlobDBImpl;
class BlobFile;
struct BlobCompactionContext;
49
50 // this implements the callback from the WAL which ensures that the
51 // blob record is present in the blob log. If fsync/fdatasync in not
52 // happening on every write, there is the probability that keys in the
53 // blob log can lag the keys in blobs
54 // TODO(yiwu): implement the WAL filter.
55 class BlobReconcileWalFilter : public WalFilter {
56 public:
57 virtual WalFilter::WalProcessingOption LogRecordFound(
58 unsigned long long log_number, const std::string& log_file_name,
59 const WriteBatch& batch, WriteBatch* new_batch,
60 bool* batch_changed) override;
61
62 virtual const char* Name() const override { return "BlobDBWalReconciler"; }
63 };
64
65 // Comparator to sort "TTL" aware Blob files based on the lower value of
66 // TTL range.
67 struct BlobFileComparatorTTL {
68 bool operator()(const std::shared_ptr<BlobFile>& lhs,
69 const std::shared_ptr<BlobFile>& rhs) const;
70 };
71
72 struct BlobFileComparator {
73 bool operator()(const std::shared_ptr<BlobFile>& lhs,
74 const std::shared_ptr<BlobFile>& rhs) const;
75 };
76
// Counters reported by blob-file garbage collection (an out-parameter of
// BlobDBImpl::GCFileAndUpdateLSM). All counters start at zero. The meaning of
// each field below follows its name; exact accounting lives in the GC code.
struct GCStats {
  // Number of blob records examined.
  uint64_t blob_count{0};

  // Key counts by outcome: superseded by a newer write, past their
  // expiration, or rewritten into another blob file.
  uint64_t num_keys_overwritten{0};
  uint64_t num_keys_expired{0};
  uint64_t num_keys_relocated{0};

  // Byte counts for the same three outcomes.
  uint64_t bytes_overwritten{0};
  uint64_t bytes_expired{0};
  uint64_t bytes_relocated{0};
};
86
87 /**
88 * The implementation class for BlobDB. This manages the value
89 * part in TTL aware sequentially written files. These files are
90 * Garbage Collected.
91 */
92 class BlobDBImpl : public BlobDB {
93 friend class BlobFile;
94 friend class BlobDBIterator;
95
96 public:
97 // deletions check period
98 static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
99
100 // gc percentage each check period
101 static constexpr uint32_t kGCFilePercentage = 100;
102
103 // gc period
104 static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000;
105
106 // sanity check task
107 static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
108
109 // how many random access open files can we tolerate
110 static constexpr uint32_t kOpenFilesTrigger = 100;
111
112 // how often to schedule reclaim open files.
113 static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
114
115 // how often to schedule delete obs files periods
116 static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
117
118 // how often to schedule expired files eviction.
119 static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
120
121 // when should oldest file be evicted:
122 // on reaching 90% of blob_dir_size
123 static constexpr double kEvictOldestFileAtSize = 0.9;
124
125 using BlobDB::Put;
126 Status Put(const WriteOptions& options, const Slice& key,
127 const Slice& value) override;
128
129 using BlobDB::Get;
130 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
131 const Slice& key, PinnableSlice* value) override;
132
133 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
134 const Slice& key, PinnableSlice* value,
135 uint64_t* expiration) override;
136
137 using BlobDB::NewIterator;
138 virtual Iterator* NewIterator(const ReadOptions& read_options) override;
139
140 using BlobDB::NewIterators;
141 virtual Status NewIterators(
142 const ReadOptions& /*read_options*/,
143 const std::vector<ColumnFamilyHandle*>& /*column_families*/,
144 std::vector<Iterator*>* /*iterators*/) override {
145 return Status::NotSupported("Not implemented");
146 }
147
148 using BlobDB::MultiGet;
149 virtual std::vector<Status> MultiGet(
150 const ReadOptions& read_options,
151 const std::vector<Slice>& keys,
152 std::vector<std::string>* values) override;
153
154 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
155
156 virtual Status Close() override;
157
158 using BlobDB::PutWithTTL;
159 Status PutWithTTL(const WriteOptions& options, const Slice& key,
160 const Slice& value, uint64_t ttl) override;
161
162 using BlobDB::PutUntil;
163 Status PutUntil(const WriteOptions& options, const Slice& key,
164 const Slice& value, uint64_t expiration) override;
165
166 BlobDBOptions GetBlobDBOptions() const override;
167
168 BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
169 const DBOptions& db_options,
170 const ColumnFamilyOptions& cf_options);
171
172 virtual Status DisableFileDeletions() override;
173
174 virtual Status EnableFileDeletions(bool force) override;
175
176 virtual Status GetLiveFiles(std::vector<std::string>&,
177 uint64_t* manifest_file_size,
178 bool flush_memtable = true) override;
179 virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
180
181 ~BlobDBImpl();
182
183 Status Open(std::vector<ColumnFamilyHandle*>* handles);
184
185 Status SyncBlobFiles() override;
186
187 void UpdateLiveSSTSize();
188
189 void GetCompactionContext(BlobCompactionContext* context);
190
191 #ifndef NDEBUG
192 Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
193 PinnableSlice* value);
194
195 std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
196
197 std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
198
199 Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
200
201 void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
202 SequenceNumber obsolete_seq = 0,
203 bool update_size = true);
204
205 Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
206 GCStats* gc_stats);
207
208 void TEST_RunGC();
209
210 void TEST_EvictExpiredFiles();
211
212 void TEST_DeleteObsoleteFiles();
213
214 uint64_t TEST_live_sst_size();
215 #endif // !NDEBUG
216
217 private:
218 class GarbageCollectionWriteCallback;
219 class BlobInserter;
220
221 // Create a snapshot if there isn't one in read options.
222 // Return true if a snapshot is created.
223 bool SetSnapshotIfNeeded(ReadOptions* read_options);
224
225 Status GetImpl(const ReadOptions& read_options,
226 ColumnFamilyHandle* column_family, const Slice& key,
227 PinnableSlice* value, uint64_t* expiration = nullptr);
228
229 Status GetBlobValue(const Slice& key, const Slice& index_entry,
230 PinnableSlice* value, uint64_t* expiration = nullptr);
231
232 Slice GetCompressedSlice(const Slice& raw,
233 std::string* compression_output) const;
234
235 // Close a file by appending a footer, and removes file from open files list.
236 Status CloseBlobFile(std::shared_ptr<BlobFile> bfile, bool need_lock = true);
237
238 // Close a file if its size exceeds blob_file_size
239 Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
240
241 // Mark file as obsolete and move the file to obsolete file list.
242 //
243 // REQUIRED: hold write lock of mutex_ or during DB open.
244 void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
245 SequenceNumber obsolete_seq, bool update_size);
246
247 Status PutBlobValue(const WriteOptions& options, const Slice& key,
248 const Slice& value, uint64_t expiration,
249 WriteBatch* batch);
250
251 Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
252 const std::string& headerbuf, const Slice& key,
253 const Slice& value, uint64_t expiration,
254 std::string* index_entry);
255
256 // find an existing blob log file based on the expiration unix epoch
257 // if such a file does not exist, return nullptr
258 std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);
259
260 // find an existing blob log file to append the value to
261 std::shared_ptr<BlobFile> SelectBlobFile();
262
263 std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
264
265 // periodic sanity check. Bunch of checks
266 std::pair<bool, int64_t> SanityCheck(bool aborted);
267
268 // delete files which have been garbage collected and marked
269 // obsolete. Check whether any snapshots exist which refer to
270 // the same
271 std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
272
273 // Major task to garbage collect expired and deleted blobs
274 std::pair<bool, int64_t> RunGC(bool aborted);
275
276 // periodically check if open blob files and their TTL's has expired
277 // if expired, close the sequential writer and make the file immutable
278 std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
279
280 // if the number of open files, approaches ULIMIT's this
281 // task will close random readers, which are kept around for
282 // efficiency
283 std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
284
285 std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
286
287 // Adds the background tasks to the timer queue
288 void StartBackgroundTasks();
289
290 // add a new Blob File
291 std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
292
293 // collect all the blob log files from the blob directory
294 Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
295
296 // Open all blob files found in blob_dir.
297 Status OpenAllBlobFiles();
298
299 Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
300 std::shared_ptr<RandomAccessFileReader>* reader);
301
302 // hold write mutex on file and call.
303 // Close the above Random Access reader
304 void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
305
306 // hold write mutex on file and call
307 // creates a sequential (append) writer for this blobfile
308 Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
309
310 // returns a Writer object for the file. If writer is not
311 // already present, creates one. Needs Write Mutex to be held
312 std::shared_ptr<Writer> CheckOrCreateWriterLocked(
313 const std::shared_ptr<BlobFile>& bfile);
314
315 // Iterate through keys and values on Blob and write into
316 // separate file the remaining blobs and delete/update pointers
317 // in LSM atomically
318 Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
319 GCStats* gcstats);
320
321 // checks if there is no snapshot which is referencing the
322 // blobs
323 bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
324 bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
325
326 void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
327
328 uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
329
330 // Check if inserting a new blob will make DB grow out of space.
331 // If is_fifo = true, FIFO eviction will be triggered to make room for the
332 // new blob. If force_evict = true, FIFO eviction will evict blob files
333 // even eviction will not make enough room for the new blob.
334 Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
335 bool force_evict = false);
336
337 // name of the database directory
338 std::string dbname_;
339
340 // the base DB
341 DBImpl* db_impl_;
342 Env* env_;
343
344 // the options that govern the behavior of Blob Storage
345 BlobDBOptions bdb_options_;
346 DBOptions db_options_;
347 ColumnFamilyOptions cf_options_;
348 EnvOptions env_options_;
349
350 // Raw pointer of statistic. db_options_ has a shared_ptr to hold ownership.
351 Statistics* statistics_;
352
353 // by default this is "blob_dir" under dbname_
354 // but can be configured
355 std::string blob_dir_;
356
357 // pointer to directory
358 std::unique_ptr<Directory> dir_ent_;
359
360 // Read Write Mutex, which protects all the data structures
361 // HEAVILY TRAFFICKED
362 mutable port::RWMutex mutex_;
363
364 // Writers has to hold write_mutex_ before writing.
365 mutable port::Mutex write_mutex_;
366
367 // counter for blob file number
368 std::atomic<uint64_t> next_file_number_;
369
370 // entire metadata of all the BLOB files memory
371 std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
372
373 // epoch or version of the open files.
374 std::atomic<uint64_t> epoch_of_;
375
376 // opened non-TTL blob file.
377 std::shared_ptr<BlobFile> open_non_ttl_file_;
378
379 // all the blob files which are currently being appended to based
380 // on variety of incoming TTL's
381 std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
382
383 // Flag to check whether Close() has been called on this DB
384 bool closed_;
385
386 // timer based queue to execute tasks
387 TimerQueue tqueue_;
388
389 // number of files opened for random access/GET
390 // counter is used to monitor and close excess RA files.
391 std::atomic<uint32_t> open_file_count_;
392
393 // Total size of all live blob files (i.e. exclude obsolete files).
394 std::atomic<uint64_t> total_blob_size_;
395
396 // total size of SST files.
397 std::atomic<uint64_t> live_sst_size_;
398
399 // Latest FIFO eviction timestamp
400 //
401 // REQUIRES: access with metex_ lock held.
402 uint64_t fifo_eviction_seq_;
403
404 // The expiration up to which latest FIFO eviction evicts.
405 //
406 // REQUIRES: access with metex_ lock held.
407 uint64_t evict_expiration_up_to_;
408
409 std::list<std::shared_ptr<BlobFile>> obsolete_files_;
410
411 // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
412 // on the mutex to avoid contention.
413 //
414 // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
415 // the difference. mutex_ only needs to be held when access the
416 // data-structure, and delete_file_mutex_ needs to be held the whole time
417 // during DeleteObsoleteFiles to avoid being run simultaneously with
418 // DisableFileDeletions.
419 //
420 // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
421 // to hold delete_file_mutex_ first to avoid deadlock.
422 mutable port::Mutex delete_file_mutex_;
423
424 // Each call of DisableFileDeletions will increase disable_file_deletion_
425 // by 1. EnableFileDeletions will either decrease the count by 1 or reset
426 // it to zeor, depending on the force flag.
427 //
428 // REQUIRES: access with delete_file_mutex_ held.
429 int disable_file_deletions_ = 0;
430
431 uint32_t debug_level_;
432 };
433
434 } // namespace blob_db
435 } // namespace rocksdb
436 #endif // ROCKSDB_LITE