]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/persistent_cache/block_cache_tier_file.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / utilities / persistent_cache / block_cache_tier_file.h
CommitLineData
7c673cae 1// Copyright (c) 2013, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5#pragma once
6
7#ifndef ROCKSDB_LITE
8
9#include <list>
10#include <memory>
11#include <string>
12#include <vector>
13
14#include "rocksdb/comparator.h"
15#include "rocksdb/env.h"
16
17#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
18#include "utilities/persistent_cache/lrulist.h"
19#include "utilities/persistent_cache/persistent_cache_tier.h"
20#include "utilities/persistent_cache/persistent_cache_util.h"
21
22#include "port/port.h"
23#include "util/crc32c.h"
24#include "util/file_reader_writer.h"
25#include "util/mutexlock.h"
26
27// The io code path of persistent cache uses pipelined architecture
28//
29// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
30//
31// This would enable the system to scale for GB/s of throughput which is
32// expected with modern devies like NVM.
33//
34// The file level operations are encapsulated in the following abstractions
35//
36// BlockCacheFile
37// ^
38// |
39// |
40// RandomAccessCacheFile (For reading)
41// ^
42// |
43// |
44// WriteableCacheFile (For writing)
45//
46// Write IO code path :
47//
48namespace rocksdb {
49
50class WriteableCacheFile;
51struct BlockInfo;
52
53// Represents a logical record on device
54//
55// (L)ogical (B)lock (Address = { cache-file-id, offset, size }
56struct LogicalBlockAddress {
57 LogicalBlockAddress() {}
58 explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
59 const uint16_t size)
60 : cache_id_(cache_id), off_(off), size_(size) {}
61
62 uint32_t cache_id_ = 0;
63 uint32_t off_ = 0;
64 uint32_t size_ = 0;
65};
66
67typedef LogicalBlockAddress LBA;
68
69// class Writer
70//
71// Writer is the abstraction used for writing data to file. The component can be
72// multithreaded. It is the last step of write pipeline
73class Writer {
74 public:
75 explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
76 virtual ~Writer() {}
77
78 // write buffer to file at the given offset
79 virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
80 const uint64_t file_off,
81 const std::function<void()> callback) = 0;
82 // stop the writer
83 virtual void Stop() = 0;
84
85 PersistentCacheTier* const cache_;
86};
87
88// class BlockCacheFile
89//
90// Generic interface to support building file specialized for read/writing
91class BlockCacheFile : public LRUElement<BlockCacheFile> {
92 public:
93 explicit BlockCacheFile(const uint32_t cache_id)
94 : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
95
96 explicit BlockCacheFile(Env* const env, const std::string& dir,
97 const uint32_t cache_id)
98 : LRUElement<BlockCacheFile>(),
99 env_(env),
100 dir_(dir),
101 cache_id_(cache_id) {}
102
103 virtual ~BlockCacheFile() {}
104
105 // append key/value to file and return LBA locator to user
11fdf7f2
TL
106 virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
107 LBA* const /*lba*/) {
7c673cae
FG
108 assert(!"not implemented");
109 return false;
110 }
111
112 // read from the record locator (LBA) and return key, value and status
11fdf7f2
TL
113 virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
114 char* /*scratch*/) {
7c673cae
FG
115 assert(!"not implemented");
116 return false;
117 }
118
119 // get file path
120 std::string Path() const {
121 return dir_ + "/" + std::to_string(cache_id_) + ".rc";
122 }
123 // get cache ID
124 uint32_t cacheid() const { return cache_id_; }
125 // Add block information to file data
126 // Block information is the list of index reference for this file
127 virtual void Add(BlockInfo* binfo) {
128 WriteLock _(&rwlock_);
129 block_infos_.push_back(binfo);
130 }
131 // get block information
132 std::list<BlockInfo*>& block_infos() { return block_infos_; }
133 // delete file and return the size of the file
134 virtual Status Delete(uint64_t* size);
135
136 protected:
137 port::RWMutex rwlock_; // synchronization mutex
138 Env* const env_ = nullptr; // Env for IO
139 const std::string dir_; // Directory name
140 const uint32_t cache_id_; // Cache id for the file
141 std::list<BlockInfo*> block_infos_; // List of index entries mapping to the
142 // file content
143};
144
145// class RandomAccessFile
146//
147// Thread safe implementation for reading random data from file
148class RandomAccessCacheFile : public BlockCacheFile {
149 public:
150 explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
151 const uint32_t cache_id,
494da23a 152 const std::shared_ptr<Logger>& log)
7c673cae
FG
153 : BlockCacheFile(env, dir, cache_id), log_(log) {}
154
155 virtual ~RandomAccessCacheFile() {}
156
157 // open file for reading
158 bool Open(const bool enable_direct_reads);
159 // read data from the disk
160 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
161
162 private:
163 std::unique_ptr<RandomAccessFileReader> freader_;
164
165 protected:
166 bool OpenImpl(const bool enable_direct_reads);
167 bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
168
169 std::shared_ptr<Logger> log_; // log file
170};
171
172// class WriteableCacheFile
173//
174// All writes to the files are cached in buffers. The buffers are flushed to
175// disk as they get filled up. When file size reaches a certain size, a new file
176// will be created provided there is free space
177class WriteableCacheFile : public RandomAccessCacheFile {
178 public:
179 explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
180 Writer* writer, const std::string& dir,
181 const uint32_t cache_id, const uint32_t max_size,
182 const std::shared_ptr<Logger>& log)
183 : RandomAccessCacheFile(env, dir, cache_id, log),
184 alloc_(alloc),
185 writer_(writer),
186 max_size_(max_size) {}
187
188 virtual ~WriteableCacheFile();
189
190 // create file on disk
191 bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
192
193 // read data from logical file
194 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
195 ReadLock _(&rwlock_);
196 const bool closed = eof_ && bufs_.empty();
197 if (closed) {
198 // the file is closed, read from disk
199 return RandomAccessCacheFile::Read(lba, key, block, scratch);
200 }
201 // file is still being written, read from buffers
202 return ReadBuffer(lba, key, block, scratch);
203 }
204
205 // append data to end of file
206 bool Append(const Slice&, const Slice&, LBA* const) override;
207 // End-of-file
208 bool Eof() const { return eof_; }
209
210 private:
211 friend class ThreadedWriter;
212
213 static const size_t kFileAlignmentSize = 4 * 1024; // align file size
214
215 bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
216 bool ReadBuffer(const LBA& lba, char* data);
217 bool ExpandBuffer(const size_t size);
218 void DispatchBuffer();
219 void BufferWriteDone();
220 void CloseAndOpenForReading();
221 void ClearBuffers();
222 void Close();
223
224 // File layout in memory
225 //
226 // +------+------+------+------+------+------+
227 // | b0 | b1 | b2 | b3 | b4 | b5 |
228 // +------+------+------+------+------+------+
229 // ^ ^
230 // | |
231 // buf_doff_ buf_woff_
232 // (next buffer to (next buffer to fill)
233 // flush to disk)
234 //
235 // The buffers are flushed to disk serially for a given file
236
237 CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider
238 Writer* const writer_ = nullptr; // File writer thread
239 std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction
240 std::vector<CacheWriteBuffer*> bufs_; // Written buffers
241 uint32_t size_ = 0; // Size of the file
242 const uint32_t max_size_; // Max size of the file
243 bool eof_ = false; // End of file
244 uint32_t disk_woff_ = 0; // Offset to write on disk
245 size_t buf_woff_ = 0; // off into bufs_ to write
246 size_t buf_doff_ = 0; // off into bufs_ to dispatch
247 size_t pending_ios_ = 0; // Number of ios to disk in-progress
248 bool enable_direct_reads_ = false; // Should we enable direct reads
249 // when reading from disk
250};
251
252//
253// Abstraction to do writing to device. It is part of pipelined architecture.
254//
255class ThreadedWriter : public Writer {
256 public:
257 // Representation of IO to device
258 struct IO {
259 explicit IO(const bool signal) : signal_(signal) {}
260 explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
261 const uint64_t file_off, const std::function<void()> callback)
262 : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
263
264 IO(const IO&) = default;
265 IO& operator=(const IO&) = default;
266 size_t Size() const { return sizeof(IO); }
267
494da23a
TL
268 WritableFile* file_ = nullptr; // File to write to
269 CacheWriteBuffer* buf_ = nullptr; // buffer to write
270 uint64_t file_off_ = 0; // file offset
271 bool signal_ = false; // signal to exit thread loop
272 std::function<void()> callback_; // Callback on completion
7c673cae
FG
273 };
274
275 explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
276 const size_t io_size);
277 virtual ~ThreadedWriter() { assert(threads_.empty()); }
278
279 void Stop() override;
280 void Write(WritableFile* const file, CacheWriteBuffer* buf,
281 const uint64_t file_off,
282 const std::function<void()> callback) override;
283
284 private:
285 void ThreadMain();
286 void DispatchIO(const IO& io);
287
288 const size_t io_size_ = 0;
289 BoundedQueue<IO> q_;
290 std::vector<port::Thread> threads_;
291};
292
293} // namespace rocksdb
294
295#endif