]>
git.proxmox.com Git - proxmox-backup.git/blob - src/backup/async_index_reader.rs
1 use std
::future
::Future
;
2 use std
::task
::{Poll, Context}
;
7 use futures
::future
::FutureExt
;
9 use tokio
::io
::{AsyncRead, AsyncSeek}
;
11 use proxmox
::sys
::error
::io_err_other
;
12 use proxmox
::io_format_err
;
15 use super::read_chunk
::AsyncReadChunk
;
16 use super::index
::ChunkReadInfo
;
18 // FIXME: This enum may not be required?
19 // - Put the `WaitForData` case directly into a `read_future: Option<>`
20 // - make the read loop as follows:
21 // * if read_buffer is not empty:
23 // * else if read_future is there:
25 // if read: move data to read_buffer
28 #[allow(clippy::enum_variant_names)]
29 enum AsyncIndexReaderState
<S
> {
31 WaitForData(Pin
<Box
<dyn Future
<Output
= Result
<(S
, Vec
<u8>), Error
>> + Send
+ '
static>>),
35 pub struct AsyncIndexReader
<S
, I
: IndexFile
> {
39 current_chunk_offset
: u64,
40 current_chunk_idx
: usize,
41 current_chunk_info
: Option
<ChunkReadInfo
>,
44 state
: AsyncIndexReaderState
<S
>,
47 // ok because the only public interfaces operates on &mut Self
48 unsafe impl<S
: Sync
, I
: IndexFile
+ Sync
> Sync
for AsyncIndexReader
<S
, I
> {}
50 impl<S
: AsyncReadChunk
, I
: IndexFile
> AsyncIndexReader
<S
, I
> {
51 pub fn new(index
: I
, store
: S
) -> Self {
55 read_buffer
: Vec
::with_capacity(1024 * 1024),
56 current_chunk_offset
: 0,
58 current_chunk_info
: None
,
61 state
: AsyncIndexReaderState
::NoData
,
66 impl<S
, I
> AsyncRead
for AsyncIndexReader
<S
, I
>
68 S
: AsyncReadChunk
+ Unpin
+ Sync
+ '
static,
75 ) -> Poll
<tokio
::io
::Result
<usize>> {
76 let this
= Pin
::get_mut(self);
78 match &mut this
.state
{
79 AsyncIndexReaderState
::NoData
=> {
80 let (idx
, offset
) = if this
.current_chunk_info
.is_some() &&
81 this
.position
== this
.current_chunk_info
.as_ref().unwrap().range
.end
83 // optimization for sequential chunk read
84 let next_idx
= this
.current_chunk_idx
+ 1;
87 match this
.index
.chunk_from_offset(this
.position
) {
89 None
=> return Poll
::Ready(Ok(0))
93 if idx
>= this
.index
.index_count() {
94 return Poll
::Ready(Ok(0));
100 .ok_or(io_format_err
!("could not get digest"))?
;
102 this
.current_chunk_offset
= offset
;
103 this
.current_chunk_idx
= idx
;
104 let old_info
= this
.current_chunk_info
.replace(info
.clone());
106 if let Some(old_info
) = old_info
{
107 if old_info
.digest
== info
.digest
{
108 // hit, chunk is currently in cache
109 this
.state
= AsyncIndexReaderState
::HaveData
;
114 // miss, need to download new chunk
115 let store
= match this
.store
.take() {
116 Some(store
) => store
,
118 return Poll
::Ready(Err(io_format_err
!("could not find store")));
122 let future
= async
move {
123 store
.read_chunk(&info
.digest
)
125 .map(move |x
| (store
, x
))
128 this
.state
= AsyncIndexReaderState
::WaitForData(future
.boxed());
130 AsyncIndexReaderState
::WaitForData(ref mut future
) => {
131 match ready
!(future
.as_mut().poll(cx
)) {
132 Ok((store
, chunk_data
)) => {
133 this
.read_buffer
= chunk_data
;
134 this
.state
= AsyncIndexReaderState
::HaveData
;
135 this
.store
= Some(store
);
138 return Poll
::Ready(Err(io_err_other(err
)));
142 AsyncIndexReaderState
::HaveData
=> {
143 let offset
= this
.current_chunk_offset
as usize;
144 let len
= this
.read_buffer
.len();
145 let n
= if len
- offset
< buf
.len() {
151 buf
[0..n
].copy_from_slice(&this
.read_buffer
[offset
..(offset
+ n
)]);
152 this
.position
+= n
as u64;
154 if offset
+ n
== len
{
155 this
.state
= AsyncIndexReaderState
::NoData
;
157 this
.current_chunk_offset
+= n
as u64;
158 this
.state
= AsyncIndexReaderState
::HaveData
;
161 return Poll
::Ready(Ok(n
));
168 impl<S
, I
> AsyncSeek
for AsyncIndexReader
<S
, I
>
170 S
: AsyncReadChunk
+ Unpin
+ Sync
+ '
static,
171 I
: IndexFile
+ Unpin
,
174 self: Pin
<&mut Self>,
175 _cx
: &mut Context
<'_
>,
177 ) -> Poll
<tokio
::io
::Result
<()>> {
178 let this
= Pin
::get_mut(self);
179 this
.seek_to_pos
= match pos
{
180 SeekFrom
::Start(offset
) => {
183 SeekFrom
::End(offset
) => {
184 this
.index
.index_bytes() as i64 + offset
186 SeekFrom
::Current(offset
) => {
187 this
.position
as i64 + offset
194 self: Pin
<&mut Self>,
195 _cx
: &mut Context
<'_
>,
196 ) -> Poll
<tokio
::io
::Result
<u64>> {
197 let this
= Pin
::get_mut(self);
199 let index_bytes
= this
.index
.index_bytes();
200 if this
.seek_to_pos
< 0 {
201 return Poll
::Ready(Err(io_format_err
!("cannot seek to negative values")));
202 } else if this
.seek_to_pos
> index_bytes
as i64 {
203 this
.position
= index_bytes
;
205 this
.position
= this
.seek_to_pos
as u64;
208 // even if seeking within one chunk, we need to go to NoData to
209 // recalculate the current_chunk_offset (data is cached anyway)
210 this
.state
= AsyncIndexReaderState
::NoData
;
212 Poll
::Ready(Ok(this
.position
))