]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/backup_reader.rs
move some tools used by the client
[proxmox-backup.git] / src / client / backup_reader.rs
1 use anyhow::{format_err, Error};
2 use std::io::{Write, Seek, SeekFrom};
3 use std::fs::File;
4 use std::sync::Arc;
5 use std::os::unix::fs::OpenOptionsExt;
6
7 use futures::future::AbortHandle;
8 use serde_json::{json, Value};
9
10 use proxmox::tools::digest_to_hex;
11
12 use pbs_datastore::{CryptConfig, BackupManifest};
13 use pbs_datastore::data_blob::DataBlob;
14 use pbs_datastore::data_blob_reader::DataBlobReader;
15 use pbs_datastore::dynamic_index::DynamicIndexReader;
16 use pbs_datastore::fixed_index::FixedIndexReader;
17 use pbs_datastore::index::IndexFile;
18 use pbs_datastore::manifest::MANIFEST_BLOB_NAME;
19 use pbs_tools::sha::sha256;
20
21 use super::{HttpClient, H2Client};
22
23 /// Backup Reader
24 pub struct BackupReader {
25 h2: H2Client,
26 abort: AbortHandle,
27 crypt_config: Option<Arc<CryptConfig>>,
28 }
29
30 impl Drop for BackupReader {
31
32 fn drop(&mut self) {
33 self.abort.abort();
34 }
35 }
36
37 impl BackupReader {
38
39 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
40 Arc::new(Self { h2, abort, crypt_config})
41 }
42
43 /// Create a new instance by upgrading the connection at '/api2/json/reader'
44 pub async fn start(
45 client: HttpClient,
46 crypt_config: Option<Arc<CryptConfig>>,
47 datastore: &str,
48 backup_type: &str,
49 backup_id: &str,
50 backup_time: i64,
51 debug: bool,
52 ) -> Result<Arc<BackupReader>, Error> {
53
54 let param = json!({
55 "backup-type": backup_type,
56 "backup-id": backup_id,
57 "backup-time": backup_time,
58 "store": datastore,
59 "debug": debug,
60 });
61 let req = HttpClient::request_builder(client.server(), client.port(), "GET", "/api2/json/reader", Some(param)).unwrap();
62
63 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
64
65 Ok(BackupReader::new(h2, abort, crypt_config))
66 }
67
68 /// Execute a GET request
69 pub async fn get(
70 &self,
71 path: &str,
72 param: Option<Value>,
73 ) -> Result<Value, Error> {
74 self.h2.get(path, param).await
75 }
76
77 /// Execute a PUT request
78 pub async fn put(
79 &self,
80 path: &str,
81 param: Option<Value>,
82 ) -> Result<Value, Error> {
83 self.h2.put(path, param).await
84 }
85
86 /// Execute a POST request
87 pub async fn post(
88 &self,
89 path: &str,
90 param: Option<Value>,
91 ) -> Result<Value, Error> {
92 self.h2.post(path, param).await
93 }
94
95 /// Execute a GET request and send output to a writer
96 pub async fn download<W: Write + Send>(
97 &self,
98 file_name: &str,
99 output: W,
100 ) -> Result<(), Error> {
101 let path = "download";
102 let param = json!({ "file-name": file_name });
103 self.h2.download(path, Some(param), output).await
104 }
105
106 /// Execute a special GET request and send output to a writer
107 ///
108 /// This writes random data, and is only useful to test download speed.
109 pub async fn speedtest<W: Write + Send>(
110 &self,
111 output: W,
112 ) -> Result<(), Error> {
113 self.h2.download("speedtest", None, output).await
114 }
115
116 /// Download a specific chunk
117 pub async fn download_chunk<W: Write + Send>(
118 &self,
119 digest: &[u8; 32],
120 output: W,
121 ) -> Result<(), Error> {
122 let path = "chunk";
123 let param = json!({ "digest": digest_to_hex(digest) });
124 self.h2.download(path, Some(param), output).await
125 }
126
127 pub fn force_close(self) {
128 self.abort.abort();
129 }
130
131 /// Download backup manifest (index.json)
132 ///
133 /// The manifest signature is verified if we have a crypt_config.
134 pub async fn download_manifest(&self) -> Result<(BackupManifest, Vec<u8>), Error> {
135
136 let mut raw_data = Vec::with_capacity(64 * 1024);
137 self.download(MANIFEST_BLOB_NAME, &mut raw_data).await?;
138 let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
139 // no expected digest available
140 let data = blob.decode(None, None)?;
141
142 let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
143
144 Ok((manifest, data))
145 }
146
147 /// Download a .blob file
148 ///
149 /// This creates a temporary file in /tmp (using O_TMPFILE). The data is verified using
150 /// the provided manifest.
151 pub async fn download_blob(
152 &self,
153 manifest: &BackupManifest,
154 name: &str,
155 ) -> Result<DataBlobReader<'_, File>, Error> {
156
157 let mut tmpfile = std::fs::OpenOptions::new()
158 .write(true)
159 .read(true)
160 .custom_flags(libc::O_TMPFILE)
161 .open("/tmp")?;
162
163 self.download(name, &mut tmpfile).await?;
164
165 tmpfile.seek(SeekFrom::Start(0))?;
166 let (csum, size) = sha256(&mut tmpfile)?;
167 manifest.verify_file(name, &csum, size)?;
168
169 tmpfile.seek(SeekFrom::Start(0))?;
170
171 DataBlobReader::new(tmpfile, self.crypt_config.clone())
172 }
173
174 /// Download dynamic index file
175 ///
176 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
177 /// the provided manifest.
178 pub async fn download_dynamic_index(
179 &self,
180 manifest: &BackupManifest,
181 name: &str,
182 ) -> Result<DynamicIndexReader, Error> {
183
184 let mut tmpfile = std::fs::OpenOptions::new()
185 .write(true)
186 .read(true)
187 .custom_flags(libc::O_TMPFILE)
188 .open("/tmp")?;
189
190 self.download(name, &mut tmpfile).await?;
191
192 let index = DynamicIndexReader::new(tmpfile)
193 .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", name, err))?;
194
195 // Note: do not use values stored in index (not trusted) - instead, computed them again
196 let (csum, size) = index.compute_csum();
197 manifest.verify_file(name, &csum, size)?;
198
199 Ok(index)
200 }
201
202 /// Download fixed index file
203 ///
204 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
205 /// the provided manifest.
206 pub async fn download_fixed_index(
207 &self,
208 manifest: &BackupManifest,
209 name: &str,
210 ) -> Result<FixedIndexReader, Error> {
211
212 let mut tmpfile = std::fs::OpenOptions::new()
213 .write(true)
214 .read(true)
215 .custom_flags(libc::O_TMPFILE)
216 .open("/tmp")?;
217
218 self.download(name, &mut tmpfile).await?;
219
220 let index = FixedIndexReader::new(tmpfile)
221 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", name, err))?;
222
223 // Note: do not use values stored in index (not trusted) - instead, computed them again
224 let (csum, size) = index.compute_csum();
225 manifest.verify_file(name, &csum, size)?;
226
227 Ok(index)
228 }
229 }