]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / persistent_cache / block_cache_tier_file.h
1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6
7 #ifndef ROCKSDB_LITE
8
9 #include <list>
10 #include <memory>
11 #include <string>
12 #include <vector>
13
14 #include "file/random_access_file_reader.h"
15 #include "port/port.h"
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/env.h"
18 #include "util/crc32c.h"
19 #include "util/mutexlock.h"
20 #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
21 #include "utilities/persistent_cache/lrulist.h"
22 #include "utilities/persistent_cache/persistent_cache_tier.h"
23 #include "utilities/persistent_cache/persistent_cache_util.h"
24
25 // The io code path of persistent cache uses pipelined architecture
26 //
27 // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
28 //
29 // This would enable the system to scale for GB/s of throughput which is
30 // expected with modern devices like NVM.
31 //
32 // The file level operations are encapsulated in the following abstractions
33 //
34 // BlockCacheFile
35 // ^
36 // |
37 // |
38 // RandomAccessCacheFile (For reading)
39 // ^
40 // |
41 // |
42 // WriteableCacheFile (For writing)
43 //
44 // Write IO code path :
45 //
46 namespace ROCKSDB_NAMESPACE {
47
48 class WriteableCacheFile;
49 struct BlockInfo;
50
// Represents a logical record on device
//
// (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
struct LogicalBlockAddress {
  // Default construction yields the all-zero address (see the member
  // initializers below).
  LogicalBlockAddress() {}

  // Build an address from its three components.
  //
  // size is accepted as a full 32-bit value so that it matches size_. A
  // 16-bit parameter would silently truncate any size of 64 KiB or more
  // before it reached the member.
  explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                               const uint32_t size)
      : cache_id_(cache_id), off_(off), size_(size) {}

  uint32_t cache_id_ = 0;  // cache-file-id
  uint32_t off_ = 0;       // offset
  uint32_t size_ = 0;      // size
};

using LBA = LogicalBlockAddress;
66
67 // class Writer
68 //
69 // Writer is the abstraction used for writing data to file. The component can be
70 // multithreaded. It is the last step of write pipeline
71 class Writer {
72 public:
73 explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
74 virtual ~Writer() {}
75
76 // write buffer to file at the given offset
77 virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
78 const uint64_t file_off,
79 const std::function<void()> callback) = 0;
80 // stop the writer
81 virtual void Stop() = 0;
82
83 PersistentCacheTier* const cache_;
84 };
85
86 // class BlockCacheFile
87 //
88 // Generic interface to support building file specialized for read/writing
89 class BlockCacheFile : public LRUElement<BlockCacheFile> {
90 public:
91 explicit BlockCacheFile(const uint32_t cache_id)
92 : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
93
94 explicit BlockCacheFile(Env* const env, const std::string& dir,
95 const uint32_t cache_id)
96 : LRUElement<BlockCacheFile>(),
97 env_(env),
98 dir_(dir),
99 cache_id_(cache_id) {}
100
101 virtual ~BlockCacheFile() {}
102
103 // append key/value to file and return LBA locator to user
104 virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
105 LBA* const /*lba*/) {
106 assert(!"not implemented");
107 return false;
108 }
109
110 // read from the record locator (LBA) and return key, value and status
111 virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
112 char* /*scratch*/) {
113 assert(!"not implemented");
114 return false;
115 }
116
117 // get file path
118 std::string Path() const {
119 return dir_ + "/" + std::to_string(cache_id_) + ".rc";
120 }
121 // get cache ID
122 uint32_t cacheid() const { return cache_id_; }
123 // Add block information to file data
124 // Block information is the list of index reference for this file
125 virtual void Add(BlockInfo* binfo) {
126 WriteLock _(&rwlock_);
127 block_infos_.push_back(binfo);
128 }
129 // get block information
130 std::list<BlockInfo*>& block_infos() { return block_infos_; }
131 // delete file and return the size of the file
132 virtual Status Delete(uint64_t* size);
133
134 protected:
135 port::RWMutex rwlock_; // synchronization mutex
136 Env* const env_ = nullptr; // Env for OS
137 const std::string dir_; // Directory name
138 const uint32_t cache_id_; // Cache id for the file
139 std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
140 // file content
141 };
142
143 // class RandomAccessFile
144 //
145 // Thread safe implementation for reading random data from file
146 class RandomAccessCacheFile : public BlockCacheFile {
147 public:
148 explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
149 const uint32_t cache_id,
150 const std::shared_ptr<Logger>& log)
151 : BlockCacheFile(env, dir, cache_id), log_(log) {}
152
153 virtual ~RandomAccessCacheFile() {}
154
155 // open file for reading
156 bool Open(const bool enable_direct_reads);
157 // read data from the disk
158 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
159
160 private:
161 std::unique_ptr<RandomAccessFileReader> freader_;
162
163 protected:
164 bool OpenImpl(const bool enable_direct_reads);
165 bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
166
167 std::shared_ptr<Logger> log_; // log file
168 };
169
170 // class WriteableCacheFile
171 //
172 // All writes to the files are cached in buffers. The buffers are flushed to
173 // disk as they get filled up. When file size reaches a certain size, a new file
174 // will be created provided there is free space
175 class WriteableCacheFile : public RandomAccessCacheFile {
176 public:
177 explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
178 Writer* writer, const std::string& dir,
179 const uint32_t cache_id, const uint32_t max_size,
180 const std::shared_ptr<Logger>& log)
181 : RandomAccessCacheFile(env, dir, cache_id, log),
182 alloc_(alloc),
183 writer_(writer),
184 max_size_(max_size) {}
185
186 virtual ~WriteableCacheFile();
187
188 // create file on disk
189 bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
190
191 // read data from logical file
192 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
193 ReadLock _(&rwlock_);
194 const bool closed = eof_ && bufs_.empty();
195 if (closed) {
196 // the file is closed, read from disk
197 return RandomAccessCacheFile::Read(lba, key, block, scratch);
198 }
199 // file is still being written, read from buffers
200 return ReadBuffer(lba, key, block, scratch);
201 }
202
203 // append data to end of file
204 bool Append(const Slice&, const Slice&, LBA* const) override;
205 // End-of-file
206 bool Eof() const { return eof_; }
207
208 private:
209 friend class ThreadedWriter;
210
211 static const size_t kFileAlignmentSize = 4 * 1024; // align file size
212
213 bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
214 bool ReadBuffer(const LBA& lba, char* data);
215 bool ExpandBuffer(const size_t size);
216 void DispatchBuffer();
217 void BufferWriteDone();
218 void CloseAndOpenForReading();
219 void ClearBuffers();
220 void Close();
221
222 // File layout in memory
223 //
224 // +------+------+------+------+------+------+
225 // | b0 | b1 | b2 | b3 | b4 | b5 |
226 // +------+------+------+------+------+------+
227 // ^ ^
228 // | |
229 // buf_doff_ buf_woff_
230 // (next buffer to (next buffer to fill)
231 // flush to disk)
232 //
233 // The buffers are flushed to disk serially for a given file
234
235 CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
236 Writer* const writer_ = nullptr; // File writer thread
237 std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
238 std::vector<CacheWriteBuffer*> bufs_; // Written buffers
239 uint32_t size_ = 0; // Size of the file
240 const uint32_t max_size_; // Max size of the file
241 bool eof_ = false; // End of file
242 uint32_t disk_woff_ = 0; // Offset to write on disk
243 size_t buf_woff_ = 0; // off into bufs_ to write
244 size_t buf_doff_ = 0; // off into bufs_ to dispatch
245 size_t pending_ios_ = 0; // Number of ios to disk in-progress
246 bool enable_direct_reads_ = false; // Should we enable direct reads
247 // when reading from disk
248 };
249
250 //
251 // Abstraction to do writing to device. It is part of pipelined architecture.
252 //
253 class ThreadedWriter : public Writer {
254 public:
255 // Representation of IO to device
256 struct IO {
257 explicit IO(const bool signal) : signal_(signal) {}
258 explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
259 const uint64_t file_off, const std::function<void()> callback)
260 : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
261
262 IO(const IO&) = default;
263 IO& operator=(const IO&) = default;
264 size_t Size() const { return sizeof(IO); }
265
266 WritableFile* file_ = nullptr; // File to write to
267 CacheWriteBuffer* buf_ = nullptr; // buffer to write
268 uint64_t file_off_ = 0; // file offset
269 bool signal_ = false; // signal to exit thread loop
270 std::function<void()> callback_; // Callback on completion
271 };
272
273 explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
274 const size_t io_size);
275 virtual ~ThreadedWriter() { assert(threads_.empty()); }
276
277 void Stop() override;
278 void Write(WritableFile* const file, CacheWriteBuffer* buf,
279 const uint64_t file_off,
280 const std::function<void()> callback) override;
281
282 private:
283 void ThreadMain();
284 void DispatchIO(const IO& io);
285
286 const size_t io_size_ = 0;
287 BoundedQueue<IO> q_;
288 std::vector<port::Thread> threads_;
289 };
290
291 } // namespace ROCKSDB_NAMESPACE
292
293 #endif