]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-datastore/src/cached_chunk_reader.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / pbs-datastore / src / cached_chunk_reader.rs
CommitLineData
70a152de
SR
1//! An async and concurrency safe data reader backed by a local LRU cache.
2
e64f38cb 3use std::future::Future;
10351f70
SR
4use std::io::SeekFrom;
5use std::pin::Pin;
70a152de 6use std::sync::Arc;
10351f70 7use std::task::{Context, Poll};
70a152de 8
e64f38cb
WB
9use anyhow::Error;
10use futures::ready;
11use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
12
25877d05
DM
13use proxmox_sys::io_format_err;
14use proxmox_sys::error::io_err_other;
70a152de 15
6c221244 16use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
e64f38cb 17
ef4df211
WB
18use crate::index::IndexFile;
19use crate::read_chunk::AsyncReadChunk;
20
70a152de
SR
21struct AsyncChunkCacher<T> {
22 reader: Arc<T>,
23}
24
25impl<T: AsyncReadChunk + Send + Sync + 'static> AsyncCacher<[u8; 32], Arc<Vec<u8>>>
26 for AsyncChunkCacher<T>
27{
28 fn fetch(
29 &self,
30 key: [u8; 32],
31 ) -> Box<dyn Future<Output = Result<Option<Arc<Vec<u8>>>, Error>> + Send> {
32 let reader = Arc::clone(&self.reader);
33 Box::new(async move {
34 AsyncReadChunk::read_chunk(reader.as_ref(), &key)
35 .await
36 .map(|x| Some(Arc::new(x)))
37 })
38 }
39}
40
41/// Allows arbitrary data reads from an Index via an AsyncReadChunk implementation, using an LRU
42/// cache internally to cache chunks and provide support for multiple concurrent reads (potentially
43/// to the same chunk).
44pub struct CachedChunkReader<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> {
45 cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
46 cacher: AsyncChunkCacher<R>,
47 index: I,
48}
49
50impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<I, R> {
51 /// Create a new reader with a local LRU cache containing 'capacity' chunks.
52 pub fn new(reader: R, index: I, capacity: usize) -> Self {
53 let cache = Arc::new(AsyncLruCache::new(capacity));
54 Self::new_with_cache(reader, index, cache)
55 }
56
57 /// Create a new reader with a custom LRU cache. Use this to share a cache between multiple
58 /// readers.
59 pub fn new_with_cache(
60 reader: R,
61 index: I,
62 cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
63 ) -> Self {
64 Self {
65 cache,
66 cacher: AsyncChunkCacher {
67 reader: Arc::new(reader),
68 },
69 index,
70 }
71 }
72
73 /// Read data at a given byte offset into a variable size buffer. Returns the amount of bytes
74 /// read, which will always be the size of the buffer except when reaching EOF.
75 pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
76 let size = buf.len();
77 let mut read: usize = 0;
78 while read < size {
79 let cur_offset = offset + read as u64;
80 if let Some(chunk) = self.index.chunk_from_offset(cur_offset) {
81 // chunk indices retrieved from chunk_from_offset always resolve to Some(_)
82 let info = self.index.chunk_info(chunk.0).unwrap();
83
84 // will never be None, see AsyncChunkCacher
85 let data = self.cache.access(info.digest, &self.cacher).await?.unwrap();
86
87 let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read);
88 let slice = &mut buf[read..(read + want_bytes)];
89 let intra_chunk = chunk.1 as usize;
90 slice.copy_from_slice(&data[intra_chunk..(intra_chunk + want_bytes)]);
91 read += want_bytes;
92 } else {
93 // EOF
94 break;
95 }
96 }
97 Ok(read)
98 }
99}
10351f70
SR
100
101impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
102 CachedChunkReader<I, R>
103{
104 /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
105 /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
106 /// otherwise.
107 pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
108 SeekableCachedChunkReader {
109 index_bytes: self.index.index_bytes(),
110 reader: Arc::new(self),
111 position: 0,
112 read_future: None,
113 }
114 }
115}
116
117pub struct SeekableCachedChunkReader<
118 I: IndexFile + Send + Sync + 'static,
119 R: AsyncReadChunk + Send + Sync + 'static,
120> {
121 reader: Arc<CachedChunkReader<I, R>>,
122 index_bytes: u64,
123 position: u64,
124 read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
125}
126
127impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
128where
129 I: IndexFile + Send + Sync + 'static,
130 R: AsyncReadChunk + Send + Sync + 'static,
131{
132 fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
133 let this = Pin::get_mut(self);
134 let seek_to_pos = match pos {
135 SeekFrom::Start(offset) => offset as i64,
136 SeekFrom::End(offset) => this.index_bytes as i64 + offset,
137 SeekFrom::Current(offset) => this.position as i64 + offset,
138 };
139 if seek_to_pos < 0 {
140 return Err(io_format_err!("cannot seek to negative values"));
141 } else if seek_to_pos > this.index_bytes as i64 {
142 this.position = this.index_bytes;
143 } else {
144 this.position = seek_to_pos as u64;
145 }
146 Ok(())
147 }
148
149 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
150 Poll::Ready(Ok(self.position))
151 }
152}
153
154impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
155where
156 I: IndexFile + Send + Sync + 'static,
157 R: AsyncReadChunk + Send + Sync + 'static,
158{
159 fn poll_read(
160 self: Pin<&mut Self>,
161 cx: &mut Context,
162 buf: &mut ReadBuf,
163 ) -> Poll<tokio::io::Result<()>> {
164 let this = Pin::get_mut(self);
165
166 let offset = this.position;
167 let wanted = buf.capacity();
168 let reader = Arc::clone(&this.reader);
169
170 let fut = this.read_future.get_or_insert_with(|| {
171 Box::pin(async move {
172 let mut read_buf = vec![0u8; wanted];
173 let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
174 Ok((read_buf, read))
175 })
176 });
177
178 let ret = match ready!(fut.as_mut().poll(cx)) {
179 Ok((read_buf, read)) => {
180 buf.put_slice(&read_buf[..read]);
181 this.position += read as u64;
182 Ok(())
183 }
184 Err(err) => Err(io_err_other(err)),
185 };
186
187 // future completed, drop
188 this.read_future = None;
189
190 Poll::Ready(ret)
191 }
192}