]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/async_index_reader.rs
AsyncIndexReader: avoid memcpy, add clippy lint fixup comment
[proxmox-backup.git] / src / backup / async_index_reader.rs
CommitLineData
4a3adc3d
DC
1use std::future::Future;
2use std::task::{Poll, Context};
3use std::pin::Pin;
ec5f9d35 4use std::io::SeekFrom;
4a3adc3d
DC
5
6use anyhow::Error;
7use futures::future::FutureExt;
8use futures::ready;
ec5f9d35 9use tokio::io::{AsyncRead, AsyncSeek};
4a3adc3d
DC
10
11use proxmox::sys::error::io_err_other;
12use proxmox::io_format_err;
13
14use super::IndexFile;
15use super::read_chunk::AsyncReadChunk;
ec5f9d35 16use super::index::ChunkReadInfo;
4a3adc3d 17
61c6eafc
WB
18// FIXME: This enum may not be required?
19// - Put the `WaitForData` case directly into a `read_future: Option<>`
20// - make the read loop as follows:
21// * if read_buffer is not empty:
22// use it
23// * else if read_future is there:
24// poll it
25// if read: move data to read_buffer
26// * else
27// create read future
28#[allow(clippy::enum_variant_names)]
4a3adc3d
DC
29enum AsyncIndexReaderState<S> {
30 NoData,
31 WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
ec5f9d35 32 HaveData,
4a3adc3d
DC
33}
34
35pub struct AsyncIndexReader<S, I: IndexFile> {
36 store: Option<S>,
37 index: I,
38 read_buffer: Vec<u8>,
ec5f9d35 39 current_chunk_offset: u64,
4a3adc3d 40 current_chunk_idx: usize,
ec5f9d35
SR
41 current_chunk_info: Option<ChunkReadInfo>,
42 position: u64,
43 seek_to_pos: i64,
4a3adc3d
DC
44 state: AsyncIndexReaderState<S>,
45}
46
47// ok because the only public interfaces operates on &mut Self
48unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
49
50impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
51 pub fn new(index: I, store: S) -> Self {
52 Self {
53 store: Some(store),
54 index,
e13c4f66 55 read_buffer: Vec::with_capacity(1024 * 1024),
ec5f9d35 56 current_chunk_offset: 0,
4a3adc3d 57 current_chunk_idx: 0,
ec5f9d35
SR
58 current_chunk_info: None,
59 position: 0,
60 seek_to_pos: 0,
4a3adc3d
DC
61 state: AsyncIndexReaderState::NoData,
62 }
63 }
64}
65
e13c4f66
WB
66impl<S, I> AsyncRead for AsyncIndexReader<S, I>
67where
68 S: AsyncReadChunk + Unpin + Sync + 'static,
69 I: IndexFile + Unpin,
4a3adc3d
DC
70{
71 fn poll_read(
72 self: Pin<&mut Self>,
73 cx: &mut Context,
74 buf: &mut [u8],
75 ) -> Poll<tokio::io::Result<usize>> {
76 let this = Pin::get_mut(self);
77 loop {
78 match &mut this.state {
79 AsyncIndexReaderState::NoData => {
ec5f9d35
SR
80 let (idx, offset) = if this.current_chunk_info.is_some() &&
81 this.position == this.current_chunk_info.as_ref().unwrap().range.end
82 {
83 // optimization for sequential chunk read
84 let next_idx = this.current_chunk_idx + 1;
85 (next_idx, 0)
86 } else {
87 match this.index.chunk_from_offset(this.position) {
88 Some(res) => res,
89 None => return Poll::Ready(Ok(0))
90 }
91 };
92
93 if idx >= this.index.index_count() {
4a3adc3d
DC
94 return Poll::Ready(Ok(0));
95 }
96
ec5f9d35 97 let info = this
4a3adc3d 98 .index
ec5f9d35
SR
99 .chunk_info(idx)
100 .ok_or(io_format_err!("could not get digest"))?;
101
102 this.current_chunk_offset = offset;
103 this.current_chunk_idx = idx;
104 let old_info = this.current_chunk_info.replace(info.clone());
105
106 if let Some(old_info) = old_info {
107 if old_info.digest == info.digest {
108 // hit, chunk is currently in cache
109 this.state = AsyncIndexReaderState::HaveData;
110 continue;
111 }
4a3adc3d
DC
112 }
113
ec5f9d35 114 // miss, need to download new chunk
e9764238 115 let store = match this.store.take() {
4a3adc3d
DC
116 Some(store) => store,
117 None => {
118 return Poll::Ready(Err(io_format_err!("could not find store")));
e13c4f66 119 }
4a3adc3d
DC
120 };
121
122 let future = async move {
ec5f9d35 123 store.read_chunk(&info.digest)
4a3adc3d
DC
124 .await
125 .map(move |x| (store, x))
126 };
127
128 this.state = AsyncIndexReaderState::WaitForData(future.boxed());
e13c4f66 129 }
4a3adc3d
DC
130 AsyncIndexReaderState::WaitForData(ref mut future) => {
131 match ready!(future.as_mut().poll(cx)) {
61c6eafc
WB
132 Ok((store, chunk_data)) => {
133 this.read_buffer = chunk_data;
ec5f9d35 134 this.state = AsyncIndexReaderState::HaveData;
4a3adc3d 135 this.store = Some(store);
e13c4f66 136 }
4a3adc3d
DC
137 Err(err) => {
138 return Poll::Ready(Err(io_err_other(err)));
e13c4f66 139 }
4a3adc3d 140 };
e13c4f66 141 }
ec5f9d35
SR
142 AsyncIndexReaderState::HaveData => {
143 let offset = this.current_chunk_offset as usize;
4a3adc3d
DC
144 let len = this.read_buffer.len();
145 let n = if len - offset < buf.len() {
146 len - offset
147 } else {
148 buf.len()
149 };
150
e13c4f66 151 buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
ec5f9d35
SR
152 this.position += n as u64;
153
4a3adc3d
DC
154 if offset + n == len {
155 this.state = AsyncIndexReaderState::NoData;
4a3adc3d 156 } else {
ec5f9d35
SR
157 this.current_chunk_offset += n as u64;
158 this.state = AsyncIndexReaderState::HaveData;
4a3adc3d
DC
159 }
160
161 return Poll::Ready(Ok(n));
e13c4f66 162 }
4a3adc3d
DC
163 }
164 }
165 }
166}
ec5f9d35
SR
167
168impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
169where
170 S: AsyncReadChunk + Unpin + Sync + 'static,
171 I: IndexFile + Unpin,
172{
173 fn start_seek(
174 self: Pin<&mut Self>,
175 _cx: &mut Context<'_>,
176 pos: SeekFrom,
177 ) -> Poll<tokio::io::Result<()>> {
178 let this = Pin::get_mut(self);
179 this.seek_to_pos = match pos {
180 SeekFrom::Start(offset) => {
181 offset as i64
182 },
183 SeekFrom::End(offset) => {
184 this.index.index_bytes() as i64 + offset
185 },
186 SeekFrom::Current(offset) => {
187 this.position as i64 + offset
188 }
189 };
190 Poll::Ready(Ok(()))
191 }
192
193 fn poll_complete(
194 self: Pin<&mut Self>,
195 _cx: &mut Context<'_>,
196 ) -> Poll<tokio::io::Result<u64>> {
197 let this = Pin::get_mut(self);
198
199 let index_bytes = this.index.index_bytes();
200 if this.seek_to_pos < 0 {
201 return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
202 } else if this.seek_to_pos > index_bytes as i64 {
203 this.position = index_bytes;
204 } else {
205 this.position = this.seek_to_pos as u64;
206 }
207
208 // even if seeking within one chunk, we need to go to NoData to
209 // recalculate the current_chunk_offset (data is cached anyway)
210 this.state = AsyncIndexReaderState::NoData;
211
212 Poll::Ready(Ok(this.position))
213 }
214}