]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-protocol/src/chunk_stream.rs
4502b90bc7ce680ee56170b2b574e1e18f6c2c36
[proxmox-backup.git] / proxmox-protocol / src / chunk_stream.rs
1 use std::io::Read;
2
3 use failure::Error;
4
5 use crate::Chunker;
6
7 pub struct ChunkStream<T: Read> {
8 input: T,
9 buffer: Vec<u8>,
10 fill: usize,
11 pos: usize,
12 keep: bool,
13 eof: bool,
14 chunker: Chunker,
15 }
16
17 impl<T: Read> ChunkStream<T> {
18 pub fn new(input: T) -> Self {
19 Self {
20 input,
21 buffer: Vec::new(),
22 fill: 0,
23 pos: 0,
24 keep: false,
25 eof: false,
26 chunker: Chunker::new(4 * 1024 * 1024),
27 }
28 }
29
30 pub fn stream(&mut self) -> &mut Self {
31 self
32 }
33
34 fn fill_buf(&mut self) -> Result<bool, Error> {
35 if self.fill == self.buffer.len() {
36 let mut more = self.buffer.len(); // just double it
37 if more == 0 {
38 more = 1024 * 1024; // at the start, make a 1M buffer
39 }
40 // we need more data:
41 self.buffer.reserve(more);
42 unsafe {
43 self.buffer.set_len(self.buffer.capacity());
44 }
45 }
46
47 match self.input.read(&mut self.buffer[self.fill..]) {
48 Ok(more) => {
49 if more == 0 {
50 self.eof = true;
51 }
52 self.fill += more;
53 Ok(true)
54 }
55 Err(err) => {
56 if err.kind() == std::io::ErrorKind::WouldBlock {
57 Ok(false)
58 } else {
59 Err(err.into())
60 }
61 }
62 }
63 }
64
65 fn consume(&mut self) {
66 assert!(self.fill >= self.pos);
67
68 let remaining = self.fill - self.pos;
69 unsafe {
70 std::ptr::copy_nonoverlapping(
71 &self.buffer[self.pos] as *const u8,
72 self.buffer.as_mut_ptr(),
73 remaining,
74 );
75 }
76 self.fill = remaining;
77 self.pos = 0;
78 }
79
80 pub fn next(&mut self) {
81 self.keep = false;
82 }
83
84 // This crate should not depend on the futures create, so we use another Option instead of
85 // Async<T>.
86 pub fn get(&mut self) -> Result<Option<Option<&[u8]>>, Error> {
87 if self.keep {
88 return Ok(Some(Some(&self.buffer[0..self.pos])));
89 }
90
91 if self.eof {
92 return Ok(Some(None));
93 }
94
95 if self.pos != 0 {
96 self.consume();
97 }
98
99 loop {
100 match self.fill_buf() {
101 Ok(true) => (),
102 Ok(false) => return Ok(None),
103 Err(err) => return Err(err),
104 }
105
106 // Note that if we hit EOF we hit a hard boundary...
107 let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]);
108 if boundary == 0 && !self.eof {
109 self.pos = self.fill;
110 continue;
111 }
112
113 self.pos += boundary;
114 self.keep = true;
115 return Ok(Some(Some(&self.buffer[0..self.pos])));
116 }
117 }
118 }