]> git.proxmox.com Git - proxmox-backup-qemu.git/blob - src/restore.rs
0790d7ff6cc6022abad25aae6923a099ad386bdd
[proxmox-backup-qemu.git] / src / restore.rs
1 use std::sync::{Arc, Mutex};
2 use std::io::SeekFrom;
3 use std::convert::TryInto;
4
5 use anyhow::{format_err, bail, Error};
6 use once_cell::sync::OnceCell;
7 use tokio::io::{AsyncReadExt, AsyncSeekExt};
8 use tokio::runtime::Runtime;
9
10 use proxmox_backup::tools::runtime::get_runtime_with_builder;
11 use proxmox_backup::backup::*;
12 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
13
14 use super::BackupSetup;
15 use crate::registry::Registry;
16 use crate::capi_types::DataPointer;
17
18 struct ImageAccessInfo {
19 reader: Arc<tokio::sync::Mutex<AsyncIndexReader<RemoteChunkReader, FixedIndexReader>>>,
20 _archive_name: String,
21 archive_size: u64,
22 }
23
24 pub(crate) struct RestoreTask {
25 setup: BackupSetup,
26 runtime: Arc<Runtime>,
27 crypt_config: Option<Arc<CryptConfig>>,
28 client: OnceCell<Arc<BackupReader>>,
29 manifest: OnceCell<Arc<BackupManifest>>,
30 image_registry: Arc<Mutex<Registry<ImageAccessInfo>>>,
31 }
32
33 impl RestoreTask {
34
35 /// Create a new instance, using the specified Runtime
36 ///
37 /// We keep a reference to the runtime - else the runtime can be
38 /// dropped and further connections fails.
39 pub fn with_runtime(setup: BackupSetup, runtime: Arc<Runtime>) -> Result<Self, Error> {
40
41 let crypt_config = match setup.keyfile {
42 None => None,
43 Some(ref path) => {
44 let (key, _, _) = load_and_decrypt_key(path, & || {
45 match setup.key_password {
46 Some(ref key_password) => Ok(key_password.as_bytes().to_vec()),
47 None => bail!("no key_password specified"),
48 }
49 })?;
50 Some(Arc::new(CryptConfig::new(key)?))
51 }
52 };
53
54 Ok(Self {
55 setup,
56 runtime,
57 crypt_config,
58 client: OnceCell::new(),
59 manifest: OnceCell::new(),
60 image_registry: Arc::new(Mutex::new(Registry::<ImageAccessInfo>::new())),
61 })
62 }
63
64 pub fn new(setup: BackupSetup) -> Result<Self, Error> {
65 let runtime = get_runtime_with_builder(|| {
66 let mut builder = tokio::runtime::Builder::new_multi_thread();
67 builder.enable_all();
68 builder.max_blocking_threads(2);
69 builder.worker_threads(4);
70 builder.thread_name("proxmox-restore-worker");
71 builder
72 });
73 Self::with_runtime(setup, runtime)
74 }
75
76 pub async fn connect(&self) -> Result<libc::c_int, Error> {
77
78 let options = HttpClientOptions::new_non_interactive(
79 self.setup.password.clone(),
80 self.setup.fingerprint.clone(),
81 );
82
83 let http = HttpClient::new(&self.setup.host, self.setup.port, &self.setup.auth_id, options)?;
84 let client = BackupReader::start(
85 http,
86 self.crypt_config.clone(),
87 &self.setup.store,
88 &self.setup.backup_type,
89 &self.setup.backup_id,
90 self.setup.backup_time,
91 true
92 ).await?;
93
94 let (manifest, _) = client.download_manifest().await?;
95 manifest.check_fingerprint(self.crypt_config.as_ref().map(Arc::as_ref))?;
96
97 self.manifest.set(Arc::new(manifest))
98 .map_err(|_| format_err!("already connected!"))?;
99
100 self.client.set(client)
101 .map_err(|_| format_err!("already connected!"))?;
102
103 Ok(0)
104 }
105
106 pub fn runtime(&self) -> tokio::runtime::Handle {
107 self.runtime.handle().clone()
108 }
109
110 pub async fn restore_image(
111 &self,
112 archive_name: String,
113 write_data_callback: impl Fn(u64, &[u8]) -> i32,
114 write_zero_callback: impl Fn(u64, u64) -> i32,
115 verbose: bool,
116 ) -> Result<(), Error> {
117
118 if verbose {
119 eprintln!("download and verify backup index");
120 }
121
122 let client = match self.client.get() {
123 Some(reader) => Arc::clone(reader),
124 None => bail!("not connected"),
125 };
126
127 let manifest = match self.manifest.get() {
128 Some(manifest) => Arc::clone(manifest),
129 None => bail!("no manifest"),
130 };
131
132 let index = client.download_fixed_index(&manifest, &archive_name).await?;
133
134 let (_, zero_chunk_digest) = DataChunkBuilder::build_zero_chunk(
135 self.crypt_config.as_ref().map(Arc::as_ref),
136 index.chunk_size,
137 true,
138 )?;
139
140 let most_used = index.find_most_used_chunks(8);
141
142 let file_info = manifest.lookup_file_info(&archive_name)?;
143
144 let chunk_reader = RemoteChunkReader::new(
145 Arc::clone(&client),
146 self.crypt_config.clone(),
147 file_info.chunk_crypt_mode(),
148 most_used,
149 );
150
151 let mut per = 0;
152 let mut bytes = 0;
153 let mut zeroes = 0;
154
155 let start_time = std::time::Instant::now();
156
157 for pos in 0..index.index_count() {
158 let digest = index.index_digest(pos).unwrap();
159 let offset = (pos*index.chunk_size) as u64;
160 if digest == &zero_chunk_digest {
161 let res = write_zero_callback(offset, index.chunk_size as u64);
162 if res < 0 {
163 bail!("write_zero_callback failed ({})", res);
164 }
165 bytes += index.chunk_size;
166 zeroes += index.chunk_size;
167 } else {
168 let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
169 let res = write_data_callback(offset, &raw_data);
170 if res < 0 {
171 bail!("write_data_callback failed ({})", res);
172 }
173 bytes += raw_data.len();
174 }
175 if verbose {
176 let next_per = ((pos+1)*100)/index.index_count();
177 if per != next_per {
178 eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
179 next_per, bytes,
180 zeroes*100/bytes, zeroes,
181 start_time.elapsed().as_secs());
182 per = next_per;
183 }
184 }
185 }
186
187 let end_time = std::time::Instant::now();
188 let elapsed = end_time.duration_since(start_time);
189 eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
190 bytes,
191 elapsed.as_secs_f64(),
192 bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
193 );
194
195 Ok(())
196 }
197
198 pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {
199 let mut guard = self.image_registry.lock().unwrap();
200 let info = guard.lookup(aid)?;
201 Ok(info.archive_size)
202 }
203
204 pub async fn open_image(
205 &self,
206 archive_name: String,
207 ) -> Result<u8, Error> {
208
209 let client = match self.client.get() {
210 Some(reader) => Arc::clone(reader),
211 None => bail!("not connected"),
212 };
213
214 let manifest = match self.manifest.get() {
215 Some(manifest) => Arc::clone(manifest),
216 None => bail!("no manifest"),
217 };
218
219 let index = client.download_fixed_index(&manifest, &archive_name).await?;
220 let archive_size = index.index_bytes();
221 let most_used = index.find_most_used_chunks(8);
222
223 let file_info = manifest.lookup_file_info(&archive_name)?;
224
225 let chunk_reader = RemoteChunkReader::new(
226 Arc::clone(&client),
227 self.crypt_config.clone(),
228 file_info.chunk_crypt_mode(),
229 most_used,
230 );
231
232 let reader = AsyncIndexReader::new(index, chunk_reader);
233
234 let info = ImageAccessInfo {
235 archive_size,
236 _archive_name: archive_name, /// useful to debug
237 reader: Arc::new(tokio::sync::Mutex::new(reader)),
238 };
239
240 (*self.image_registry.lock().unwrap()).register(info)
241 }
242
243 pub async fn read_image_at(
244 &self,
245 aid: u8,
246 data: DataPointer,
247 offset: u64,
248 size: u64,
249 ) -> Result<libc::c_int, Error> {
250
251 let (reader, image_size) = {
252 let mut guard = self.image_registry.lock().unwrap();
253 let info = guard.lookup(aid)?;
254 (Arc::clone(&info.reader), info.archive_size)
255 };
256
257 if offset > image_size {
258 bail!("read index {} out of bounds {}", offset, image_size);
259 }
260
261 let mut reader = reader.lock().await;
262
263 let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
264 let mut read = 0;
265
266 while read < size {
267 reader.seek(SeekFrom::Start(offset + read)).await?;
268 let bytes = reader.read(&mut buf[read as usize..]).await?;
269
270 if bytes == 0 {
271 // EOF
272 break;
273 }
274
275 read += bytes as u64;
276 }
277
278 Ok(read.try_into()?)
279 }
280 }