]>
git.proxmox.com Git - proxmox-backup.git/blob - proxmox-protocol/src/chunk_stream.rs
4502b90bc7ce680ee56170b2b574e1e18f6c2c36
7 pub struct ChunkStream
<T
: Read
> {
17 impl<T
: Read
> ChunkStream
<T
> {
18 pub fn new(input
: T
) -> Self {
26 chunker
: Chunker
::new(4 * 1024 * 1024),
30 pub fn stream(&mut self) -> &mut Self {
34 fn fill_buf(&mut self) -> Result
<bool
, Error
> {
35 if self.fill
== self.buffer
.len() {
36 let mut more
= self.buffer
.len(); // just double it
38 more
= 1024 * 1024; // at the start, make a 1M buffer
41 self.buffer
.reserve(more
);
43 self.buffer
.set_len(self.buffer
.capacity());
47 match self.input
.read(&mut self.buffer
[self.fill
..]) {
56 if err
.kind() == std
::io
::ErrorKind
::WouldBlock
{
65 fn consume(&mut self) {
66 assert
!(self.fill
>= self.pos
);
68 let remaining
= self.fill
- self.pos
;
70 std
::ptr
::copy_nonoverlapping(
71 &self.buffer
[self.pos
] as *const u8,
72 self.buffer
.as_mut_ptr(),
76 self.fill
= remaining
;
80 pub fn next(&mut self) {
84 // This crate should not depend on the futures create, so we use another Option instead of
86 pub fn get(&mut self) -> Result
<Option
<Option
<&[u8]>>, Error
> {
88 return Ok(Some(Some(&self.buffer
[0..self.pos
])));
92 return Ok(Some(None
));
100 match self.fill_buf() {
102 Ok(false) => return Ok(None
),
103 Err(err
) => return Err(err
),
106 // Note that if we hit EOF we hit a hard boundary...
107 let boundary
= self.chunker
.scan(&self.buffer
[self.pos
..self.fill
]);
108 if boundary
== 0 && !self.eof
{
109 self.pos
= self.fill
;
113 self.pos
+= boundary
;
115 return Ok(Some(Some(&self.buffer
[0..self.pos
])));