]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/backup_reader.rs
move 'wait_for_local_worker' from client to server
[proxmox-backup.git] / src / client / backup_reader.rs
CommitLineData
b53f6379 1use anyhow::{format_err, Error};
1bc1d81a 2use std::io::{Write, Seek, SeekFrom};
bb19af73 3use std::fs::File;
9e490a74 4use std::sync::Arc;
c3d84a22 5use std::os::unix::fs::OpenOptionsExt;
9e490a74 6
dc089345 7use futures::future::AbortHandle;
9e490a74
DM
8use serde_json::{json, Value};
9
10use proxmox::tools::digest_to_hex;
11
75f83c6a
WB
12use pbs_datastore::{CryptConfig, BackupManifest};
13use pbs_datastore::data_blob::DataBlob;
14use pbs_datastore::data_blob_reader::DataBlobReader;
15use pbs_datastore::dynamic_index::DynamicIndexReader;
16use pbs_datastore::fixed_index::FixedIndexReader;
17use pbs_datastore::index::IndexFile;
18use pbs_datastore::manifest::MANIFEST_BLOB_NAME;
ba0ccc59 19use pbs_tools::sha::sha256;
9e490a74
DM
20
21use super::{HttpClient, H2Client};
22
913acb41 23/// Backup Reader
9e490a74
DM
24pub struct BackupReader {
25 h2: H2Client,
dc089345 26 abort: AbortHandle,
296c50ba 27 crypt_config: Option<Arc<CryptConfig>>,
9e490a74
DM
28}
29
30impl Drop for BackupReader {
31
32 fn drop(&mut self) {
dc089345 33 self.abort.abort();
9e490a74
DM
34 }
35}
36
37impl BackupReader {
38
dc089345
WB
39 fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
40 Arc::new(Self { h2, abort, crypt_config})
9e490a74
DM
41 }
42
913acb41 43 /// Create a new instance by upgrading the connection at '/api2/json/reader'
9e490a74
DM
44 pub async fn start(
45 client: HttpClient,
296c50ba 46 crypt_config: Option<Arc<CryptConfig>>,
9e490a74
DM
47 datastore: &str,
48 backup_type: &str,
49 backup_id: &str,
6a7be83e 50 backup_time: i64,
9e490a74
DM
51 debug: bool,
52 ) -> Result<Arc<BackupReader>, Error> {
53
54 let param = json!({
55 "backup-type": backup_type,
56 "backup-id": backup_id,
6a7be83e 57 "backup-time": backup_time,
9e490a74
DM
58 "store": datastore,
59 "debug": debug,
60 });
ba20987a 61 let req = HttpClient::request_builder(client.server(), client.port(), "GET", "/api2/json/reader", Some(param)).unwrap();
9e490a74 62
dc089345 63 let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
9e490a74 64
dc089345 65 Ok(BackupReader::new(h2, abort, crypt_config))
9e490a74
DM
66 }
67
913acb41 68 /// Execute a GET request
9e490a74
DM
69 pub async fn get(
70 &self,
71 path: &str,
72 param: Option<Value>,
73 ) -> Result<Value, Error> {
74 self.h2.get(path, param).await
75 }
76
913acb41 77 /// Execute a PUT request
9e490a74
DM
78 pub async fn put(
79 &self,
80 path: &str,
81 param: Option<Value>,
82 ) -> Result<Value, Error> {
83 self.h2.put(path, param).await
84 }
85
913acb41 86 /// Execute a POST request
9e490a74
DM
87 pub async fn post(
88 &self,
89 path: &str,
90 param: Option<Value>,
91 ) -> Result<Value, Error> {
92 self.h2.post(path, param).await
93 }
94
913acb41 95 /// Execute a GET request and send output to a writer
9e490a74
DM
96 pub async fn download<W: Write + Send>(
97 &self,
98 file_name: &str,
99 output: W,
3d571d55 100 ) -> Result<(), Error> {
9e490a74
DM
101 let path = "download";
102 let param = json!({ "file-name": file_name });
103 self.h2.download(path, Some(param), output).await
104 }
105
913acb41
DM
106 /// Execute a special GET request and send output to a writer
107 ///
108 /// This writes random data, and is only useful to test download speed.
9e490a74
DM
109 pub async fn speedtest<W: Write + Send>(
110 &self,
111 output: W,
3d571d55 112 ) -> Result<(), Error> {
9e490a74
DM
113 self.h2.download("speedtest", None, output).await
114 }
115
913acb41 116 /// Download a specific chunk
9e490a74
DM
117 pub async fn download_chunk<W: Write + Send>(
118 &self,
119 digest: &[u8; 32],
120 output: W,
3d571d55 121 ) -> Result<(), Error> {
9e490a74
DM
122 let path = "chunk";
123 let param = json!({ "digest": digest_to_hex(digest) });
124 self.h2.download(path, Some(param), output).await
125 }
126
127 pub fn force_close(self) {
dc089345 128 self.abort.abort();
9e490a74 129 }
296c50ba
DM
130
131 /// Download backup manifest (index.json)
2107a5ae
DM
132 ///
133 /// The manifest signature is verified if we have a crypt_config.
134 pub async fn download_manifest(&self) -> Result<(BackupManifest, Vec<u8>), Error> {
f06b820a 135
3d571d55
WB
136 let mut raw_data = Vec::with_capacity(64 * 1024);
137 self.download(MANIFEST_BLOB_NAME, &mut raw_data).await?;
39f18b30 138 let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
8819d1f2
FG
139 // no expected digest available
140 let data = blob.decode(None, None)?;
b53f6379
DM
141
142 let manifest = BackupManifest::from_data(&data[..], self.crypt_config.as_ref().map(Arc::as_ref))?;
2107a5ae
DM
143
144 Ok((manifest, data))
296c50ba 145 }
c3d84a22 146
bb19af73
DM
147 /// Download a .blob file
148 ///
add5861e 149 /// This creates a temporary file in /tmp (using O_TMPFILE). The data is verified using
bb19af73
DM
150 /// the provided manifest.
151 pub async fn download_blob(
152 &self,
153 manifest: &BackupManifest,
154 name: &str,
90ff75f8 155 ) -> Result<DataBlobReader<'_, File>, Error> {
bb19af73 156
3d571d55 157 let mut tmpfile = std::fs::OpenOptions::new()
bb19af73
DM
158 .write(true)
159 .read(true)
160 .custom_flags(libc::O_TMPFILE)
161 .open("/tmp")?;
162
3d571d55 163 self.download(name, &mut tmpfile).await?;
bb19af73 164
ba0ccc59
WB
165 tmpfile.seek(SeekFrom::Start(0))?;
166 let (csum, size) = sha256(&mut tmpfile)?;
bb19af73
DM
167 manifest.verify_file(name, &csum, size)?;
168
169 tmpfile.seek(SeekFrom::Start(0))?;
170
171 DataBlobReader::new(tmpfile, self.crypt_config.clone())
172 }
173
c3d84a22
DM
174 /// Download dynamic index file
175 ///
add5861e 176 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
c3d84a22
DM
177 /// the provided manifest.
178 pub async fn download_dynamic_index(
179 &self,
180 manifest: &BackupManifest,
181 name: &str,
182 ) -> Result<DynamicIndexReader, Error> {
183
3d571d55 184 let mut tmpfile = std::fs::OpenOptions::new()
c3d84a22
DM
185 .write(true)
186 .read(true)
187 .custom_flags(libc::O_TMPFILE)
188 .open("/tmp")?;
189
3d571d55 190 self.download(name, &mut tmpfile).await?;
c3d84a22
DM
191
192 let index = DynamicIndexReader::new(tmpfile)
193 .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", name, err))?;
194
195 // Note: do not use values stored in index (not trusted) - instead, computed them again
196 let (csum, size) = index.compute_csum();
197 manifest.verify_file(name, &csum, size)?;
198
199 Ok(index)
200 }
72050500
DM
201
202 /// Download fixed index file
203 ///
add5861e 204 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
72050500
DM
205 /// the provided manifest.
206 pub async fn download_fixed_index(
207 &self,
208 manifest: &BackupManifest,
209 name: &str,
210 ) -> Result<FixedIndexReader, Error> {
211
3d571d55 212 let mut tmpfile = std::fs::OpenOptions::new()
72050500
DM
213 .write(true)
214 .read(true)
215 .custom_flags(libc::O_TMPFILE)
216 .open("/tmp")?;
217
3d571d55 218 self.download(name, &mut tmpfile).await?;
72050500
DM
219
220 let index = FixedIndexReader::new(tmpfile)
221 .map_err(|err| format_err!("unable to read fixed index '{}' - {}", name, err))?;
222
223 // Note: do not use values stored in index (not trusted) - instead, computed them again
224 let (csum, size) = index.compute_csum();
225 manifest.verify_file(name, &csum, size)?;
226
227 Ok(index)
228 }
9e490a74 229}