1 use std
::sync
::{Arc, Mutex}
;
3 use std
::convert
::TryInto
;
5 use anyhow
::{format_err, bail, Error}
;
6 use once_cell
::sync
::OnceCell
;
7 use tokio
::io
::{AsyncReadExt, AsyncSeekExt}
;
8 use tokio
::runtime
::Runtime
;
10 use proxmox_backup
::tools
::runtime
::get_runtime_with_builder
;
11 use proxmox_backup
::backup
::*;
12 use proxmox_backup
::client
::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader}
;
14 use super::BackupSetup
;
15 use crate::registry
::Registry
;
16 use crate::capi_types
::DataPointer
;
// NOTE(review): other code in this file reads an `archive_size` member from
// values of this type, but neither that declaration nor the closing brace is
// visible in this excerpt.
/// Per-image state stored in the image registry of a `RestoreTask`.
18 struct ImageAccessInfo
{
// Index reader for the opened archive. It sits behind an `Arc` and an async
// (tokio) mutex so a cloned handle can be locked and held across `.await`.
19 reader
: Arc
<tokio
::sync
::Mutex
<AsyncIndexReader
<RemoteChunkReader
, FixedIndexReader
>>>,
// Name of the archive this entry belongs to. The leading underscore marks it
// as intentionally unused; it is kept as a debugging aid.
20 _archive_name
: String
,
// NOTE(review): the methods of this type access `self.setup`, but the
// declaration of that member and the closing brace are not visible in this
// excerpt.
/// State of one restore session: the runtime, the optional encryption
/// configuration, the connection and manifest (both set at most once), and
/// the registry of opened images.
24 pub(crate) struct RestoreTask
{
// Shared tokio runtime; a reference is held here so it is not dropped while
// the task is alive.
26 runtime
: Arc
<Runtime
>,
// Encryption configuration, `None` when no key was loaded.
27 crypt_config
: Option
<Arc
<CryptConfig
>>,
// Backup reader connection; empty until `connect` stores it.
28 client
: OnceCell
<Arc
<BackupReader
>>,
// Backup manifest; empty until `connect` stores it.
29 manifest
: OnceCell
<Arc
<BackupManifest
>>,
// Registry of images opened through `open_image`, guarded by a std mutex.
30 image_registry
: Arc
<Mutex
<Registry
<ImageAccessInfo
>>>,
35 /// Create a new instance, using the specified Runtime
37 /// We keep a reference to the runtime - else the runtime can be
38 /// dropped and further connections fail.
///
/// The encryption configuration is derived from `setup.keyfile`. When a key
/// is loaded, the password callback hands out `setup.key_password` and fails
/// with "no key_password specified" if none was given.
///
/// `client` and `manifest` start out unset and the image registry starts
/// out empty.
///
/// # Errors
///
/// Propagates failures from loading/decrypting the key and from
/// `CryptConfig::new`.
// NOTE(review): some match arms and the closing delimiters of this function
// are not visible in this excerpt.
39 pub fn with_runtime(setup
: BackupSetup
, runtime
: Arc
<Runtime
>) -> Result
<Self, Error
> {
41 let crypt_config
= match setup
.keyfile
{
// Load the key; the closure is the password provider.
44 let (key
, _
, _
) = load_and_decrypt_key(path
, & || {
45 match setup
.key_password
{
46 Some(ref key_password
) => Ok(key_password
.as_bytes().to_vec()),
47 None
=> bail
!("no key_password specified"),
// Wrap the loaded key so it can be shared cheaply with readers.
50 Some(Arc
::new(CryptConfig
::new(key
)?
))
// Connection and manifest are filled in later by `connect`.
58 client
: OnceCell
::new(),
59 manifest
: OnceCell
::new(),
60 image_registry
: Arc
::new(Mutex
::new(Registry
::<ImageAccessInfo
>::new())),
/// Create a new instance on a runtime obtained from
/// `get_runtime_with_builder`.
///
/// The builder closure configures a multi-threaded tokio runtime with at
/// most 2 blocking threads, 4 worker threads, and the thread name
/// "proxmox-restore-worker". Construction is then delegated to
/// `with_runtime`.
// NOTE(review): the end of the builder closure is not visible in this
// excerpt, so further builder settings may exist.
64 pub fn new(setup
: BackupSetup
) -> Result
<Self, Error
> {
65 let runtime
= get_runtime_with_builder(|| {
66 let mut builder
= tokio
::runtime
::Builder
::new_multi_thread();
68 builder
.max_blocking_threads(2);
69 builder
.worker_threads(4);
70 builder
.thread_name("proxmox-restore-worker");
73 Self::with_runtime(setup
, runtime
)
/// Connect to the backup server and fetch the backup manifest.
///
/// Builds non-interactive HTTP client options from the configured password
/// and fingerprint, creates an `HttpClient` for the configured host, port
/// and auth id, and starts a `BackupReader` for the configured backup type,
/// id and time. The manifest is then downloaded, its fingerprint is checked
/// against the encryption configuration (if any), and both the manifest and
/// the reader are stored on `self`.
///
/// # Errors
///
/// Propagates client, download and fingerprint-check failures. Fails with
/// "already connected!" when the manifest or the client was already set.
// NOTE(review): some call arguments and the success return value are not
// visible in this excerpt.
76 pub async
fn connect(&self) -> Result
<libc
::c_int
, Error
> {
78 let options
= HttpClientOptions
::new_non_interactive(
79 self.setup
.password
.clone(),
80 self.setup
.fingerprint
.clone(),
83 let http
= HttpClient
::new(&self.setup
.host
, self.setup
.port
, &self.setup
.auth_id
, options
)?
;
84 let client
= BackupReader
::start(
86 self.crypt_config
.clone(),
88 &self.setup
.backup_type
,
89 &self.setup
.backup_id
,
90 self.setup
.backup_time
,
94 let (manifest
, _
) = client
.download_manifest().await?
;
// Verify the manifest against our key before trusting it.
95 manifest
.check_fingerprint(self.crypt_config
.as_ref().map(Arc
::as_ref
))?
;
// `OnceCell::set` fails if a value is already present, which is reported as
// a repeated connect.
97 self.manifest
.set(Arc
::new(manifest
))
98 .map_err(|_
| format_err
!("already connected!"))?
;
100 self.client
.set(client
)
101 .map_err(|_
| format_err
!("already connected!"))?
;
106 pub fn runtime(&self) -> tokio
::runtime
::Handle
{
107 self.runtime
.handle().clone()
/// Restore one fixed-index image archive by streaming it through callbacks.
///
/// Downloads the fixed index for `archive_name`, then walks every index
/// position in order. A chunk whose digest equals the zero-chunk digest is
/// reported through `write_zero_callback(offset, chunk_size)`. Any other
/// chunk is read through the chunk reader and passed to
/// `write_data_callback(offset, data)`. Progress and a final summary are
/// written to stderr.
///
/// # Errors
///
/// Fails with "not connected" or "no manifest" when `connect` has not
/// stored the client or manifest, with "write_zero_callback failed ({})" or
/// "write_data_callback failed ({})" carrying the callback result, and
/// propagates index download, file-info lookup and chunk read errors.
// NOTE(review): several original lines of this function (some parameters, the
// declarations of the `bytes`/`zeroes` counters, `if` conditions and closing
// delimiters) are not visible in this excerpt.
110 pub async
fn restore_image(
112 archive_name
: String
,
113 write_data_callback
: impl Fn(u64, &[u8]) -> i32,
114 write_zero_callback
: impl Fn(u64, u64) -> i32,
116 ) -> Result
<(), Error
> {
119 eprintln
!("download and verify backup index");
// Both cells are filled by `connect`; an empty cell means it was not called.
122 let client
= match self.client
.get() {
123 Some(reader
) => Arc
::clone(reader
),
124 None
=> bail
!("not connected"),
127 let manifest
= match self.manifest
.get() {
128 Some(manifest
) => Arc
::clone(manifest
),
129 None
=> bail
!("no manifest"),
132 let index
= client
.download_fixed_index(&manifest
, &archive_name
).await?
;
// Digest of an all-zero chunk under the current crypt config; used below to
// recognise zero chunks by digest comparison alone.
134 let (_
, zero_chunk_digest
) = DataChunkBuilder
::build_zero_chunk(
135 self.crypt_config
.as_ref().map(Arc
::as_ref
),
// NOTE(review): presumably handed to the chunk reader below - the argument
// line is not visible here.
140 let most_used
= index
.find_most_used_chunks(8);
142 let file_info
= manifest
.lookup_file_info(&archive_name
)?
;
144 let chunk_reader
= RemoteChunkReader
::new(
146 self.crypt_config
.clone(),
147 file_info
.chunk_crypt_mode(),
155 let start_time
= std
::time
::Instant
::now();
157 for pos
in 0..index
.index_count() {
158 let digest
= index
.index_digest(pos
).unwrap();
// Byte offset of this chunk inside the image.
159 let offset
= (pos
*index
.chunk_size
) as u64;
160 if digest
== &zero_chunk_digest
{
// Zero chunk: report the range to the caller via the zero callback.
161 let res
= write_zero_callback(offset
, index
.chunk_size
as u64);
163 bail
!("write_zero_callback failed ({})", res
);
165 bytes
+= index
.chunk_size
;
166 zeroes
+= index
.chunk_size
;
// NOTE(review): this is a non-awaited `ReadChunk` call inside an async fn -
// confirm it does not block the executor.
168 let raw_data
= ReadChunk
::read_chunk(&chunk_reader
, &digest
)?
;
169 let res
= write_data_callback(offset
, &raw_data
);
171 bail
!("write_data_callback failed ({})", res
);
173 bytes
+= raw_data
.len();
// Percentage of index positions processed so far.
176 let next_per
= ((pos
+1)*100)/index
.index_count();
178 eprintln
!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
180 zeroes
*100/bytes
, zeroes
,
181 start_time
.elapsed().as_secs());
187 let end_time
= std
::time
::Instant
::now();
188 let elapsed
= end_time
.duration_since(start_time
);
// Speed is bytes divided by 1024*1024 per elapsed second.
189 eprintln
!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
191 elapsed
.as_secs_f64(),
192 bytes
as f64/(1024.0*1024.0*elapsed
.as_secs_f64())
198 pub fn get_image_length(&self, aid
: u8) -> Result
<u64, Error
> {
199 let mut guard
= self.image_registry
.lock().unwrap();
200 let info
= guard
.lookup(aid
)?
;
201 Ok(info
.archive_size
)
/// Open a fixed-index image archive for random access and register it.
///
/// Downloads the fixed index for `archive_name`, builds a chunk reader and
/// an `AsyncIndexReader` on top of it, wraps the reader in an
/// `ImageAccessInfo`, and returns whatever the image registry's `register`
/// call yields for it.
///
/// # Errors
///
/// Fails with "not connected" or "no manifest" when `connect` has not
/// stored the client or manifest, and propagates index download and
/// file-info lookup errors.
///
/// # Panics
///
/// Panics if the image registry mutex is poisoned.
// NOTE(review): some parameters, call arguments and closing delimiters of
// this function are not visible in this excerpt.
204 pub async
fn open_image(
206 archive_name
: String
,
207 ) -> Result
<u8, Error
> {
// Both cells are filled by `connect`; an empty cell means it was not called.
209 let client
= match self.client
.get() {
210 Some(reader
) => Arc
::clone(reader
),
211 None
=> bail
!("not connected"),
214 let manifest
= match self.manifest
.get() {
215 Some(manifest
) => Arc
::clone(manifest
),
216 None
=> bail
!("no manifest"),
219 let index
= client
.download_fixed_index(&manifest
, &archive_name
).await?
;
// Taken before `index` is moved into the reader below.
220 let archive_size
= index
.index_bytes();
// NOTE(review): presumably handed to the chunk reader below - the argument
// line is not visible here.
221 let most_used
= index
.find_most_used_chunks(8);
223 let file_info
= manifest
.lookup_file_info(&archive_name
)?
;
225 let chunk_reader
= RemoteChunkReader
::new(
227 self.crypt_config
.clone(),
228 file_info
.chunk_crypt_mode(),
232 let reader
= AsyncIndexReader
::new(index
, chunk_reader
);
234 let info
= ImageAccessInfo
{
236 _archive_name
: archive_name
, // useful to debug
237 reader
: Arc
::new(tokio
::sync
::Mutex
::new(reader
)),
// The std mutex guard lives only for this statement; no `.await` happens
// while it is held.
240 (*self.image_registry
.lock().unwrap()).register(info
)
243 pub async
fn read_image_at(
249 ) -> Result
<libc
::c_int
, Error
> {
251 let (reader
, image_size
) = {
252 let mut guard
= self.image_registry
.lock().unwrap();
253 let info
= guard
.lookup(aid
)?
;
254 (Arc
::clone(&info
.reader
), info
.archive_size
)
257 if offset
> image_size
{
258 bail
!("read index {} out of bounds {}", offset
, image_size
);
261 let mut reader
= reader
.lock().await
;
263 let buf
: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)}
;
267 reader
.seek(SeekFrom
::Start(offset
+ read
)).await?
;
268 let bytes
= reader
.read(&mut buf
[read
as usize..]).await?
;
275 read
+= bytes
as u64;