]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/chunk_stream.rs
update a chunk of stuff to the hyper release
[proxmox-backup.git] / src / backup / chunk_stream.rs
CommitLineData
32bef1e2
WB
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
7d83440c 4use bytes::BytesMut;
dafc27ae 5use failure::*;
32bef1e2
WB
6use futures::ready;
7use futures::stream::{Stream, TryStream};
dafc27ae 8
7d83440c 9use super::Chunker;
0cc0fffd 10
8a7cc756 11/// Split input stream into dynamic sized chunks
32bef1e2 12pub struct ChunkStream<S: Unpin> {
dafc27ae
DM
13 input: S,
14 chunker: Chunker,
0cc0fffd
DM
15 buffer: BytesMut,
16 scan_pos: usize,
dafc27ae
DM
17}
18
32bef1e2 19impl<S: Unpin> ChunkStream<S> {
36898ffc
DM
20 pub fn new(input: S, chunk_size: Option<usize>) -> Self {
21 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
dafc27ae
DM
22 }
23}
24
32bef1e2
WB
25impl<S: Unpin> Unpin for ChunkStream<S> {}
26
27impl<S: Unpin> Stream for ChunkStream<S>
28where
29 S: TryStream,
30 S::Ok: AsRef<[u8]>,
31 S::Error: Into<Error>,
c052be5c 32{
dafc27ae 33
32bef1e2 34 type Item = Result<BytesMut, Error>;
dafc27ae 35
32bef1e2
WB
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
37 let this = self.get_mut();
dafc27ae 38 loop {
32bef1e2
WB
39 if this.scan_pos < this.buffer.len() {
40 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
3be3f3dc 41
32bef1e2 42 let chunk_size = this.scan_pos + boundary;
3be3f3dc
DM
43
44 if boundary == 0 {
32bef1e2 45 this.scan_pos = this.buffer.len();
3be3f3dc 46 // continue poll
32bef1e2
WB
47 } else if chunk_size <= this.buffer.len() {
48 let result = this.buffer.split_to(chunk_size);
49 this.scan_pos = 0;
50 return Poll::Ready(Some(Ok(result)));
3be3f3dc
DM
51 } else {
52 panic!("got unexpected chunk boundary from chunker");
53 }
54 }
55
32bef1e2
WB
56 match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
57 Some(Err(err)) => {
58 return Poll::Ready(Some(Err(err.into())));
dafc27ae 59 }
32bef1e2
WB
60 None => {
61 this.scan_pos = 0;
62 if this.buffer.len() > 0 {
db0cb9ce 63 return Poll::Ready(Some(Ok(this.buffer.split())));
ff77dbbe 64 } else {
32bef1e2 65 return Poll::Ready(None);
ff77dbbe 66 }
dafc27ae 67 }
32bef1e2
WB
68 Some(Ok(data)) => {
69 this.buffer.extend_from_slice(data.as_ref());
70 }
dafc27ae
DM
71 }
72 }
73 }
74}
8a7cc756
DM
75
76/// Split input stream into fixed sized chunks
32bef1e2 77pub struct FixedChunkStream<S: Unpin> {
8a7cc756
DM
78 input: S,
79 chunk_size: usize,
169c0e06 80 buffer: BytesMut,
8a7cc756
DM
81}
82
32bef1e2 83impl<S: Unpin> FixedChunkStream<S> {
8a7cc756 84 pub fn new(input: S, chunk_size: usize) -> Self {
169c0e06 85 Self { input, chunk_size, buffer: BytesMut::new() }
8a7cc756
DM
86 }
87}
88
32bef1e2 89impl<S: Unpin> Unpin for FixedChunkStream<S> {}
8a7cc756 90
32bef1e2
WB
91impl<S: Unpin> Stream for FixedChunkStream<S>
92where
93 S: TryStream,
94 S::Ok: AsRef<[u8]>,
95{
96 type Item = Result<BytesMut, S::Error>;
8a7cc756 97
32bef1e2
WB
98 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> {
99 let this = self.get_mut();
8a7cc756 100 loop {
32bef1e2 101 if this.buffer.len() == this.chunk_size {
db0cb9ce 102 return Poll::Ready(Some(Ok(this.buffer.split())));
32bef1e2
WB
103 } else if this.buffer.len() > this.chunk_size {
104 let result = this.buffer.split_to(this.chunk_size);
105 return Poll::Ready(Some(Ok(result)));
169c0e06
DM
106 }
107
32bef1e2
WB
108 match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
109 Some(Err(err)) => {
110 return Poll::Ready(Some(Err(err)));
8a7cc756 111 }
32bef1e2 112 None => {
8a7cc756 113 // last chunk can have any size
32bef1e2 114 if this.buffer.len() > 0 {
db0cb9ce 115 return Poll::Ready(Some(Ok(this.buffer.split())));
8a7cc756 116 } else {
32bef1e2 117 return Poll::Ready(None);
8a7cc756
DM
118 }
119 }
32bef1e2
WB
120 Some(Ok(data)) => {
121 this.buffer.extend_from_slice(data.as_ref());
169c0e06 122 }
8a7cc756
DM
123 }
124 }
125 }
126}