]>
Commit | Line | Data |
---|---|---|
4a3adc3d DC |
1 | use std::future::Future; |
2 | use std::task::{Poll, Context}; | |
3 | use std::pin::Pin; | |
ec5f9d35 | 4 | use std::io::SeekFrom; |
4a3adc3d DC |
5 | |
6 | use anyhow::Error; | |
7 | use futures::future::FutureExt; | |
8 | use futures::ready; | |
ec5f9d35 | 9 | use tokio::io::{AsyncRead, AsyncSeek}; |
4a3adc3d DC |
10 | |
11 | use proxmox::sys::error::io_err_other; | |
12 | use proxmox::io_format_err; | |
13 | ||
14 | use super::IndexFile; | |
15 | use super::read_chunk::AsyncReadChunk; | |
ec5f9d35 | 16 | use super::index::ChunkReadInfo; |
4a3adc3d | 17 | |
61c6eafc WB |
18 | // FIXME: This enum may not be required? |
19 | // - Put the `WaitForData` case directly into a `read_future: Option<>` | |
20 | // - make the read loop as follows: | |
21 | // * if read_buffer is not empty: | |
22 | // use it | |
23 | // * else if read_future is there: | |
24 | // poll it | |
25 | // if read: move data to read_buffer | |
26 | // * else | |
27 | // create read future | |
28 | #[allow(clippy::enum_variant_names)] | |
4a3adc3d DC |
29 | enum AsyncIndexReaderState<S> { |
30 | NoData, | |
31 | WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>), | |
ec5f9d35 | 32 | HaveData, |
4a3adc3d DC |
33 | } |
34 | ||
35 | pub struct AsyncIndexReader<S, I: IndexFile> { | |
36 | store: Option<S>, | |
37 | index: I, | |
38 | read_buffer: Vec<u8>, | |
ec5f9d35 | 39 | current_chunk_offset: u64, |
4a3adc3d | 40 | current_chunk_idx: usize, |
ec5f9d35 SR |
41 | current_chunk_info: Option<ChunkReadInfo>, |
42 | position: u64, | |
43 | seek_to_pos: i64, | |
4a3adc3d DC |
44 | state: AsyncIndexReaderState<S>, |
45 | } | |
46 | ||
47 | // ok because the only public interfaces operates on &mut Self | |
48 | unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {} | |
49 | ||
50 | impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> { | |
51 | pub fn new(index: I, store: S) -> Self { | |
52 | Self { | |
53 | store: Some(store), | |
54 | index, | |
e13c4f66 | 55 | read_buffer: Vec::with_capacity(1024 * 1024), |
ec5f9d35 | 56 | current_chunk_offset: 0, |
4a3adc3d | 57 | current_chunk_idx: 0, |
ec5f9d35 SR |
58 | current_chunk_info: None, |
59 | position: 0, | |
60 | seek_to_pos: 0, | |
4a3adc3d DC |
61 | state: AsyncIndexReaderState::NoData, |
62 | } | |
63 | } | |
64 | } | |
65 | ||
e13c4f66 WB |
66 | impl<S, I> AsyncRead for AsyncIndexReader<S, I> |
67 | where | |
68 | S: AsyncReadChunk + Unpin + Sync + 'static, | |
69 | I: IndexFile + Unpin, | |
4a3adc3d DC |
70 | { |
71 | fn poll_read( | |
72 | self: Pin<&mut Self>, | |
73 | cx: &mut Context, | |
74 | buf: &mut [u8], | |
75 | ) -> Poll<tokio::io::Result<usize>> { | |
76 | let this = Pin::get_mut(self); | |
77 | loop { | |
78 | match &mut this.state { | |
79 | AsyncIndexReaderState::NoData => { | |
ec5f9d35 SR |
80 | let (idx, offset) = if this.current_chunk_info.is_some() && |
81 | this.position == this.current_chunk_info.as_ref().unwrap().range.end | |
82 | { | |
83 | // optimization for sequential chunk read | |
84 | let next_idx = this.current_chunk_idx + 1; | |
85 | (next_idx, 0) | |
86 | } else { | |
87 | match this.index.chunk_from_offset(this.position) { | |
88 | Some(res) => res, | |
89 | None => return Poll::Ready(Ok(0)) | |
90 | } | |
91 | }; | |
92 | ||
93 | if idx >= this.index.index_count() { | |
4a3adc3d DC |
94 | return Poll::Ready(Ok(0)); |
95 | } | |
96 | ||
ec5f9d35 | 97 | let info = this |
4a3adc3d | 98 | .index |
ec5f9d35 SR |
99 | .chunk_info(idx) |
100 | .ok_or(io_format_err!("could not get digest"))?; | |
101 | ||
102 | this.current_chunk_offset = offset; | |
103 | this.current_chunk_idx = idx; | |
104 | let old_info = this.current_chunk_info.replace(info.clone()); | |
105 | ||
106 | if let Some(old_info) = old_info { | |
107 | if old_info.digest == info.digest { | |
108 | // hit, chunk is currently in cache | |
109 | this.state = AsyncIndexReaderState::HaveData; | |
110 | continue; | |
111 | } | |
4a3adc3d DC |
112 | } |
113 | ||
ec5f9d35 | 114 | // miss, need to download new chunk |
e9764238 | 115 | let store = match this.store.take() { |
4a3adc3d DC |
116 | Some(store) => store, |
117 | None => { | |
118 | return Poll::Ready(Err(io_format_err!("could not find store"))); | |
e13c4f66 | 119 | } |
4a3adc3d DC |
120 | }; |
121 | ||
122 | let future = async move { | |
ec5f9d35 | 123 | store.read_chunk(&info.digest) |
4a3adc3d DC |
124 | .await |
125 | .map(move |x| (store, x)) | |
126 | }; | |
127 | ||
128 | this.state = AsyncIndexReaderState::WaitForData(future.boxed()); | |
e13c4f66 | 129 | } |
4a3adc3d DC |
130 | AsyncIndexReaderState::WaitForData(ref mut future) => { |
131 | match ready!(future.as_mut().poll(cx)) { | |
61c6eafc WB |
132 | Ok((store, chunk_data)) => { |
133 | this.read_buffer = chunk_data; | |
ec5f9d35 | 134 | this.state = AsyncIndexReaderState::HaveData; |
4a3adc3d | 135 | this.store = Some(store); |
e13c4f66 | 136 | } |
4a3adc3d DC |
137 | Err(err) => { |
138 | return Poll::Ready(Err(io_err_other(err))); | |
e13c4f66 | 139 | } |
4a3adc3d | 140 | }; |
e13c4f66 | 141 | } |
ec5f9d35 SR |
142 | AsyncIndexReaderState::HaveData => { |
143 | let offset = this.current_chunk_offset as usize; | |
4a3adc3d DC |
144 | let len = this.read_buffer.len(); |
145 | let n = if len - offset < buf.len() { | |
146 | len - offset | |
147 | } else { | |
148 | buf.len() | |
149 | }; | |
150 | ||
e13c4f66 | 151 | buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]); |
ec5f9d35 SR |
152 | this.position += n as u64; |
153 | ||
4a3adc3d DC |
154 | if offset + n == len { |
155 | this.state = AsyncIndexReaderState::NoData; | |
4a3adc3d | 156 | } else { |
ec5f9d35 SR |
157 | this.current_chunk_offset += n as u64; |
158 | this.state = AsyncIndexReaderState::HaveData; | |
4a3adc3d DC |
159 | } |
160 | ||
161 | return Poll::Ready(Ok(n)); | |
e13c4f66 | 162 | } |
4a3adc3d DC |
163 | } |
164 | } | |
165 | } | |
166 | } | |
ec5f9d35 SR |
167 | |
168 | impl<S, I> AsyncSeek for AsyncIndexReader<S, I> | |
169 | where | |
170 | S: AsyncReadChunk + Unpin + Sync + 'static, | |
171 | I: IndexFile + Unpin, | |
172 | { | |
173 | fn start_seek( | |
174 | self: Pin<&mut Self>, | |
175 | _cx: &mut Context<'_>, | |
176 | pos: SeekFrom, | |
177 | ) -> Poll<tokio::io::Result<()>> { | |
178 | let this = Pin::get_mut(self); | |
179 | this.seek_to_pos = match pos { | |
180 | SeekFrom::Start(offset) => { | |
181 | offset as i64 | |
182 | }, | |
183 | SeekFrom::End(offset) => { | |
184 | this.index.index_bytes() as i64 + offset | |
185 | }, | |
186 | SeekFrom::Current(offset) => { | |
187 | this.position as i64 + offset | |
188 | } | |
189 | }; | |
190 | Poll::Ready(Ok(())) | |
191 | } | |
192 | ||
193 | fn poll_complete( | |
194 | self: Pin<&mut Self>, | |
195 | _cx: &mut Context<'_>, | |
196 | ) -> Poll<tokio::io::Result<u64>> { | |
197 | let this = Pin::get_mut(self); | |
198 | ||
199 | let index_bytes = this.index.index_bytes(); | |
200 | if this.seek_to_pos < 0 { | |
201 | return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); | |
202 | } else if this.seek_to_pos > index_bytes as i64 { | |
203 | this.position = index_bytes; | |
204 | } else { | |
205 | this.position = this.seek_to_pos as u64; | |
206 | } | |
207 | ||
208 | // even if seeking within one chunk, we need to go to NoData to | |
209 | // recalculate the current_chunk_offset (data is cached anyway) | |
210 | this.state = AsyncIndexReaderState::NoData; | |
211 | ||
212 | Poll::Ready(Ok(this.position)) | |
213 | } | |
214 | } |