]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
ef5dbab0408596eeb9697b5a3c9bb1d8fcb04fb9
[ceph.git] / ceph / src / rocksdb / utilities / persistent_cache / block_cache_tier_file.h
1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6
7 #ifndef ROCKSDB_LITE
8
9 #include <list>
10 #include <memory>
11 #include <string>
12 #include <vector>
13
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/env.h"
16
17 #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
18 #include "utilities/persistent_cache/lrulist.h"
19 #include "utilities/persistent_cache/persistent_cache_tier.h"
20 #include "utilities/persistent_cache/persistent_cache_util.h"
21
22 #include "port/port.h"
23 #include "util/crc32c.h"
24 #include "util/file_reader_writer.h"
25 #include "util/mutexlock.h"
26
27 // The io code path of persistent cache uses pipelined architecture
28 //
29 // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
30 //
31 // This would enable the system to scale for GB/s of throughput which is
32 // expected with modern devies like NVM.
33 //
34 // The file level operations are encapsulated in the following abstractions
35 //
36 // BlockCacheFile
37 // ^
38 // |
39 // |
40 // RandomAccessCacheFile (For reading)
41 // ^
42 // |
43 // |
44 // WriteableCacheFile (For writing)
45 //
46 // Write IO code path :
47 //
48 namespace rocksdb {
49
50 class WriteableCacheFile;
51 struct BlockInfo;
52
53 // Represents a logical record on device
54 //
55 // (L)ogical (B)lock (Address = { cache-file-id, offset, size }
56 struct LogicalBlockAddress {
57 LogicalBlockAddress() {}
58 explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
59 const uint16_t size)
60 : cache_id_(cache_id), off_(off), size_(size) {}
61
62 uint32_t cache_id_ = 0;
63 uint32_t off_ = 0;
64 uint32_t size_ = 0;
65 };
66
67 typedef LogicalBlockAddress LBA;
68
69 // class Writer
70 //
71 // Writer is the abstraction used for writing data to file. The component can be
72 // multithreaded. It is the last step of write pipeline
73 class Writer {
74 public:
75 explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
76 virtual ~Writer() {}
77
78 // write buffer to file at the given offset
79 virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
80 const uint64_t file_off,
81 const std::function<void()> callback) = 0;
82 // stop the writer
83 virtual void Stop() = 0;
84
85 PersistentCacheTier* const cache_;
86 };
87
88 // class BlockCacheFile
89 //
90 // Generic interface to support building file specialized for read/writing
91 class BlockCacheFile : public LRUElement<BlockCacheFile> {
92 public:
93 explicit BlockCacheFile(const uint32_t cache_id)
94 : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
95
96 explicit BlockCacheFile(Env* const env, const std::string& dir,
97 const uint32_t cache_id)
98 : LRUElement<BlockCacheFile>(),
99 env_(env),
100 dir_(dir),
101 cache_id_(cache_id) {}
102
103 virtual ~BlockCacheFile() {}
104
105 // append key/value to file and return LBA locator to user
106 virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
107 LBA* const /*lba*/) {
108 assert(!"not implemented");
109 return false;
110 }
111
112 // read from the record locator (LBA) and return key, value and status
113 virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
114 char* /*scratch*/) {
115 assert(!"not implemented");
116 return false;
117 }
118
119 // get file path
120 std::string Path() const {
121 return dir_ + "/" + std::to_string(cache_id_) + ".rc";
122 }
123 // get cache ID
124 uint32_t cacheid() const { return cache_id_; }
125 // Add block information to file data
126 // Block information is the list of index reference for this file
127 virtual void Add(BlockInfo* binfo) {
128 WriteLock _(&rwlock_);
129 block_infos_.push_back(binfo);
130 }
131 // get block information
132 std::list<BlockInfo*>& block_infos() { return block_infos_; }
133 // delete file and return the size of the file
134 virtual Status Delete(uint64_t* size);
135
136 protected:
137 port::RWMutex rwlock_; // synchronization mutex
138 Env* const env_ = nullptr; // Env for IO
139 const std::string dir_; // Directory name
140 const uint32_t cache_id_; // Cache id for the file
141 std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
142 // file content
143 };
144
145 // class RandomAccessFile
146 //
147 // Thread safe implementation for reading random data from file
148 class RandomAccessCacheFile : public BlockCacheFile {
149 public:
150 explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
151 const uint32_t cache_id,
152 const shared_ptr<Logger>& log)
153 : BlockCacheFile(env, dir, cache_id), log_(log) {}
154
155 virtual ~RandomAccessCacheFile() {}
156
157 // open file for reading
158 bool Open(const bool enable_direct_reads);
159 // read data from the disk
160 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
161
162 private:
163 std::unique_ptr<RandomAccessFileReader> freader_;
164
165 protected:
166 bool OpenImpl(const bool enable_direct_reads);
167 bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
168
169 std::shared_ptr<Logger> log_; // log file
170 };
171
172 // class WriteableCacheFile
173 //
174 // All writes to the files are cached in buffers. The buffers are flushed to
175 // disk as they get filled up. When file size reaches a certain size, a new file
176 // will be created provided there is free space
177 class WriteableCacheFile : public RandomAccessCacheFile {
178 public:
179 explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
180 Writer* writer, const std::string& dir,
181 const uint32_t cache_id, const uint32_t max_size,
182 const std::shared_ptr<Logger>& log)
183 : RandomAccessCacheFile(env, dir, cache_id, log),
184 alloc_(alloc),
185 writer_(writer),
186 max_size_(max_size) {}
187
188 virtual ~WriteableCacheFile();
189
190 // create file on disk
191 bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
192
193 // read data from logical file
194 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
195 ReadLock _(&rwlock_);
196 const bool closed = eof_ && bufs_.empty();
197 if (closed) {
198 // the file is closed, read from disk
199 return RandomAccessCacheFile::Read(lba, key, block, scratch);
200 }
201 // file is still being written, read from buffers
202 return ReadBuffer(lba, key, block, scratch);
203 }
204
205 // append data to end of file
206 bool Append(const Slice&, const Slice&, LBA* const) override;
207 // End-of-file
208 bool Eof() const { return eof_; }
209
210 private:
211 friend class ThreadedWriter;
212
213 static const size_t kFileAlignmentSize = 4 * 1024; // align file size
214
215 bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
216 bool ReadBuffer(const LBA& lba, char* data);
217 bool ExpandBuffer(const size_t size);
218 void DispatchBuffer();
219 void BufferWriteDone();
220 void CloseAndOpenForReading();
221 void ClearBuffers();
222 void Close();
223
224 // File layout in memory
225 //
226 // +------+------+------+------+------+------+
227 // | b0 | b1 | b2 | b3 | b4 | b5 |
228 // +------+------+------+------+------+------+
229 // ^ ^
230 // | |
231 // buf_doff_ buf_woff_
232 // (next buffer to (next buffer to fill)
233 // flush to disk)
234 //
235 // The buffers are flushed to disk serially for a given file
236
237 CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
238 Writer* const writer_ = nullptr; // File writer thread
239 std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
240 std::vector<CacheWriteBuffer*> bufs_; // Written buffers
241 uint32_t size_ = 0; // Size of the file
242 const uint32_t max_size_; // Max size of the file
243 bool eof_ = false; // End of file
244 uint32_t disk_woff_ = 0; // Offset to write on disk
245 size_t buf_woff_ = 0; // off into bufs_ to write
246 size_t buf_doff_ = 0; // off into bufs_ to dispatch
247 size_t pending_ios_ = 0; // Number of ios to disk in-progress
248 bool enable_direct_reads_ = false; // Should we enable direct reads
249 // when reading from disk
250 };
251
252 //
253 // Abstraction to do writing to device. It is part of pipelined architecture.
254 //
255 class ThreadedWriter : public Writer {
256 public:
257 // Representation of IO to device
258 struct IO {
259 explicit IO(const bool signal) : signal_(signal) {}
260 explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
261 const uint64_t file_off, const std::function<void()> callback)
262 : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
263
264 IO(const IO&) = default;
265 IO& operator=(const IO&) = default;
266 size_t Size() const { return sizeof(IO); }
267
268 WritableFile* file_ = nullptr; // File to write to
269 CacheWriteBuffer* const buf_ = nullptr; // buffer to write
270 uint64_t file_off_ = 0; // file offset
271 bool signal_ = false; // signal to exit thread loop
272 std::function<void()> callback_; // Callback on completion
273 };
274
275 explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
276 const size_t io_size);
277 virtual ~ThreadedWriter() { assert(threads_.empty()); }
278
279 void Stop() override;
280 void Write(WritableFile* const file, CacheWriteBuffer* buf,
281 const uint64_t file_off,
282 const std::function<void()> callback) override;
283
284 private:
285 void ThreadMain();
286 void DispatchIO(const IO& io);
287
288 const size_t io_size_ = 0;
289 BoundedQueue<IO> q_;
290 std::vector<port::Thread> threads_;
291 };
292
293 } // namespace rocksdb
294
295 #endif