]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-client/src/dynamic_index.rs
tools::format: avoid some string copies
[proxmox-backup.git] / pbs-client / src / dynamic_index.rs
CommitLineData
2f02e431 1use std::io::{self, Seek, SeekFrom};
c443f58b 2use std::ops::Range;
4cf0ced9 3use std::sync::{Arc, Mutex};
a6f87283 4use std::task::Context;
4cf0ced9 5use std::pin::Pin;
367f002e 6
f7d4e4b5 7use anyhow::{bail, format_err, Error};
0433db19 8
a6f87283 9use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
8ea3b1d1 10
2f02e431 11use pbs_datastore::dynamic_index::DynamicIndexReader;
e64f38cb 12use pbs_datastore::read_chunk::ReadChunk;
2f02e431 13use pbs_datastore::index::IndexFile;
6c221244 14use pbs_tools::lru_cache::LruCache;
7bc1d727 15
c443f58b
WB
16struct CachedChunk {
17 range: Range<u64>,
18 data: Vec<u8>,
19}
20
21impl CachedChunk {
22 /// Perform sanity checks on the range and data size:
23 pub fn new(range: Range<u64>, data: Vec<u8>) -> Result<Self, Error> {
24 if data.len() as u64 != range.end - range.start {
25 bail!(
26 "read chunk with wrong size ({} != {})",
27 data.len(),
28 range.end - range.start,
29 );
30 }
31 Ok(Self { range, data })
32 }
33}
34
d48a9955
DM
35pub struct BufferedDynamicReader<S> {
36 store: S,
93d5d779 37 index: DynamicIndexReader,
39c6bd86
DM
38 archive_size: u64,
39 read_buffer: Vec<u8>,
40 buffered_chunk_idx: usize,
41 buffered_chunk_start: u64,
42 read_offset: u64,
6c221244 43 lru_cache: LruCache<usize, CachedChunk>,
536683e7
CE
44}
45
46struct ChunkCacher<'a, S> {
47 store: &'a mut S,
48 index: &'a DynamicIndexReader,
49}
50
6c221244 51impl<'a, S: ReadChunk> pbs_tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> {
c443f58b 52 fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> {
fdaab0df
DM
53 let info = match self.index.chunk_info(index) {
54 Some(info) => info,
55 None => bail!("chunk index out of range"),
56 };
c443f58b
WB
57 let range = info.range;
58 let data = self.store.read_chunk(&info.digest)?;
59 CachedChunk::new(range, data).map(Some)
536683e7 60 }
77703d95
DM
61}
62
f569acc5 63impl<S: ReadChunk> BufferedDynamicReader<S> {
d48a9955 64 pub fn new(index: DynamicIndexReader, store: S) -> Self {
57e50fb9 65 let archive_size = index.index_bytes();
39c6bd86 66 Self {
d48a9955 67 store,
653b1ca1
WB
68 index,
69 archive_size,
f569acc5 70 read_buffer: Vec::with_capacity(1024 * 1024),
39c6bd86
DM
71 buffered_chunk_idx: 0,
72 buffered_chunk_start: 0,
73 read_offset: 0,
6c221244 74 lru_cache: LruCache::new(32),
39c6bd86
DM
75 }
76 }
77
f569acc5
WB
78 pub fn archive_size(&self) -> u64 {
79 self.archive_size
80 }
39c6bd86 81
0a72e267 82 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
c443f58b
WB
83 //let (start, end, data) = self.lru_cache.access(
84 let cached_chunk = self.lru_cache.access(
536683e7
CE
85 idx,
86 &mut ChunkCacher {
87 store: &mut self.store,
88 index: &self.index,
89 },
90 )?.ok_or_else(|| format_err!("chunk not found by cacher"))?;
91
536683e7 92 // fixme: avoid copy
f98ac774 93 self.read_buffer.clear();
c443f58b 94 self.read_buffer.extend_from_slice(&cached_chunk.data);
0a72e267
DM
95
96 self.buffered_chunk_idx = idx;
d48a9955 97
c443f58b 98 self.buffered_chunk_start = cached_chunk.range.start;
0a72e267
DM
99 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
100 Ok(())
101 }
102}
103
eb5e0ae6 104impl<S: ReadChunk> pbs_tools::io::BufferedRead for BufferedDynamicReader<S> {
0a72e267 105 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
f569acc5
WB
106 if offset == self.archive_size {
107 return Ok(&self.read_buffer[0..0]);
108 }
318564ac 109
39c6bd86 110 let buffer_len = self.read_buffer.len();
0b05fd58 111 let index = &self.index;
39c6bd86
DM
112
113 // optimization for sequential read
f569acc5 114 if buffer_len > 0
2f02e431 115 && ((self.buffered_chunk_idx + 1) < index.index().len())
f569acc5 116 && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
39c6bd86
DM
117 {
118 let next_idx = self.buffered_chunk_idx + 1;
119 let next_end = index.chunk_end(next_idx);
120 if offset < next_end {
373ef4a5 121 self.buffer_chunk(next_idx)?;
39c6bd86
DM
122 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
123 return Ok(&self.read_buffer[buffer_offset..]);
124 }
125 }
126
f569acc5
WB
127 if (buffer_len == 0)
128 || (offset < self.buffered_chunk_start)
129 || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
39c6bd86 130 {
2f02e431 131 let end_idx = index.index().len() - 1;
39c6bd86
DM
132 let end = index.chunk_end(end_idx);
133 let idx = index.binary_search(0, 0, end_idx, end, offset)?;
373ef4a5 134 self.buffer_chunk(idx)?;
f569acc5 135 }
39c6bd86
DM
136
137 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
138 Ok(&self.read_buffer[buffer_offset..])
139 }
39c6bd86 140}
77703d95 141
f569acc5 142impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> {
4624fe29 143 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
eb5e0ae6 144 use pbs_tools::io::BufferedRead;
f569acc5 145 use std::io::{Error, ErrorKind};
4624fe29
DM
146
147 let data = match self.buffered_read(self.read_offset) {
148 Ok(v) => v,
149 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
150 };
151
f569acc5
WB
152 let n = if data.len() > buf.len() {
153 buf.len()
154 } else {
155 data.len()
156 };
4624fe29 157
bde8e243 158 buf[0..n].copy_from_slice(&data[0..n]);
4624fe29
DM
159
160 self.read_offset += n as u64;
161
62ee2eb4 162 Ok(n)
4624fe29
DM
163 }
164}
165
f569acc5 166impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> {
d48a9955 167 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
34337050 168 let new_offset = match pos {
f569acc5
WB
169 SeekFrom::Start(start_offset) => start_offset as i64,
170 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
34337050
DM
171 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
172 };
173
ddbdf80d 174 use std::io::{Error, ErrorKind};
34337050
DM
175 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
176 return Err(Error::new(
177 ErrorKind::Other,
f569acc5
WB
178 format!(
179 "seek is out of range {} ([0..{}])",
180 new_offset, self.archive_size
181 ),
182 ));
34337050
DM
183 }
184 self.read_offset = new_offset as u64;
185
186 Ok(self.read_offset)
187 }
188}
189
4cf0ced9
DC
190/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
191/// async use!
192///
193/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
194/// so that we can properly access it from multiple threads simultaneously while not issuing
195/// duplicate simultaneous reads over http.
196#[derive(Clone)]
197pub struct LocalDynamicReadAt<R: ReadChunk> {
198 inner: Arc<Mutex<BufferedDynamicReader<R>>>,
199}
200
201impl<R: ReadChunk> LocalDynamicReadAt<R> {
202 pub fn new(inner: BufferedDynamicReader<R>) -> Self {
203 Self {
204 inner: Arc::new(Mutex::new(inner)),
205 }
206 }
207}
208
a6f87283
WB
209impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> {
210 fn start_read_at<'a>(
211 self: Pin<&'a Self>,
4cf0ced9 212 _cx: &mut Context,
a6f87283 213 buf: &'a mut [u8],
4cf0ced9 214 offset: u64,
a6f87283 215 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
4cf0ced9 216 use std::io::Read;
a6f87283 217 MaybeReady::Ready(tokio::task::block_in_place(move || {
4cf0ced9
DC
218 let mut reader = self.inner.lock().unwrap();
219 reader.seek(SeekFrom::Start(offset))?;
a6f87283
WB
220 Ok(reader.read(buf)?)
221 }))
222 }
223
224 fn poll_complete<'a>(
225 self: Pin<&'a Self>,
226 _op: ReadAtOperation<'a>,
227 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
228 panic!("LocalDynamicReadAt::start_read_at returned Pending");
4cf0ced9
DC
229 }
230}