]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | #pragma once |
6 | ||
7 | #ifndef ROCKSDB_LITE | |
8 | ||
9 | #include <list> | |
10 | #include <memory> | |
11 | #include <string> | |
12 | #include <vector> | |
13 | ||
14 | #include "rocksdb/comparator.h" | |
15 | #include "rocksdb/env.h" | |
16 | ||
17 | #include "utilities/persistent_cache/block_cache_tier_file_buffer.h" | |
18 | #include "utilities/persistent_cache/lrulist.h" | |
19 | #include "utilities/persistent_cache/persistent_cache_tier.h" | |
20 | #include "utilities/persistent_cache/persistent_cache_util.h" | |
21 | ||
22 | #include "port/port.h" | |
23 | #include "util/crc32c.h" | |
24 | #include "util/file_reader_writer.h" | |
25 | #include "util/mutexlock.h" | |
26 | ||
27 | // The io code path of persistent cache uses pipelined architecture | |
28 | // | |
29 | // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel | |
30 | // | |
31 | // This would enable the system to scale for GB/s of throughput which is | |
32 | // expected with modern devices like NVM. | |
33 | // | |
34 | // The file level operations are encapsulated in the following abstractions | |
35 | // | |
36 | // BlockCacheFile | |
37 | // ^ | |
38 | // | | |
39 | // | | |
40 | // RandomAccessCacheFile (For reading) | |
41 | // ^ | |
42 | // | | |
43 | // | | |
44 | // WriteableCacheFile (For writing) | |
45 | // | |
46 | // Write IO code path : | |
47 | // | |
48 | namespace rocksdb { | |
49 | ||
50 | class WriteableCacheFile; | |
51 | struct BlockInfo; | |
52 | ||
// Represents a logical record on device
//
// (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
struct LogicalBlockAddress {
  // Constructs the zero address: every component is 0.
  LogicalBlockAddress() {}

  // Constructs an address from its three components.
  //
  // `size` is accepted as uint32_t so that it matches the width of size_.
  // A narrower (16-bit) parameter would silently truncate any record size
  // above 65535 before it reached the 32-bit member.
  explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                               const uint32_t size)
      : cache_id_(cache_id), off_(off), size_(size) {}

  uint32_t cache_id_ = 0;  // cache-file-id component
  uint32_t off_ = 0;       // offset component
  uint32_t size_ = 0;      // size component
};

// Short alias used throughout the cache file interfaces.
using LBA = LogicalBlockAddress;
68 | ||
69 | // class Writer | |
70 | // | |
71 | // Writer is the abstraction used for writing data to file. The component can be | |
72 | // multithreaded. It is the last step of write pipeline | |
73 | class Writer { | |
74 | public: | |
75 | explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {} | |
76 | virtual ~Writer() {} | |
77 | ||
78 | // write buffer to file at the given offset | |
79 | virtual void Write(WritableFile* const file, CacheWriteBuffer* buf, | |
80 | const uint64_t file_off, | |
81 | const std::function<void()> callback) = 0; | |
82 | // stop the writer | |
83 | virtual void Stop() = 0; | |
84 | ||
85 | PersistentCacheTier* const cache_; | |
86 | }; | |
87 | ||
88 | // class BlockCacheFile | |
89 | // | |
90 | // Generic interface to support building file specialized for read/writing | |
91 | class BlockCacheFile : public LRUElement<BlockCacheFile> { | |
92 | public: | |
93 | explicit BlockCacheFile(const uint32_t cache_id) | |
94 | : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {} | |
95 | ||
96 | explicit BlockCacheFile(Env* const env, const std::string& dir, | |
97 | const uint32_t cache_id) | |
98 | : LRUElement<BlockCacheFile>(), | |
99 | env_(env), | |
100 | dir_(dir), | |
101 | cache_id_(cache_id) {} | |
102 | ||
103 | virtual ~BlockCacheFile() {} | |
104 | ||
105 | // append key/value to file and return LBA locator to user | |
11fdf7f2 TL |
106 | virtual bool Append(const Slice& /*key*/, const Slice& /*val*/, |
107 | LBA* const /*lba*/) { | |
7c673cae FG |
108 | assert(!"not implemented"); |
109 | return false; | |
110 | } | |
111 | ||
112 | // read from the record locator (LBA) and return key, value and status | |
11fdf7f2 TL |
113 | virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/, |
114 | char* /*scratch*/) { | |
7c673cae FG |
115 | assert(!"not implemented"); |
116 | return false; | |
117 | } | |
118 | ||
119 | // get file path | |
120 | std::string Path() const { | |
121 | return dir_ + "/" + std::to_string(cache_id_) + ".rc"; | |
122 | } | |
123 | // get cache ID | |
124 | uint32_t cacheid() const { return cache_id_; } | |
125 | // Add block information to file data | |
126 | // Block information is the list of index reference for this file | |
127 | virtual void Add(BlockInfo* binfo) { | |
128 | WriteLock _(&rwlock_); | |
129 | block_infos_.push_back(binfo); | |
130 | } | |
131 | // get block information | |
132 | std::list<BlockInfo*>& block_infos() { return block_infos_; } | |
133 | // delete file and return the size of the file | |
134 | virtual Status Delete(uint64_t* size); | |
135 | ||
136 | protected: | |
137 | port::RWMutex rwlock_; // synchronization mutex | |
138 | Env* const env_ = nullptr; // Env for IO | |
139 | const std::string dir_; // Directory name | |
140 | const uint32_t cache_id_; // Cache id for the file | |
141 | std::list<BlockInfo*> block_infos_; // List of index entries mapping to the | |
142 | // file content | |
143 | }; | |
144 | ||
145 | // class RandomAccessFile | |
146 | // | |
147 | // Thread safe implementation for reading random data from file | |
148 | class RandomAccessCacheFile : public BlockCacheFile { | |
149 | public: | |
150 | explicit RandomAccessCacheFile(Env* const env, const std::string& dir, | |
151 | const uint32_t cache_id, | |
494da23a | 152 | const std::shared_ptr<Logger>& log) |
7c673cae FG |
153 | : BlockCacheFile(env, dir, cache_id), log_(log) {} |
154 | ||
155 | virtual ~RandomAccessCacheFile() {} | |
156 | ||
157 | // open file for reading | |
158 | bool Open(const bool enable_direct_reads); | |
159 | // read data from the disk | |
160 | bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override; | |
161 | ||
162 | private: | |
163 | std::unique_ptr<RandomAccessFileReader> freader_; | |
164 | ||
165 | protected: | |
166 | bool OpenImpl(const bool enable_direct_reads); | |
167 | bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch); | |
168 | ||
169 | std::shared_ptr<Logger> log_; // log file | |
170 | }; | |
171 | ||
172 | // class WriteableCacheFile | |
173 | // | |
174 | // All writes to the files are cached in buffers. The buffers are flushed to | |
175 | // disk as they get filled up. When file size reaches a certain size, a new file | |
176 | // will be created provided there is free space | |
177 | class WriteableCacheFile : public RandomAccessCacheFile { | |
178 | public: | |
179 | explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc, | |
180 | Writer* writer, const std::string& dir, | |
181 | const uint32_t cache_id, const uint32_t max_size, | |
182 | const std::shared_ptr<Logger>& log) | |
183 | : RandomAccessCacheFile(env, dir, cache_id, log), | |
184 | alloc_(alloc), | |
185 | writer_(writer), | |
186 | max_size_(max_size) {} | |
187 | ||
188 | virtual ~WriteableCacheFile(); | |
189 | ||
190 | // create file on disk | |
191 | bool Create(const bool enable_direct_writes, const bool enable_direct_reads); | |
192 | ||
193 | // read data from logical file | |
194 | bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override { | |
195 | ReadLock _(&rwlock_); | |
196 | const bool closed = eof_ && bufs_.empty(); | |
197 | if (closed) { | |
198 | // the file is closed, read from disk | |
199 | return RandomAccessCacheFile::Read(lba, key, block, scratch); | |
200 | } | |
201 | // file is still being written, read from buffers | |
202 | return ReadBuffer(lba, key, block, scratch); | |
203 | } | |
204 | ||
205 | // append data to end of file | |
206 | bool Append(const Slice&, const Slice&, LBA* const) override; | |
207 | // End-of-file | |
208 | bool Eof() const { return eof_; } | |
209 | ||
210 | private: | |
211 | friend class ThreadedWriter; | |
212 | ||
213 | static const size_t kFileAlignmentSize = 4 * 1024; // align file size | |
214 | ||
215 | bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch); | |
216 | bool ReadBuffer(const LBA& lba, char* data); | |
217 | bool ExpandBuffer(const size_t size); | |
218 | void DispatchBuffer(); | |
219 | void BufferWriteDone(); | |
220 | void CloseAndOpenForReading(); | |
221 | void ClearBuffers(); | |
222 | void Close(); | |
223 | ||
224 | // File layout in memory | |
225 | // | |
226 | // +------+------+------+------+------+------+ | |
227 | // | b0 | b1 | b2 | b3 | b4 | b5 | | |
228 | // +------+------+------+------+------+------+ | |
229 | // ^ ^ | |
230 | // | | | |
231 | // buf_doff_ buf_woff_ | |
232 | // (next buffer to (next buffer to fill) | |
233 | // flush to disk) | |
234 | // | |
235 | // The buffers are flushed to disk serially for a given file | |
236 | ||
237 | CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider | |
238 | Writer* const writer_ = nullptr; // File writer thread | |
239 | std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction | |
240 | std::vector<CacheWriteBuffer*> bufs_; // Written buffers | |
241 | uint32_t size_ = 0; // Size of the file | |
242 | const uint32_t max_size_; // Max size of the file | |
243 | bool eof_ = false; // End of file | |
244 | uint32_t disk_woff_ = 0; // Offset to write on disk | |
245 | size_t buf_woff_ = 0; // off into bufs_ to write | |
246 | size_t buf_doff_ = 0; // off into bufs_ to dispatch | |
247 | size_t pending_ios_ = 0; // Number of ios to disk in-progress | |
248 | bool enable_direct_reads_ = false; // Should we enable direct reads | |
249 | // when reading from disk | |
250 | }; | |
251 | ||
252 | // | |
253 | // Abstraction to do writing to device. It is part of pipelined architecture. | |
254 | // | |
255 | class ThreadedWriter : public Writer { | |
256 | public: | |
257 | // Representation of IO to device | |
258 | struct IO { | |
259 | explicit IO(const bool signal) : signal_(signal) {} | |
260 | explicit IO(WritableFile* const file, CacheWriteBuffer* const buf, | |
261 | const uint64_t file_off, const std::function<void()> callback) | |
262 | : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {} | |
263 | ||
264 | IO(const IO&) = default; | |
265 | IO& operator=(const IO&) = default; | |
266 | size_t Size() const { return sizeof(IO); } | |
267 | ||
494da23a TL |
268 | WritableFile* file_ = nullptr; // File to write to |
269 | CacheWriteBuffer* buf_ = nullptr; // buffer to write | |
270 | uint64_t file_off_ = 0; // file offset | |
271 | bool signal_ = false; // signal to exit thread loop | |
272 | std::function<void()> callback_; // Callback on completion | |
7c673cae FG |
273 | }; |
274 | ||
275 | explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, | |
276 | const size_t io_size); | |
277 | virtual ~ThreadedWriter() { assert(threads_.empty()); } | |
278 | ||
279 | void Stop() override; | |
280 | void Write(WritableFile* const file, CacheWriteBuffer* buf, | |
281 | const uint64_t file_off, | |
282 | const std::function<void()> callback) override; | |
283 | ||
284 | private: | |
285 | void ThreadMain(); | |
286 | void DispatchIO(const IO& io); | |
287 | ||
288 | const size_t io_size_ = 0; | |
289 | BoundedQueue<IO> q_; | |
290 | std::vector<port::Thread> threads_; | |
291 | }; | |
292 | ||
293 | } // namespace rocksdb | |
294 | ||
295 | #endif |