]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/async_index_reader.rs
log rotate: do NOT overwrite file with possible writers
[proxmox-backup.git] / src / backup / async_index_reader.rs
1 use std::future::Future;
2 use std::task::{Poll, Context};
3 use std::pin::Pin;
4 use std::io::SeekFrom;
5
6 use anyhow::Error;
7 use futures::future::FutureExt;
8 use futures::ready;
9 use tokio::io::{AsyncRead, AsyncSeek};
10
11 use proxmox::sys::error::io_err_other;
12 use proxmox::io_format_err;
13
14 use super::IndexFile;
15 use super::read_chunk::AsyncReadChunk;
16 use super::index::ChunkReadInfo;
17
18 // FIXME: This enum may not be required?
19 // - Put the `WaitForData` case directly into a `read_future: Option<>`
20 // - make the read loop as follows:
21 // * if read_buffer is not empty:
22 // use it
23 // * else if read_future is there:
24 // poll it
25 // if read: move data to read_buffer
26 // * else
27 // create read future
28 #[allow(clippy::enum_variant_names)]
29 enum AsyncIndexReaderState<S> {
30 NoData,
31 WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
32 HaveData,
33 }
34
35 pub struct AsyncIndexReader<S, I: IndexFile> {
36 store: Option<S>,
37 index: I,
38 read_buffer: Vec<u8>,
39 current_chunk_offset: u64,
40 current_chunk_idx: usize,
41 current_chunk_info: Option<ChunkReadInfo>,
42 position: u64,
43 seek_to_pos: i64,
44 state: AsyncIndexReaderState<S>,
45 }
46
47 // ok because the only public interfaces operates on &mut Self
48 unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
49
50 impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
51 pub fn new(index: I, store: S) -> Self {
52 Self {
53 store: Some(store),
54 index,
55 read_buffer: Vec::with_capacity(1024 * 1024),
56 current_chunk_offset: 0,
57 current_chunk_idx: 0,
58 current_chunk_info: None,
59 position: 0,
60 seek_to_pos: 0,
61 state: AsyncIndexReaderState::NoData,
62 }
63 }
64 }
65
66 impl<S, I> AsyncRead for AsyncIndexReader<S, I>
67 where
68 S: AsyncReadChunk + Unpin + Sync + 'static,
69 I: IndexFile + Unpin,
70 {
71 fn poll_read(
72 self: Pin<&mut Self>,
73 cx: &mut Context,
74 buf: &mut [u8],
75 ) -> Poll<tokio::io::Result<usize>> {
76 let this = Pin::get_mut(self);
77 loop {
78 match &mut this.state {
79 AsyncIndexReaderState::NoData => {
80 let (idx, offset) = if this.current_chunk_info.is_some() &&
81 this.position == this.current_chunk_info.as_ref().unwrap().range.end
82 {
83 // optimization for sequential chunk read
84 let next_idx = this.current_chunk_idx + 1;
85 (next_idx, 0)
86 } else {
87 match this.index.chunk_from_offset(this.position) {
88 Some(res) => res,
89 None => return Poll::Ready(Ok(0))
90 }
91 };
92
93 if idx >= this.index.index_count() {
94 return Poll::Ready(Ok(0));
95 }
96
97 let info = this
98 .index
99 .chunk_info(idx)
100 .ok_or(io_format_err!("could not get digest"))?;
101
102 this.current_chunk_offset = offset;
103 this.current_chunk_idx = idx;
104 let old_info = this.current_chunk_info.replace(info.clone());
105
106 if let Some(old_info) = old_info {
107 if old_info.digest == info.digest {
108 // hit, chunk is currently in cache
109 this.state = AsyncIndexReaderState::HaveData;
110 continue;
111 }
112 }
113
114 // miss, need to download new chunk
115 let store = match this.store.take() {
116 Some(store) => store,
117 None => {
118 return Poll::Ready(Err(io_format_err!("could not find store")));
119 }
120 };
121
122 let future = async move {
123 store.read_chunk(&info.digest)
124 .await
125 .map(move |x| (store, x))
126 };
127
128 this.state = AsyncIndexReaderState::WaitForData(future.boxed());
129 }
130 AsyncIndexReaderState::WaitForData(ref mut future) => {
131 match ready!(future.as_mut().poll(cx)) {
132 Ok((store, chunk_data)) => {
133 this.read_buffer = chunk_data;
134 this.state = AsyncIndexReaderState::HaveData;
135 this.store = Some(store);
136 }
137 Err(err) => {
138 return Poll::Ready(Err(io_err_other(err)));
139 }
140 };
141 }
142 AsyncIndexReaderState::HaveData => {
143 let offset = this.current_chunk_offset as usize;
144 let len = this.read_buffer.len();
145 let n = if len - offset < buf.len() {
146 len - offset
147 } else {
148 buf.len()
149 };
150
151 buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
152 this.position += n as u64;
153
154 if offset + n == len {
155 this.state = AsyncIndexReaderState::NoData;
156 } else {
157 this.current_chunk_offset += n as u64;
158 this.state = AsyncIndexReaderState::HaveData;
159 }
160
161 return Poll::Ready(Ok(n));
162 }
163 }
164 }
165 }
166 }
167
168 impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
169 where
170 S: AsyncReadChunk + Unpin + Sync + 'static,
171 I: IndexFile + Unpin,
172 {
173 fn start_seek(
174 self: Pin<&mut Self>,
175 _cx: &mut Context<'_>,
176 pos: SeekFrom,
177 ) -> Poll<tokio::io::Result<()>> {
178 let this = Pin::get_mut(self);
179 this.seek_to_pos = match pos {
180 SeekFrom::Start(offset) => {
181 offset as i64
182 },
183 SeekFrom::End(offset) => {
184 this.index.index_bytes() as i64 + offset
185 },
186 SeekFrom::Current(offset) => {
187 this.position as i64 + offset
188 }
189 };
190 Poll::Ready(Ok(()))
191 }
192
193 fn poll_complete(
194 self: Pin<&mut Self>,
195 _cx: &mut Context<'_>,
196 ) -> Poll<tokio::io::Result<u64>> {
197 let this = Pin::get_mut(self);
198
199 let index_bytes = this.index.index_bytes();
200 if this.seek_to_pos < 0 {
201 return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
202 } else if this.seek_to_pos > index_bytes as i64 {
203 this.position = index_bytes;
204 } else {
205 this.position = this.seek_to_pos as u64;
206 }
207
208 // even if seeking within one chunk, we need to go to NoData to
209 // recalculate the current_chunk_offset (data is cached anyway)
210 this.state = AsyncIndexReaderState::NoData;
211
212 Poll::Ready(Ok(this.position))
213 }
214 }