]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / persistent_cache / block_cache_tier.h
1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 #pragma once
6
7 #ifndef ROCKSDB_LITE
8
9 #ifndef OS_WIN
10 #include <unistd.h>
11 #endif // ! OS_WIN
12
13 #include <list>
14 #include <memory>
15 #include <set>
16 #include <sstream>
17 #include <stdexcept>
18 #include <string>
19 #include <thread>
20
21 #include "rocksdb/cache.h"
22 #include "rocksdb/comparator.h"
23 #include "rocksdb/persistent_cache.h"
24
25 #include "utilities/persistent_cache/block_cache_tier_file.h"
26 #include "utilities/persistent_cache/block_cache_tier_metadata.h"
27 #include "utilities/persistent_cache/persistent_cache_util.h"
28
29 #include "memtable/skiplist.h"
30 #include "monitoring/histogram.h"
31 #include "port/port.h"
32 #include "util/arena.h"
33 #include "util/coding.h"
34 #include "util/crc32c.h"
35 #include "util/mutexlock.h"
36
37 namespace rocksdb {
38
39 //
40 // Block cache tier implementation
41 //
42 class BlockCacheTier : public PersistentCacheTier {
43 public:
44 explicit BlockCacheTier(const PersistentCacheConfig& opt)
45 : opt_(opt),
46 insert_ops_(opt_.max_write_pipeline_backlog_size),
47 buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
48 writer_(this, opt_.writer_qdepth, opt_.writer_dispatch_size) {
49 Info(opt_.log, "Initializing allocator. size=%d B count=%d",
50 opt_.write_buffer_size, opt_.write_buffer_count());
51 }
52
53 virtual ~BlockCacheTier() {
54 // Close is re-entrant so we can call close even if it is already closed
55 Close();
56 assert(!insert_th_.joinable());
57 }
58
59 Status Insert(const Slice& key, const char* data, const size_t size) override;
60 Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
61 size_t* size) override;
62 Status Open() override;
63 Status Close() override;
64 bool Erase(const Slice& key) override;
65 bool Reserve(const size_t size) override;
66
67 bool IsCompressed() override { return opt_.is_compressed; }
68
69 std::string GetPrintableOptions() const override { return opt_.ToString(); }
70
71 PersistentCache::StatsType Stats() override;
72
73 void TEST_Flush() override {
74 while (insert_ops_.Size()) {
75 /* sleep override */
76 Env::Default()->SleepForMicroseconds(1000000);
77 }
78 }
79
80 private:
81 // Percentage of cache to be evicted when the cache is full
82 static const size_t kEvictPct = 10;
83 // Max attempts to insert key, value to cache in pipelined mode
84 static const size_t kMaxRetry = 3;
85
86 // Pipelined operation
87 struct InsertOp {
88 explicit InsertOp(const bool signal) : signal_(signal) {}
89 explicit InsertOp(std::string&& key, const std::string& data)
90 : key_(std::move(key)), data_(data) {}
91 ~InsertOp() {}
92
93 InsertOp() = delete;
94 InsertOp(InsertOp&& rhs) = default;
95 InsertOp& operator=(InsertOp&& rhs) = default;
96
97 // used for estimating size by bounded queue
98 size_t Size() { return data_.size() + key_.size(); }
99
100 std::string key_;
101 std::string data_;
102 const bool signal_ = false; // signal to request processing thread to exit
103 };
104
105 // entry point for insert thread
106 void InsertMain();
107 // insert implementation
108 Status InsertImpl(const Slice& key, const Slice& data);
109 // Create a new cache file
110 Status NewCacheFile();
111 // Get cache directory path
112 std::string GetCachePath() const { return opt_.path + "/cache"; }
113 // Cleanup folder
114 Status CleanupCacheFolder(const std::string& folder);
115
116 // Statistics
117 struct Statistics {
118 HistogramImpl bytes_pipelined_;
119 HistogramImpl bytes_written_;
120 HistogramImpl bytes_read_;
121 HistogramImpl read_hit_latency_;
122 HistogramImpl read_miss_latency_;
123 HistogramImpl write_latency_;
124 uint64_t cache_hits_ = 0;
125 uint64_t cache_misses_ = 0;
126 uint64_t cache_errors_ = 0;
127 uint64_t insert_dropped_ = 0;
128
129 double CacheHitPct() const {
130 const auto lookups = cache_hits_ + cache_misses_;
131 return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
132 }
133
134 double CacheMissPct() const {
135 const auto lookups = cache_hits_ + cache_misses_;
136 return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
137 }
138 };
139
140 port::RWMutex lock_; // Synchronization
141 const PersistentCacheConfig opt_; // BlockCache options
142 BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
143 rocksdb::port::Thread insert_th_; // Insert thread
144 uint32_t writer_cache_id_ = 0; // Current cache file identifier
145 WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
146 CacheWriteBufferAllocator buffer_allocator_; // Buffer provider
147 ThreadedWriter writer_; // Writer threads
148 BlockCacheTierMetadata metadata_; // Cache meta data manager
149 std::atomic<uint64_t> size_{0}; // Size of the cache
150 Statistics stats_; // Statistics
151 };
152
153 } // namespace rocksdb
154
155 #endif