]>
Commit | Line | Data |
---|---|---|
2f02e431 | 1 | use std::io::{self, Seek, SeekFrom}; |
c443f58b | 2 | use std::ops::Range; |
4cf0ced9 | 3 | use std::sync::{Arc, Mutex}; |
a6f87283 | 4 | use std::task::Context; |
4cf0ced9 | 5 | use std::pin::Pin; |
367f002e | 6 | |
f7d4e4b5 | 7 | use anyhow::{bail, format_err, Error}; |
0433db19 | 8 | |
a6f87283 | 9 | use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; |
8ea3b1d1 | 10 | |
2f02e431 | 11 | use pbs_datastore::dynamic_index::DynamicIndexReader; |
e64f38cb | 12 | use pbs_datastore::read_chunk::ReadChunk; |
2f02e431 | 13 | use pbs_datastore::index::IndexFile; |
6c221244 | 14 | use pbs_tools::lru_cache::LruCache; |
7bc1d727 | 15 | |
c443f58b WB |
16 | struct CachedChunk { |
17 | range: Range<u64>, | |
18 | data: Vec<u8>, | |
19 | } | |
20 | ||
21 | impl CachedChunk { | |
22 | /// Perform sanity checks on the range and data size: | |
23 | pub fn new(range: Range<u64>, data: Vec<u8>) -> Result<Self, Error> { | |
24 | if data.len() as u64 != range.end - range.start { | |
25 | bail!( | |
26 | "read chunk with wrong size ({} != {})", | |
27 | data.len(), | |
28 | range.end - range.start, | |
29 | ); | |
30 | } | |
31 | Ok(Self { range, data }) | |
32 | } | |
33 | } | |
34 | ||
d48a9955 DM |
35 | pub struct BufferedDynamicReader<S> { |
36 | store: S, | |
93d5d779 | 37 | index: DynamicIndexReader, |
39c6bd86 DM |
38 | archive_size: u64, |
39 | read_buffer: Vec<u8>, | |
40 | buffered_chunk_idx: usize, | |
41 | buffered_chunk_start: u64, | |
42 | read_offset: u64, | |
6c221244 | 43 | lru_cache: LruCache<usize, CachedChunk>, |
536683e7 CE |
44 | } |
45 | ||
46 | struct ChunkCacher<'a, S> { | |
47 | store: &'a mut S, | |
48 | index: &'a DynamicIndexReader, | |
49 | } | |
50 | ||
6c221244 | 51 | impl<'a, S: ReadChunk> pbs_tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> { |
c443f58b | 52 | fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> { |
fdaab0df DM |
53 | let info = match self.index.chunk_info(index) { |
54 | Some(info) => info, | |
55 | None => bail!("chunk index out of range"), | |
56 | }; | |
c443f58b WB |
57 | let range = info.range; |
58 | let data = self.store.read_chunk(&info.digest)?; | |
59 | CachedChunk::new(range, data).map(Some) | |
536683e7 | 60 | } |
77703d95 DM |
61 | } |
62 | ||
f569acc5 | 63 | impl<S: ReadChunk> BufferedDynamicReader<S> { |
d48a9955 | 64 | pub fn new(index: DynamicIndexReader, store: S) -> Self { |
57e50fb9 | 65 | let archive_size = index.index_bytes(); |
39c6bd86 | 66 | Self { |
d48a9955 | 67 | store, |
653b1ca1 WB |
68 | index, |
69 | archive_size, | |
f569acc5 | 70 | read_buffer: Vec::with_capacity(1024 * 1024), |
39c6bd86 DM |
71 | buffered_chunk_idx: 0, |
72 | buffered_chunk_start: 0, | |
73 | read_offset: 0, | |
6c221244 | 74 | lru_cache: LruCache::new(32), |
39c6bd86 DM |
75 | } |
76 | } | |
77 | ||
f569acc5 WB |
78 | pub fn archive_size(&self) -> u64 { |
79 | self.archive_size | |
80 | } | |
39c6bd86 | 81 | |
0a72e267 | 82 | fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { |
c443f58b WB |
83 | //let (start, end, data) = self.lru_cache.access( |
84 | let cached_chunk = self.lru_cache.access( | |
536683e7 CE |
85 | idx, |
86 | &mut ChunkCacher { | |
87 | store: &mut self.store, | |
88 | index: &self.index, | |
89 | }, | |
90 | )?.ok_or_else(|| format_err!("chunk not found by cacher"))?; | |
91 | ||
536683e7 | 92 | // fixme: avoid copy |
f98ac774 | 93 | self.read_buffer.clear(); |
c443f58b | 94 | self.read_buffer.extend_from_slice(&cached_chunk.data); |
0a72e267 DM |
95 | |
96 | self.buffered_chunk_idx = idx; | |
d48a9955 | 97 | |
c443f58b | 98 | self.buffered_chunk_start = cached_chunk.range.start; |
0a72e267 DM |
99 | //println!("BUFFER {} {}", self.buffered_chunk_start, end); |
100 | Ok(()) | |
101 | } | |
102 | } | |
103 | ||
eb5e0ae6 | 104 | impl<S: ReadChunk> pbs_tools::io::BufferedRead for BufferedDynamicReader<S> { |
0a72e267 | 105 | fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { |
f569acc5 WB |
106 | if offset == self.archive_size { |
107 | return Ok(&self.read_buffer[0..0]); | |
108 | } | |
318564ac | 109 | |
39c6bd86 | 110 | let buffer_len = self.read_buffer.len(); |
0b05fd58 | 111 | let index = &self.index; |
39c6bd86 DM |
112 | |
113 | // optimization for sequential read | |
f569acc5 | 114 | if buffer_len > 0 |
2f02e431 | 115 | && ((self.buffered_chunk_idx + 1) < index.index().len()) |
f569acc5 | 116 | && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) |
39c6bd86 DM |
117 | { |
118 | let next_idx = self.buffered_chunk_idx + 1; | |
119 | let next_end = index.chunk_end(next_idx); | |
120 | if offset < next_end { | |
373ef4a5 | 121 | self.buffer_chunk(next_idx)?; |
39c6bd86 DM |
122 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; |
123 | return Ok(&self.read_buffer[buffer_offset..]); | |
124 | } | |
125 | } | |
126 | ||
f569acc5 WB |
127 | if (buffer_len == 0) |
128 | || (offset < self.buffered_chunk_start) | |
129 | || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
39c6bd86 | 130 | { |
2f02e431 | 131 | let end_idx = index.index().len() - 1; |
39c6bd86 DM |
132 | let end = index.chunk_end(end_idx); |
133 | let idx = index.binary_search(0, 0, end_idx, end, offset)?; | |
373ef4a5 | 134 | self.buffer_chunk(idx)?; |
f569acc5 | 135 | } |
39c6bd86 DM |
136 | |
137 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
138 | Ok(&self.read_buffer[buffer_offset..]) | |
139 | } | |
39c6bd86 | 140 | } |
77703d95 | 141 | |
f569acc5 | 142 | impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> { |
4624fe29 | 143 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { |
eb5e0ae6 | 144 | use pbs_tools::io::BufferedRead; |
f569acc5 | 145 | use std::io::{Error, ErrorKind}; |
4624fe29 DM |
146 | |
147 | let data = match self.buffered_read(self.read_offset) { | |
148 | Ok(v) => v, | |
149 | Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), | |
150 | }; | |
151 | ||
f569acc5 WB |
152 | let n = if data.len() > buf.len() { |
153 | buf.len() | |
154 | } else { | |
155 | data.len() | |
156 | }; | |
4624fe29 | 157 | |
bde8e243 | 158 | buf[0..n].copy_from_slice(&data[0..n]); |
4624fe29 DM |
159 | |
160 | self.read_offset += n as u64; | |
161 | ||
62ee2eb4 | 162 | Ok(n) |
4624fe29 DM |
163 | } |
164 | } | |
165 | ||
f569acc5 | 166 | impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> { |
d48a9955 | 167 | fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { |
34337050 | 168 | let new_offset = match pos { |
f569acc5 WB |
169 | SeekFrom::Start(start_offset) => start_offset as i64, |
170 | SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset, | |
34337050 DM |
171 | SeekFrom::Current(offset) => (self.read_offset as i64) + offset, |
172 | }; | |
173 | ||
ddbdf80d | 174 | use std::io::{Error, ErrorKind}; |
34337050 DM |
175 | if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { |
176 | return Err(Error::new( | |
177 | ErrorKind::Other, | |
f569acc5 WB |
178 | format!( |
179 | "seek is out of range {} ([0..{}])", | |
180 | new_offset, self.archive_size | |
181 | ), | |
182 | )); | |
34337050 DM |
183 | } |
184 | self.read_offset = new_offset as u64; | |
185 | ||
186 | Ok(self.read_offset) | |
187 | } | |
188 | } | |
189 | ||
4cf0ced9 DC |
190 | /// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better |
191 | /// async use! | |
192 | /// | |
193 | /// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`, | |
194 | /// so that we can properly access it from multiple threads simultaneously while not issuing | |
195 | /// duplicate simultaneous reads over http. | |
196 | #[derive(Clone)] | |
197 | pub struct LocalDynamicReadAt<R: ReadChunk> { | |
198 | inner: Arc<Mutex<BufferedDynamicReader<R>>>, | |
199 | } | |
200 | ||
201 | impl<R: ReadChunk> LocalDynamicReadAt<R> { | |
202 | pub fn new(inner: BufferedDynamicReader<R>) -> Self { | |
203 | Self { | |
204 | inner: Arc::new(Mutex::new(inner)), | |
205 | } | |
206 | } | |
207 | } | |
208 | ||
a6f87283 WB |
209 | impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> { |
210 | fn start_read_at<'a>( | |
211 | self: Pin<&'a Self>, | |
4cf0ced9 | 212 | _cx: &mut Context, |
a6f87283 | 213 | buf: &'a mut [u8], |
4cf0ced9 | 214 | offset: u64, |
a6f87283 | 215 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { |
4cf0ced9 | 216 | use std::io::Read; |
a6f87283 | 217 | MaybeReady::Ready(tokio::task::block_in_place(move || { |
4cf0ced9 DC |
218 | let mut reader = self.inner.lock().unwrap(); |
219 | reader.seek(SeekFrom::Start(offset))?; | |
a6f87283 WB |
220 | Ok(reader.read(buf)?) |
221 | })) | |
222 | } | |
223 | ||
224 | fn poll_complete<'a>( | |
225 | self: Pin<&'a Self>, | |
226 | _op: ReadAtOperation<'a>, | |
227 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { | |
228 | panic!("LocalDynamicReadAt::start_read_at returned Pending"); | |
4cf0ced9 DC |
229 | } |
230 | } |