1 // Copyright (c) 2013, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
14 #include "file/random_access_file_reader.h"
15 #include "port/port.h"
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/env.h"
18 #include "util/crc32c.h"
19 #include "util/mutexlock.h"
20 #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
21 #include "utilities/persistent_cache/lrulist.h"
22 #include "utilities/persistent_cache/persistent_cache_tier.h"
23 #include "utilities/persistent_cache/persistent_cache_util.h"
25 // The io code path of persistent cache uses pipelined architecture
27 // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
29 // This would enable the system to scale for GB/s of throughput which is
30 // expected with modern devices like NVM.
32 // The file level operations are encapsulated in the following abstractions
38 // RandomAccessCacheFile (For reading)
42 // WriteableCacheFile (For writing)
44 // Write IO code path :
46 namespace ROCKSDB_NAMESPACE
{
// Forward declaration; the class itself is defined later in this file.
48 class WriteableCacheFile
;
51 // Represents a logical record on device
53 // (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
54 struct LogicalBlockAddress
{
// Default constructor; cache_id_ keeps its in-class initializer of 0.
55 LogicalBlockAddress() {}
// Builds an address from explicit components: the initializer list copies
// cache_id, off and size into cache_id_, off_ and size_ respectively.
// NOTE(review): the declaration of the `size` parameter is not visible in
// this excerpt -- confirm its type against the full header.
56 explicit LogicalBlockAddress(const uint32_t cache_id
, const uint32_t off
,
58 : cache_id_(cache_id
), off_(off
), size_(size
) {}
// Cache-file-id component of the address; defaults to 0.
60 uint32_t cache_id_
= 0;
// Short alias for LogicalBlockAddress, used by the Append()/Read() signatures
// of the cache file classes in this header.
65 using LBA
= LogicalBlockAddress
;
69 // Writer is the abstraction used for writing data to file. The component can be
70 // multithreaded. It is the last step of the write pipeline.
// Stores the supplied cache tier pointer in cache_.
73 explicit Writer(PersistentCacheTier
* const cache
) : cache_(cache
) {}
76 // write buffer to file at the given offset
// Pure virtual; the concrete writer decides how the write is carried out.
//   file     - file to write to
//   buf      - buffer to write
//   file_off - offset in the file at which the buffer is written
//   callback - NOTE(review): presumably invoked when the write completes --
//              confirm against the implementation
77 virtual void Write(WritableFile
* const file
, CacheWriteBuffer
* buf
,
78 const uint64_t file_off
,
79 const std::function
<void()> callback
) = 0;
// Pure virtual. NOTE(review): presumably shuts the writer down -- confirm
// against the implementation.
81 virtual void Stop() = 0;
// Cache tier handed to the constructor; the pointer itself is const.
83 PersistentCacheTier
* const cache_
;
86 // class BlockCacheFile
88 // Generic interface to support building file specialized for read/writing
89 class BlockCacheFile
: public LRUElement
<BlockCacheFile
> {
// Constructs a file object from a cache id only. env_ keeps its nullptr
// default and dir_ is default-constructed (empty).
91 explicit BlockCacheFile(const uint32_t cache_id
)
92 : LRUElement
<BlockCacheFile
>(), cache_id_(cache_id
) {}
// Constructs a file object for cache_id located in directory dir.
// NOTE(review): part of the initializer list is not visible in this excerpt;
// presumably env and dir are stored in env_ and dir_ -- confirm.
94 explicit BlockCacheFile(Env
* const env
, const std::string
& dir
,
95 const uint32_t cache_id
)
96 : LRUElement
<BlockCacheFile
>(),
99 cache_id_(cache_id
) {}
// Virtual so that derived file types can be destroyed through this base.
101 virtual ~BlockCacheFile() {}
103 // append key/value to file and return LBA locator to user
// The base version is a stub that trips an assertion ("not implemented");
// WriteableCacheFile provides an override.
104 virtual bool Append(const Slice
& /*key*/, const Slice
& /*val*/,
105 LBA
* const /*lba*/) {
106 assert(!"not implemented");
110 // read from the record locator (LBA) and return key, value and status
// The base version is a stub that trips an assertion ("not implemented");
// RandomAccessCacheFile and WriteableCacheFile provide overrides.
111 virtual bool Read(const LBA
& /*lba*/, Slice
* /*key*/, Slice
* /*block*/,
113 assert(!"not implemented");
// Returns the file path, built as "<dir_>/<cache_id_>.rc".
118 std::string
Path() const {
119 return dir_
+ "/" + std::to_string(cache_id_
) + ".rc";
// Returns the cache id this file was constructed with.
122 uint32_t cacheid() const { return cache_id_
; }
123 // Add block information to file data
124 // Block information is the list of index reference for this file
// Appends binfo to block_infos_ while holding rwlock_ as a writer.
125 virtual void Add(BlockInfo
* binfo
) {
126 WriteLock
_(&rwlock_
);
127 block_infos_
.push_back(binfo
);
129 // get block information
// Returns a mutable reference to the list; this accessor takes no lock itself.
130 std::list
<BlockInfo
*>& block_infos() { return block_infos_
; }
131 // delete file and return the size of the file
// Declared here only; size is an out-parameter for the file size.
132 virtual Status
Delete(uint64_t* size
);
135 port::RWMutex rwlock_
; // synchronization mutex
136 Env
* const env_
= nullptr; // Env for OS
137 const std::string dir_
; // Directory name
138 const uint32_t cache_id_
; // Cache id for the file
139 std::list
<BlockInfo
*> block_infos_
; // List of index entries mapping to the
143 // class RandomAccessCacheFile
145 // Thread safe implementation for reading random data from file
146 class RandomAccessCacheFile
: public BlockCacheFile
{
// Forwards env, dir and cache_id to BlockCacheFile and keeps a shared
// reference to the logger in log_.
148 explicit RandomAccessCacheFile(Env
* const env
, const std::string
& dir
,
149 const uint32_t cache_id
,
150 const std::shared_ptr
<Logger
>& log
)
151 : BlockCacheFile(env
, dir
, cache_id
), log_(log
) {}
153 virtual ~RandomAccessCacheFile() {}
155 // open file for reading
// enable_direct_reads: NOTE(review): presumably selects direct I/O for the
// reader -- confirm against the implementation.
156 bool Open(const bool enable_direct_reads
);
157 // read data from the disk
// Overrides BlockCacheFile::Read. lba locates the record; key and block are
// output slices; scratch is a caller-supplied buffer.
// NOTE(review): presumably the returned slices point into scratch -- confirm.
158 bool Read(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
) override
;
// Reader object owned by this file.
161 std::unique_ptr
<RandomAccessFileReader
> freader_
;
// Helpers declared here and defined elsewhere. NOTE(review): OpenImpl looks
// like the worker behind Open(), and ParseRec like the record decoder used by
// Read() -- confirm against the implementation.
164 bool OpenImpl(const bool enable_direct_reads
);
165 bool ParseRec(const LBA
& lba
, Slice
* key
, Slice
* val
, char* scratch
);
167 std::shared_ptr
<Logger
> log_
; // log file
170 // class WriteableCacheFile
172 // All writes to the files are cached in buffers. The buffers are flushed to
173 // disk as they get filled up. When file size reaches a certain size, a new file
174 // will be created provided there is free space
175 class WriteableCacheFile
: public RandomAccessCacheFile
{
// Forwards env, dir, cache_id and log to RandomAccessCacheFile and records
// max_size in max_size_.
// NOTE(review): part of the initializer list is not visible in this excerpt;
// presumably alloc and writer are stored in alloc_ and writer_ -- confirm.
177 explicit WriteableCacheFile(Env
* const env
, CacheWriteBufferAllocator
* alloc
,
178 Writer
* writer
, const std::string
& dir
,
179 const uint32_t cache_id
, const uint32_t max_size
,
180 const std::shared_ptr
<Logger
>& log
)
181 : RandomAccessCacheFile(env
, dir
, cache_id
, log
),
184 max_size_(max_size
) {}
// Declared here only; defined out of line.
186 virtual ~WriteableCacheFile();
188 // create file on disk
189 bool Create(const bool enable_direct_writes
, const bool enable_direct_reads
);
191 // read data from logical file
// Holds rwlock_ as a reader for the duration of the call. `closed` is true
// once eof_ is set and bufs_ is empty. The two return statements below serve
// the read from disk (base class Read) or from the in-memory buffers
// (ReadBuffer) respectively.
192 bool Read(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
) override
{
193 ReadLock
_(&rwlock_
);
194 const bool closed
= eof_
&& bufs_
.empty();
196 // the file is closed, read from disk
197 return RandomAccessCacheFile::Read(lba
, key
, block
, scratch
);
199 // file is still being written, read from buffers
200 return ReadBuffer(lba
, key
, block
, scratch
);
203 // append data to end of file
// Overrides BlockCacheFile::Append; declared here only.
204 bool Append(const Slice
&, const Slice
&, LBA
* const) override
;
// Returns the current value of eof_.
206 bool Eof() const { return eof_
; }
// Grants ThreadedWriter access to this class's non-public members.
209 friend class ThreadedWriter
;
211 static const size_t kFileAlignmentSize
= 4 * 1024; // align file size
// Buffer-management helpers, declared here and defined elsewhere.
213 bool ReadBuffer(const LBA
& lba
, Slice
* key
, Slice
* block
, char* scratch
);
214 bool ReadBuffer(const LBA
& lba
, char* data
);
215 bool ExpandBuffer(const size_t size
);
216 void DispatchBuffer();
217 void BufferWriteDone();
218 void CloseAndOpenForReading();
222 // File layout in memory
224 // +------+------+------+------+------+------+
225 // | b0 | b1 | b2 | b3 | b4 | b5 |
226 // +------+------+------+------+------+------+
229 // buf_doff_ buf_woff_
230 // (next buffer to dispatch) (next buffer to fill)
233 // The buffers are flushed to disk serially for a given file
235 CacheWriteBufferAllocator
* const alloc_
= nullptr; // Buffer provider
236 Writer
* const writer_
= nullptr; // File writer thread
237 std::unique_ptr
<WritableFile
> file_
; // RocksDB Env file abstraction
238 std::vector
<CacheWriteBuffer
*> bufs_
; // Written buffers
239 uint32_t size_
= 0; // Size of the file
240 const uint32_t max_size_
; // Max size of the file
241 bool eof_
= false; // End of file
242 uint32_t disk_woff_
= 0; // Offset to write on disk
243 size_t buf_woff_
= 0; // off into bufs_ to write
244 size_t buf_doff_
= 0; // off into bufs_ to dispatch
245 size_t pending_ios_
= 0; // Number of ios to disk in-progress
246 bool enable_direct_reads_
= false; // Should we enable direct reads
247 // when reading from disk
251 // Abstraction to do writing to device. It is part of pipelined architecture.
253 class ThreadedWriter
: public Writer
{
255 // Representation of IO to device
// Builds an IO that carries only the signal_ flag; the remaining fields keep
// their defaults (null file/buffer, offset 0, empty callback).
257 explicit IO(const bool signal
) : signal_(signal
) {}
// Builds a write request: target file, buffer, file offset and completion
// callback. signal_ keeps its default of false.
258 explicit IO(WritableFile
* const file
, CacheWriteBuffer
* const buf
,
259 const uint64_t file_off
, const std::function
<void()> callback
)
260 : file_(file
), buf_(buf
), file_off_(file_off
), callback_(callback
) {}
// IO is copyable; both copy operations are the compiler-generated defaults.
262 IO(const IO
&) = default;
263 IO
& operator=(const IO
&) = default;
// Returns sizeof(IO), i.e. the size of the descriptor itself rather than of
// the buffer it refers to.
264 size_t Size() const { return sizeof(IO
); }
266 WritableFile
* file_
= nullptr; // File to write to
267 CacheWriteBuffer
* buf_
= nullptr; // buffer to write
268 uint64_t file_off_
= 0; // file offset
269 bool signal_
= false; // signal to exit thread loop
270 std::function
<void()> callback_
; // Callback on completion
// Declared here only. NOTE(review): qdepth presumably sets the number of
// writer threads / queue depth and io_size the write granularity -- confirm
// against the implementation.
273 explicit ThreadedWriter(PersistentCacheTier
* const cache
, const size_t qdepth
,
274 const size_t io_size
);
// Asserts that threads_ is already empty at destruction time.
// NOTE(review): presumably Stop() must have been called first -- confirm.
275 virtual ~ThreadedWriter() { assert(threads_
.empty()); }
// Overrides of the Writer interface; defined out of line.
277 void Stop() override
;
278 void Write(WritableFile
* const file
, CacheWriteBuffer
* buf
,
279 const uint64_t file_off
,
280 const std::function
<void()> callback
) override
;
// Declared here only; takes the IO descriptor to process.
284 void DispatchIO(const IO
& io
);
// In-class default is 0.
286 const size_t io_size_
= 0;
// Threads owned by this writer; expected to be empty by the time the
// destructor runs (see the assertion there).
288 std::vector
<port::Thread
> threads_
;
291 } // namespace ROCKSDB_NAMESPACE