// git.proxmox.com Git - proxmox-backup.git/blob - pbs-datastore/src/data_blob_reader.rs
// add pbs-datastore module
// [proxmox-backup.git] / pbs-datastore / src / data_blob_reader.rs
1 use std::io::{BufReader, Read};
2 use std::sync::Arc;
3
4 use anyhow::{bail, format_err, Error};
5 use proxmox::tools::io::ReadExt;
6
7 use crate::checksum_reader::ChecksumReader;
8 use crate::crypt_config::CryptConfig;
9 use crate::crypt_reader::CryptReader;
10 use crate::file_formats::{self, DataBlobHeader};
11
12 enum BlobReaderState<'reader, R: Read> {
13 Uncompressed {
14 expected_crc: u32,
15 csum_reader: ChecksumReader<R>,
16 },
17 Compressed {
18 expected_crc: u32,
19 decompr: zstd::stream::read::Decoder<'reader, BufReader<ChecksumReader<R>>>,
20 },
21 Encrypted {
22 expected_crc: u32,
23 decrypt_reader: CryptReader<BufReader<ChecksumReader<R>>>,
24 },
25 EncryptedCompressed {
26 expected_crc: u32,
27 decompr: zstd::stream::read::Decoder<
28 'reader,
29 BufReader<CryptReader<BufReader<ChecksumReader<R>>>>,
30 >,
31 },
32 }
33
34 /// Read data blobs
35 pub struct DataBlobReader<'reader, R: Read> {
36 state: BlobReaderState<'reader, R>,
37 }
38
39 // zstd_safe::DCtx is not sync but we are, since
40 // the only public interface is on mutable reference
41 unsafe impl<R: Read> Sync for DataBlobReader<'_, R> {}
42
43 impl<R: Read> DataBlobReader<'_, R> {
44 pub fn new(mut reader: R, config: Option<Arc<CryptConfig>>) -> Result<Self, Error> {
45 let head: DataBlobHeader = unsafe { reader.read_le_value()? };
46 match head.magic {
47 file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0 => {
48 let expected_crc = u32::from_le_bytes(head.crc);
49 let csum_reader = ChecksumReader::new(reader, None);
50 Ok(Self {
51 state: BlobReaderState::Uncompressed {
52 expected_crc,
53 csum_reader,
54 },
55 })
56 }
57 file_formats::COMPRESSED_BLOB_MAGIC_1_0 => {
58 let expected_crc = u32::from_le_bytes(head.crc);
59 let csum_reader = ChecksumReader::new(reader, None);
60
61 let decompr = zstd::stream::read::Decoder::new(csum_reader)?;
62 Ok(Self {
63 state: BlobReaderState::Compressed {
64 expected_crc,
65 decompr,
66 },
67 })
68 }
69 file_formats::ENCRYPTED_BLOB_MAGIC_1_0 => {
70 let config = config
71 .ok_or_else(|| format_err!("unable to read encrypted blob without key"))?;
72 let expected_crc = u32::from_le_bytes(head.crc);
73 let mut iv = [0u8; 16];
74 let mut expected_tag = [0u8; 16];
75 reader.read_exact(&mut iv)?;
76 reader.read_exact(&mut expected_tag)?;
77 let csum_reader = ChecksumReader::new(reader, None);
78 let decrypt_reader = CryptReader::new(
79 BufReader::with_capacity(64 * 1024, csum_reader),
80 iv,
81 expected_tag,
82 config,
83 )?;
84 Ok(Self {
85 state: BlobReaderState::Encrypted {
86 expected_crc,
87 decrypt_reader,
88 },
89 })
90 }
91 file_formats::ENCR_COMPR_BLOB_MAGIC_1_0 => {
92 let config = config
93 .ok_or_else(|| format_err!("unable to read encrypted blob without key"))?;
94 let expected_crc = u32::from_le_bytes(head.crc);
95 let mut iv = [0u8; 16];
96 let mut expected_tag = [0u8; 16];
97 reader.read_exact(&mut iv)?;
98 reader.read_exact(&mut expected_tag)?;
99 let csum_reader = ChecksumReader::new(reader, None);
100 let decrypt_reader = CryptReader::new(
101 BufReader::with_capacity(64 * 1024, csum_reader),
102 iv,
103 expected_tag,
104 config,
105 )?;
106 let decompr = zstd::stream::read::Decoder::new(decrypt_reader)?;
107 Ok(Self {
108 state: BlobReaderState::EncryptedCompressed {
109 expected_crc,
110 decompr,
111 },
112 })
113 }
114 _ => bail!("got wrong magic number {:?}", head.magic),
115 }
116 }
117
118 pub fn finish(self) -> Result<R, Error> {
119 match self.state {
120 BlobReaderState::Uncompressed {
121 csum_reader,
122 expected_crc,
123 } => {
124 let (reader, crc, _) = csum_reader.finish()?;
125 if crc != expected_crc {
126 bail!("blob crc check failed");
127 }
128 Ok(reader)
129 }
130 BlobReaderState::Compressed {
131 expected_crc,
132 decompr,
133 } => {
134 let csum_reader = decompr.finish().into_inner();
135 let (reader, crc, _) = csum_reader.finish()?;
136 if crc != expected_crc {
137 bail!("blob crc check failed");
138 }
139 Ok(reader)
140 }
141 BlobReaderState::Encrypted {
142 expected_crc,
143 decrypt_reader,
144 } => {
145 let csum_reader = decrypt_reader.finish()?.into_inner();
146 let (reader, crc, _) = csum_reader.finish()?;
147 if crc != expected_crc {
148 bail!("blob crc check failed");
149 }
150 Ok(reader)
151 }
152 BlobReaderState::EncryptedCompressed {
153 expected_crc,
154 decompr,
155 } => {
156 let decrypt_reader = decompr.finish().into_inner();
157 let csum_reader = decrypt_reader.finish()?.into_inner();
158 let (reader, crc, _) = csum_reader.finish()?;
159 if crc != expected_crc {
160 bail!("blob crc check failed");
161 }
162 Ok(reader)
163 }
164 }
165 }
166 }
167
168 impl<R: Read> Read for DataBlobReader<'_, R> {
169 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
170 match &mut self.state {
171 BlobReaderState::Uncompressed { csum_reader, .. } => csum_reader.read(buf),
172 BlobReaderState::Compressed { decompr, .. } => decompr.read(buf),
173 BlobReaderState::Encrypted { decrypt_reader, .. } => decrypt_reader.read(buf),
174 BlobReaderState::EncryptedCompressed { decompr, .. } => decompr.read(buf),
175 }
176 }
177 }