]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/chunk_stream.rs
5bb061583df2439ede2d661836d71f447fadc7fc
[proxmox-backup.git] / src / backup / chunk_stream.rs
1 use std::pin::Pin;
2 use std::task::{Context, Poll};
3
4 use bytes::BytesMut;
5 use anyhow::{Error};
6 use futures::ready;
7 use futures::stream::{Stream, TryStream};
8
9 use super::Chunker;
10
11 /// Split input stream into dynamic sized chunks
12 pub struct ChunkStream<S: Unpin> {
13 input: S,
14 chunker: Chunker,
15 buffer: BytesMut,
16 scan_pos: usize,
17 }
18
19 impl<S: Unpin> ChunkStream<S> {
20 pub fn new(input: S, chunk_size: Option<usize>) -> Self {
21 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
22 }
23 }
24
25 impl<S: Unpin> Unpin for ChunkStream<S> {}
26
27 impl<S: Unpin> Stream for ChunkStream<S>
28 where
29 S: TryStream,
30 S::Ok: AsRef<[u8]>,
31 S::Error: Into<Error>,
32 {
33
34 type Item = Result<BytesMut, Error>;
35
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
37 let this = self.get_mut();
38 loop {
39 if this.scan_pos < this.buffer.len() {
40 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
41
42 let chunk_size = this.scan_pos + boundary;
43
44 if boundary == 0 {
45 this.scan_pos = this.buffer.len();
46 // continue poll
47 } else if chunk_size <= this.buffer.len() {
48 let result = this.buffer.split_to(chunk_size);
49 this.scan_pos = 0;
50 return Poll::Ready(Some(Ok(result)));
51 } else {
52 panic!("got unexpected chunk boundary from chunker");
53 }
54 }
55
56 match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
57 Some(Err(err)) => {
58 return Poll::Ready(Some(Err(err.into())));
59 }
60 None => {
61 this.scan_pos = 0;
62 if this.buffer.len() > 0 {
63 return Poll::Ready(Some(Ok(this.buffer.split())));
64 } else {
65 return Poll::Ready(None);
66 }
67 }
68 Some(Ok(data)) => {
69 this.buffer.extend_from_slice(data.as_ref());
70 }
71 }
72 }
73 }
74 }
75
76 /// Split input stream into fixed sized chunks
77 pub struct FixedChunkStream<S: Unpin> {
78 input: S,
79 chunk_size: usize,
80 buffer: BytesMut,
81 }
82
83 impl<S: Unpin> FixedChunkStream<S> {
84 pub fn new(input: S, chunk_size: usize) -> Self {
85 Self { input, chunk_size, buffer: BytesMut::new() }
86 }
87 }
88
89 impl<S: Unpin> Unpin for FixedChunkStream<S> {}
90
91 impl<S: Unpin> Stream for FixedChunkStream<S>
92 where
93 S: TryStream,
94 S::Ok: AsRef<[u8]>,
95 {
96 type Item = Result<BytesMut, S::Error>;
97
98 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> {
99 let this = self.get_mut();
100 loop {
101 if this.buffer.len() == this.chunk_size {
102 return Poll::Ready(Some(Ok(this.buffer.split())));
103 } else if this.buffer.len() > this.chunk_size {
104 let result = this.buffer.split_to(this.chunk_size);
105 return Poll::Ready(Some(Ok(result)));
106 }
107
108 match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
109 Some(Err(err)) => {
110 return Poll::Ready(Some(Err(err)));
111 }
112 None => {
113 // last chunk can have any size
114 if this.buffer.len() > 0 {
115 return Poll::Ready(Some(Ok(this.buffer.split())));
116 } else {
117 return Poll::Ready(None);
118 }
119 }
120 Some(Ok(data)) => {
121 this.buffer.extend_from_slice(data.as_ref());
122 }
123 }
124 }
125 }
126 }