1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/env.h"
17 #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
18 #include "utilities/persistent_cache/lrulist.h"
19 #include "utilities/persistent_cache/persistent_cache_tier.h"
20 #include "utilities/persistent_cache/persistent_cache_util.h"
22 #include "port/port.h"
23 #include "util/crc32c.h"
24 #include "util/file_reader_writer.h"
25 #include "util/mutexlock.h"
27 // The io code path of persistent cache uses pipelined architecture
29 // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
31 // This would enable the system to scale for GB/s of throughput which is
32 // expected with modern devices like NVM.
34 // The file level operations are encapsulated in the following abstractions
40 // RandomAccessCacheFile (For reading)
44 // WriteableCacheFile (For writing)
46 // Write IO code path :
// Forward declaration; the class itself is defined later in this file.
50 class WriteableCacheFile
;
53 // Represents a logical record on device
55 // (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
// Record locator made of a cache file id, an offset and a size (see the
// member initializers of the three-argument constructor below).
// NOTE(review): part of the constructor's parameter list, the remaining member
// declarations and the closing brace are not visible in this excerpt.
56 struct LogicalBlockAddress
{
// Default constructor: takes no arguments and has an empty body.
57 LogicalBlockAddress() {}
// Initializes cache_id_, off_ and size_ from the same-named arguments.
58 explicit LogicalBlockAddress(const uint32_t cache_id
, const uint32_t off
,
60 : cache_id_(cache_id
), off_(off
), size_(size
) {}
// Cache file id component of the address; defaults to 0.
62 uint32_t cache_id_
= 0;
67 typedef LogicalBlockAddress LBA
;
71 // Writer is the abstraction used for writing data to file. The component can be
72 // multithreaded. It is the last step of write pipeline
// Stores the given cache tier pointer in cache_.
75 explicit Writer(PersistentCacheTier
* const cache
) : cache_(cache
) {}
78 // write buffer to file at the given offset
//   file     - destination file
//   buf      - buffer to write
//   file_off - offset in the file
//   callback - NOTE(review): presumably invoked once the write completes
//              (ThreadedWriter::IO describes its callback_ as "Callback on
//              completion"); confirm against the implementations.
// Pure virtual: concrete writers such as ThreadedWriter provide the body.
79 virtual void Write(WritableFile
* const file
, CacheWriteBuffer
* buf
,
80 const uint64_t file_off
,
81 const std::function
<void()> callback
) = 0;
// Pure virtual. NOTE(review): presumably shuts the writer down; the exact
// semantics are defined by the implementations, which are not visible here.
83 virtual void Stop() = 0;
// Cache tier handed to the constructor; the pointer itself is const.
85 PersistentCacheTier
* const cache_
;
88 // class BlockCacheFile
90 // Generic interface to support building file specialized for read/writing
// Base type for cache files; derives from LRUElement<BlockCacheFile>.
// NOTE(review): access specifiers, some initializers and several closing
// braces are not visible in this excerpt, so the notes below only describe
// what the visible lines show.
91 class BlockCacheFile
: public LRUElement
<BlockCacheFile
> {
// Constructs from a cache id only; env_ keeps its in-class default (nullptr)
// and dir_ is default-initialized.
93 explicit BlockCacheFile(const uint32_t cache_id
)
94 : LRUElement
<BlockCacheFile
>(), cache_id_(cache_id
) {}
// Constructs from an Env, a directory and a cache id.
// NOTE(review): the initializers between the base-class initializer and
// cache_id_ are not visible here; presumably they set env_ and dir_ -- confirm.
96 explicit BlockCacheFile(Env
* const env
, const std::string
& dir
,
97 const uint32_t cache_id
)
98 : LRUElement
<BlockCacheFile
>(),
101 cache_id_(cache_id
) {}
// Virtual destructor with an empty body.
103 virtual ~BlockCacheFile() {}
105 // append key/value to file and return LBA locator to user
// The base implementation asserts "not implemented"; WriteableCacheFile
// declares an override later in this file.
106 virtual bool Append(const Slice
& /*key*/, const Slice
& /*val*/,
107 LBA
* const /*lba*/) {
108 assert(!"not implemented");
112 // read from the record locator (LBA) and return key, value and status
// The base implementation asserts "not implemented"; RandomAccessCacheFile and
// WriteableCacheFile declare overrides later in this file.
113 virtual bool Read(const LBA
& /*lba*/, Slice
* /*key*/, Slice
* /*block*/,
115 assert(!"not implemented");
// Returns the file path, built as dir_ + "/" + decimal cache_id_ + ".rc".
120 std::string
Path() const {
121 return dir_
+ "/" + std::to_string(cache_id_
) + ".rc";
// Returns the cache id given at construction.
124 uint32_t cacheid() const { return cache_id_
; }
125 // Add block information to file data
126 // Block information is the list of index reference for this file
// Takes rwlock_ through a WriteLock before appending binfo to block_infos_.
127 virtual void Add(BlockInfo
* binfo
) {
128 WriteLock
_(&rwlock_
);
129 block_infos_
.push_back(binfo
);
131 // get block information
// Returns a mutable reference to the list; this accessor takes no lock.
132 std::list
<BlockInfo
*>& block_infos() { return block_infos_
; }
133 // delete file and return the size of the file
// size is an output pointer. NOTE(review): per the comment above it presumably
// receives the size of the deleted file; the definition is not visible here.
134 virtual Status
Delete(uint64_t* size
);
137 port::RWMutex rwlock_
; // synchronization mutex
138 Env
* const env_
= nullptr; // Env for IO
139 const std::string dir_
; // Directory name
140 const uint32_t cache_id_
; // Cache id for the file
141 std::list
<BlockInfo
*> block_infos_
; // List of index entries mapping to the
145 // class RandomAccessCacheFile
147 // Thread safe implementation for reading random data from file
// Read-side cache file built on BlockCacheFile.
// NOTE(review): access specifiers and the closing brace are not visible in
// this excerpt; only declarations are shown for most members.
148 class RandomAccessCacheFile
: public BlockCacheFile
{
// Forwards env, dir and cache_id to BlockCacheFile and stores log in log_.
150 explicit RandomAccessCacheFile(Env
* const env
, const std::string
& dir
,
151 const uint32_t cache_id
,
152 const shared_ptr
<Logger
>& log
)
153 : BlockCacheFile(env
, dir
, cache_id
), log_(log
) {}
// Virtual destructor with an empty body.
155 virtual ~RandomAccessCacheFile() {}
157 // open file for reading
// Declared only; returns bool. NOTE(review): presumably true on success.
158 bool Open(const bool enable_direct_reads
);
159 // read data from the disk
// Overrides BlockCacheFile::Read. NOTE(review): scratch is presumably caller
// provided storage backing the returned key/block slices -- confirm.
160 bool Read(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
) override
;
// Owned file reader. NOTE(review): presumably created by Open/OpenImpl.
163 std::unique_ptr
<RandomAccessFileReader
> freader_
;
// Helpers declared without bodies here. NOTE(review): OpenImpl presumably does
// the work behind Open, and ParseRec presumably decodes the record located by
// lba into key/val using scratch; their definitions are not visible.
166 bool OpenImpl(const bool enable_direct_reads
);
167 bool ParseRec(const LBA
& lba
, Slice
* key
, Slice
* val
, char* scratch
);
169 std::shared_ptr
<Logger
> log_
; // log file
172 // class WriteableCacheFile
174 // All writes to the files are cached in buffers. The buffers are flushed to
175 // disk as they get filled up. When file size reaches a certain size, a new file
176 // will be created provided there is free space
// Write-side cache file; extends RandomAccessCacheFile so the same object can
// also serve reads.
// NOTE(review): access specifiers, some constructor initializers, the branch
// statements inside Read and the closing braces are not visible in this
// excerpt.
177 class WriteableCacheFile
: public RandomAccessCacheFile
{
// Forwards env, dir, cache_id and log to RandomAccessCacheFile and stores
// max_size in max_size_.
// NOTE(review): the initializers between the base-class initializer and
// max_size_ are not visible; presumably they set alloc_ and writer_ -- confirm.
179 explicit WriteableCacheFile(Env
* const env
, CacheWriteBufferAllocator
* alloc
,
180 Writer
* writer
, const std::string
& dir
,
181 const uint32_t cache_id
, const uint32_t max_size
,
182 const std::shared_ptr
<Logger
>& log
)
183 : RandomAccessCacheFile(env
, dir
, cache_id
, log
),
186 max_size_(max_size
) {}
// Destructor is declared here and defined elsewhere.
188 virtual ~WriteableCacheFile();
190 // create file on disk
191 bool Create(const bool enable_direct_writes
, const bool enable_direct_reads
);
193 // read data from logical file
// Holds a read lock on rwlock_ for the duration of the call.
194 bool Read(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
) override
{
195 ReadLock
_(&rwlock_
);
// closed is true only when eof_ is set and no buffers remain in bufs_.
// NOTE(review): the statements that choose between the two returns below are
// not visible in this excerpt.
196 const bool closed
= eof_
&& bufs_
.empty();
198 // the file is closed, read from disk
199 return RandomAccessCacheFile::Read(lba
, key
, block
, scratch
);
201 // file is still being written, read from buffers
202 return ReadBuffer(lba
, key
, block
, scratch
);
205 // append data to end of file
// Overrides BlockCacheFile::Append; declared here, defined elsewhere.
206 bool Append(const Slice
&, const Slice
&, LBA
* const) override
;
// Returns the current value of eof_.
208 bool Eof() const { return eof_
; }
// Grants ThreadedWriter access to this class's non-public members.
211 friend class ThreadedWriter
;
// 4 * 1024 bytes (4 KiB).
213 static const size_t kFileAlignmentSize
= 4 * 1024; // align file size
// Internal helpers, declared without bodies here.
// NOTE(review): their behavior is defined elsewhere; the names suggest buffer
// reads, buffer growth, dispatch to the writer, write-completion handling and
// the switch to read mode, but none of that is verifiable from this excerpt.
215 bool ReadBuffer(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
);
216 bool ReadBuffer(const LBA
& lba
, char* data
);
217 bool ExpandBuffer(const size_t size
);
218 void DispatchBuffer();
219 void BufferWriteDone();
220 void CloseAndOpenForReading();
224 // File layout in memory
226 // +------+------+------+------+------+------+
227 // | b0 | b1 | b2 | b3 | b4 | b5 |
228 // +------+------+------+------+------+------+
231 // buf_doff_ buf_woff_
232 // (next buffer to (next buffer to fill)
235 // The buffers are flushed to disk serially for a given file
237 CacheWriteBufferAllocator
* const alloc_
= nullptr; // Buffer provider
238 Writer
* const writer_
= nullptr; // File writer thread
239 std::unique_ptr
<WritableFile
> file_
; // RocksDB Env file abstraction
240 std::vector
<CacheWriteBuffer
*> bufs_
; // Written buffers
241 uint32_t size_
= 0; // Size of the file
242 const uint32_t max_size_
; // Max size of the file
243 bool eof_
= false; // End of file
244 uint32_t disk_woff_
= 0; // Offset to write on disk
245 size_t buf_woff_
= 0; // off into bufs_ to write
246 size_t buf_doff_
= 0; // off into bufs_ to dispatch
247 size_t pending_ios_
= 0; // Number of ios to disk in-progress
248 bool enable_direct_reads_
= false; // Should we enable direct reads
249 // when reading from disk
253 // Abstraction to do writing to device. It is part of pipelined architecture.
// Writer implementation that keeps a vector of port::Thread objects (threads_).
// NOTE(review): access specifiers, the header of the nested IO type, some
// members and the closing braces are not visible in this excerpt.
255 class ThreadedWriter
: public Writer
{
257 // Representation of IO to device
// Builds an IO that carries only the signal flag; the other fields keep their
// in-class defaults.
259 explicit IO(const bool signal
) : signal_(signal
) {}
// Builds a write request from the target file, the buffer, the file offset and
// a callback.
260 explicit IO(WritableFile
* const file
, CacheWriteBuffer
* const buf
,
261 const uint64_t file_off
, const std::function
<void()> callback
)
262 : file_(file
), buf_(buf
), file_off_(file_off
), callback_(callback
) {}
264 IO(const IO
&) = default;
// NOTE(review): if buf_ below is a member of this same type, its const
// qualification makes this defaulted copy assignment defined as deleted.
265 IO
& operator=(const IO
&) = default;
// Returns sizeof(IO), independent of the referenced buffer.
266 size_t Size() const { return sizeof(IO
); }
268 WritableFile
* file_
= nullptr; // File to write to
269 CacheWriteBuffer
* const buf_
= nullptr; // buffer to write
270 uint64_t file_off_
= 0; // file offset
271 bool signal_
= false; // signal to exit thread loop
272 std::function
<void()> callback_
; // Callback on completion
// Declared here, defined elsewhere. NOTE(review): qdepth is presumably the
// number of writer threads/queue depth and io_size the size of each write
// (it matches the io_size_ member) -- confirm against the definition.
275 explicit ThreadedWriter(PersistentCacheTier
* const cache
, const size_t qdepth
,
276 const size_t io_size
);
// Asserts that threads_ is empty at destruction.
// NOTE(review): presumably Stop() is expected to have been called first.
277 virtual ~ThreadedWriter() { assert(threads_
.empty()); }
// Overrides of the Writer interface; bodies are defined elsewhere.
279 void Stop() override
;
280 void Write(WritableFile
* const file
, CacheWriteBuffer
* buf
,
281 const uint64_t file_off
,
282 const std::function
<void()> callback
) override
;
// Declared only. NOTE(review): presumably performs the write described by io.
286 void DispatchIO(const IO
& io
);
// In-class default is 0. NOTE(review): presumably set from the constructor's
// io_size argument.
288 const size_t io_size_
= 0;
// Threads owned by this writer; the destructor asserts this is empty.
290 std::vector
<port::Thread
> threads_
;
293 } // namespace rocksdb