]>
Commit | Line | Data |
---|---|---|
ee53955f DM |
1 | use failure::*; |
2 | use futures::*; | |
3 | use bytes::{Bytes, BytesMut}; | |
4 | ||
/// Read access to the list of chunk digests held by an index.
///
/// Implementors must be `Send` so a boxed index can be moved to another thread.
pub trait IndexFile: Send {
    /// Returns the number of digests stored in the index.
    fn index_count(&self) -> usize;

    /// Returns a reference to the 32 byte digest stored at position `pos`.
    ///
    /// NOTE(review): `None` presumably means `pos` is out of range; confirm
    /// against the implementors.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
}
457531e7 DM |
9 | |
10 | /// This struct can read the list of chunks from an `IndexFile` | |
11 | /// | |
12 | /// The reader simply returns a birary stream of 32 byte digest values. | |
13 | pub struct ChunkListReader { | |
14 | index: Box<dyn IndexFile>, | |
15 | pos: usize, | |
16 | count: usize, | |
17 | } | |
18 | ||
19 | impl ChunkListReader { | |
20 | ||
21 | pub fn new(index: Box<dyn IndexFile>) -> Self { | |
22 | let count = index.index_count(); | |
23 | Self { index, pos: 0, count } | |
24 | } | |
25 | } | |
26 | ||
27 | impl std::io::Read for ChunkListReader { | |
28 | ||
29 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { | |
30 | if buf.len() < 32 { panic!("read buffer too small"); } | |
31 | if self.pos < self.count { | |
32 | let mut written = 0; | |
33 | loop { | |
34 | let digest = self.index.index_digest(self.pos).unwrap(); | |
35 | unsafe { std::ptr::copy_nonoverlapping(digest.as_ptr(), buf.as_mut_ptr().add(written), 32); } | |
36 | self.pos += 1; | |
37 | written += 32; | |
38 | if self.pos >= self.count { break; } | |
39 | if (written + 32) >= buf.len() { break; } | |
40 | } | |
41 | return Ok(written); | |
42 | } else { | |
43 | return Ok(0); | |
44 | } | |
45 | } | |
46 | } | |
ee53955f DM |
47 | |
48 | /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]> | |
49 | /// | |
50 | /// The reader simply returns a birary stream of 32 byte digest values. | |
51 | ||
52 | pub struct DigestListDecoder<S> { | |
53 | input: S, | |
54 | buffer: BytesMut, | |
55 | } | |
56 | ||
57 | impl <S> DigestListDecoder<S> { | |
58 | ||
59 | pub fn new(input: S) -> Self { | |
60 | Self { input, buffer: BytesMut::new() } | |
61 | } | |
62 | } | |
63 | ||
64 | impl <S> Stream for DigestListDecoder<S> | |
65 | where S: Stream<Item=Bytes>, | |
66 | S::Error: Into<Error>, | |
67 | { | |
68 | type Item = [u8; 32]; | |
69 | type Error = Error; | |
70 | ||
71 | fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | |
72 | loop { | |
73 | ||
74 | if self.buffer.len() >= 32 { | |
75 | ||
76 | let left = self.buffer.split_to(32); | |
77 | ||
78 | let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; | |
79 | unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); } | |
80 | ||
81 | return Ok(Async::Ready(Some(digest))); | |
82 | } | |
83 | ||
84 | match self.input.poll() { | |
85 | Err(err) => { | |
86 | return Err(err.into()); | |
87 | } | |
88 | Ok(Async::NotReady) => { | |
89 | return Ok(Async::NotReady); | |
90 | } | |
91 | Ok(Async::Ready(None)) => { | |
92 | let rest = self.buffer.len(); | |
93 | if rest == 0 { return Ok(Async::Ready(None)); } | |
94 | return Err(format_err!("got small digest ({} != 32).", rest)); | |
95 | } | |
96 | Ok(Async::Ready(Some(data))) => { | |
97 | self.buffer.extend_from_slice(&data); | |
98 | // continue | |
99 | } | |
100 | } | |
101 | } | |
102 | } | |
103 | } |