]>
Commit | Line | Data |
---|---|---|
70a152de SR |
1 | //! An async and concurrency safe data reader backed by a local LRU cache. |
2 | ||
e64f38cb | 3 | use std::future::Future; |
10351f70 SR |
4 | use std::io::SeekFrom; |
5 | use std::pin::Pin; | |
70a152de | 6 | use std::sync::Arc; |
10351f70 | 7 | use std::task::{Context, Poll}; |
70a152de | 8 | |
e64f38cb WB |
9 | use anyhow::Error; |
10 | use futures::ready; | |
11 | use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; | |
12 | ||
25877d05 DM |
13 | use proxmox_sys::io_format_err; |
14 | use proxmox_sys::error::io_err_other; | |
70a152de | 15 | |
6c221244 | 16 | use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; |
e64f38cb | 17 | |
ef4df211 WB |
18 | use crate::index::IndexFile; |
19 | use crate::read_chunk::AsyncReadChunk; | |
20 | ||
70a152de SR |
/// Adapter that lets an [`AsyncReadChunk`] implementation act as the backing
/// fetcher of an `AsyncLruCache` keyed by chunk digest.
struct AsyncChunkCacher<T> {
    /// Shared handle to the underlying chunk reader; cloned into every fetch future.
    reader: Arc<T>,
}
24 | ||
25 | impl<T: AsyncReadChunk + Send + Sync + 'static> AsyncCacher<[u8; 32], Arc<Vec<u8>>> | |
26 | for AsyncChunkCacher<T> | |
27 | { | |
28 | fn fetch( | |
29 | &self, | |
30 | key: [u8; 32], | |
31 | ) -> Box<dyn Future<Output = Result<Option<Arc<Vec<u8>>>, Error>> + Send> { | |
32 | let reader = Arc::clone(&self.reader); | |
33 | Box::new(async move { | |
34 | AsyncReadChunk::read_chunk(reader.as_ref(), &key) | |
35 | .await | |
36 | .map(|x| Some(Arc::new(x))) | |
37 | }) | |
38 | } | |
39 | } | |
40 | ||
41 | /// Allows arbitrary data reads from an Index via an AsyncReadChunk implementation, using an LRU | |
42 | /// cache internally to cache chunks and provide support for multiple concurrent reads (potentially | |
43 | /// to the same chunk). | |
44 | pub struct CachedChunkReader<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> { | |
45 | cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>, | |
46 | cacher: AsyncChunkCacher<R>, | |
47 | index: I, | |
48 | } | |
49 | ||
50 | impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<I, R> { | |
51 | /// Create a new reader with a local LRU cache containing 'capacity' chunks. | |
52 | pub fn new(reader: R, index: I, capacity: usize) -> Self { | |
53 | let cache = Arc::new(AsyncLruCache::new(capacity)); | |
54 | Self::new_with_cache(reader, index, cache) | |
55 | } | |
56 | ||
57 | /// Create a new reader with a custom LRU cache. Use this to share a cache between multiple | |
58 | /// readers. | |
59 | pub fn new_with_cache( | |
60 | reader: R, | |
61 | index: I, | |
62 | cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>, | |
63 | ) -> Self { | |
64 | Self { | |
65 | cache, | |
66 | cacher: AsyncChunkCacher { | |
67 | reader: Arc::new(reader), | |
68 | }, | |
69 | index, | |
70 | } | |
71 | } | |
72 | ||
73 | /// Read data at a given byte offset into a variable size buffer. Returns the amount of bytes | |
74 | /// read, which will always be the size of the buffer except when reaching EOF. | |
75 | pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> { | |
76 | let size = buf.len(); | |
77 | let mut read: usize = 0; | |
78 | while read < size { | |
79 | let cur_offset = offset + read as u64; | |
80 | if let Some(chunk) = self.index.chunk_from_offset(cur_offset) { | |
81 | // chunk indices retrieved from chunk_from_offset always resolve to Some(_) | |
82 | let info = self.index.chunk_info(chunk.0).unwrap(); | |
83 | ||
84 | // will never be None, see AsyncChunkCacher | |
85 | let data = self.cache.access(info.digest, &self.cacher).await?.unwrap(); | |
86 | ||
87 | let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read); | |
88 | let slice = &mut buf[read..(read + want_bytes)]; | |
89 | let intra_chunk = chunk.1 as usize; | |
90 | slice.copy_from_slice(&data[intra_chunk..(intra_chunk + want_bytes)]); | |
91 | read += want_bytes; | |
92 | } else { | |
93 | // EOF | |
94 | break; | |
95 | } | |
96 | } | |
97 | Ok(read) | |
98 | } | |
99 | } | |
10351f70 SR |
100 | |
101 | impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static> | |
102 | CachedChunkReader<I, R> | |
103 | { | |
104 | /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and | |
105 | /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred | |
106 | /// otherwise. | |
107 | pub fn seekable(self) -> SeekableCachedChunkReader<I, R> { | |
108 | SeekableCachedChunkReader { | |
109 | index_bytes: self.index.index_bytes(), | |
110 | reader: Arc::new(self), | |
111 | position: 0, | |
112 | read_future: None, | |
113 | } | |
114 | } | |
115 | } | |
116 | ||
117 | pub struct SeekableCachedChunkReader< | |
118 | I: IndexFile + Send + Sync + 'static, | |
119 | R: AsyncReadChunk + Send + Sync + 'static, | |
120 | > { | |
121 | reader: Arc<CachedChunkReader<I, R>>, | |
122 | index_bytes: u64, | |
123 | position: u64, | |
124 | read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>, | |
125 | } | |
126 | ||
127 | impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R> | |
128 | where | |
129 | I: IndexFile + Send + Sync + 'static, | |
130 | R: AsyncReadChunk + Send + Sync + 'static, | |
131 | { | |
132 | fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { | |
133 | let this = Pin::get_mut(self); | |
134 | let seek_to_pos = match pos { | |
135 | SeekFrom::Start(offset) => offset as i64, | |
136 | SeekFrom::End(offset) => this.index_bytes as i64 + offset, | |
137 | SeekFrom::Current(offset) => this.position as i64 + offset, | |
138 | }; | |
139 | if seek_to_pos < 0 { | |
140 | return Err(io_format_err!("cannot seek to negative values")); | |
141 | } else if seek_to_pos > this.index_bytes as i64 { | |
142 | this.position = this.index_bytes; | |
143 | } else { | |
144 | this.position = seek_to_pos as u64; | |
145 | } | |
146 | Ok(()) | |
147 | } | |
148 | ||
149 | fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> { | |
150 | Poll::Ready(Ok(self.position)) | |
151 | } | |
152 | } | |
153 | ||
154 | impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R> | |
155 | where | |
156 | I: IndexFile + Send + Sync + 'static, | |
157 | R: AsyncReadChunk + Send + Sync + 'static, | |
158 | { | |
159 | fn poll_read( | |
160 | self: Pin<&mut Self>, | |
161 | cx: &mut Context, | |
162 | buf: &mut ReadBuf, | |
163 | ) -> Poll<tokio::io::Result<()>> { | |
164 | let this = Pin::get_mut(self); | |
165 | ||
166 | let offset = this.position; | |
167 | let wanted = buf.capacity(); | |
168 | let reader = Arc::clone(&this.reader); | |
169 | ||
170 | let fut = this.read_future.get_or_insert_with(|| { | |
171 | Box::pin(async move { | |
172 | let mut read_buf = vec![0u8; wanted]; | |
173 | let read = reader.read_at(&mut read_buf[..wanted], offset).await?; | |
174 | Ok((read_buf, read)) | |
175 | }) | |
176 | }); | |
177 | ||
178 | let ret = match ready!(fut.as_mut().poll(cx)) { | |
179 | Ok((read_buf, read)) => { | |
180 | buf.put_slice(&read_buf[..read]); | |
181 | this.position += read as u64; | |
182 | Ok(()) | |
183 | } | |
184 | Err(err) => Err(io_err_other(err)), | |
185 | }; | |
186 | ||
187 | // future completed, drop | |
188 | this.read_future = None; | |
189 | ||
190 | Poll::Ready(ret) | |
191 | } | |
192 | } |