//! Sync datastore from remote server
5 use std
::convert
::TryFrom
;
7 use std
::collections
::HashMap
;
8 use std
::io
::{Seek, SeekFrom}
;
9 use chrono
::{Utc, TimeZone}
;
11 use proxmox
::api
::api
;
12 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment}
;
14 use crate::server
::{WorkerTask}
;
17 use crate::config
::remote
;
18 use crate::api2
::types
::*;
// fixme: implement filters
// fixme: delete vanished groups
// fixme: correctly lock backup groups
// Pull every chunk referenced by an index file from the remote chunk reader
// and insert it into the target datastore.
//
// NOTE(review): this fragment is truncated in this view — the `worker` and
// `index` parameters, the conditional that consumes `chunk_exists`, the final
// `Ok(())` and the closing brace are not visible here; original source lines
// are also interleaved with listing-number artifacts.
24 async
fn pull_index_chunks
<I
: IndexFile
>(
26 chunk_reader
: &mut RemoteChunkReader
,
27 target
: Arc
<DataStore
>,
29 ) -> Result
<(), Error
> {
// Walk every chunk slot of the index.
32 for pos
in 0..index
.index_count() {
33 let digest
= index
.index_digest(pos
).unwrap();
// Check whether the chunk already exists locally (cond_touch_chunk with
// assert=false); the branch that uses this flag is not visible here —
// presumably it skips the download when the chunk is present.
34 let chunk_exists
= target
.cond_touch_chunk(digest
, false)?
;
36 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
39 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
// Fetch the raw chunk from the remote ...
40 let chunk
= chunk_reader
.read_raw_chunk(&digest
)?
;
// ... and store it in the local datastore.
42 target
.insert_chunk(&chunk
, &digest
)?
;
// Download the remote manifest blob (MANIFEST_BLOB_NAME) into a temporary
// file and return that file handle, rewound to offset 0 so the caller can
// read the downloaded contents.
//
// NOTE(review): truncated fragment — the OpenOptions builder chain (which
// presumably uses `filename` to create the temp file) and the function's
// return expression / closing brace are not visible here.
48 async
fn download_manifest(
49 reader
: &BackupReader
,
50 filename
: &std
::path
::Path
,
51 ) -> Result
<std
::fs
::File
, Error
> {
53 let tmp_manifest_file
= std
::fs
::OpenOptions
::new()
// Stream the manifest blob from the remote into the temp file.
59 let mut tmp_manifest_file
= reader
.download(MANIFEST_BLOB_NAME
, tmp_manifest_file
).await?
;
// Rewind to the start so the returned handle reads from the beginning.
61 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
// Download a single archive file of a snapshot into a `.tmp` path, pull all
// chunks it references (for index archives), then atomically rename the temp
// file into its final place.
//
// NOTE(review): truncated fragment — the `worker`, `snapshot` and
// `archive_name` parameters, the OpenOptions builder chain, match-arm
// closing braces and the function's end are not visible here.
66 async
fn pull_single_archive(
68 reader
: &BackupReader
,
69 chunk_reader
: &mut RemoteChunkReader
,
70 tgt_store
: Arc
<DataStore
>,
73 ) -> Result
<(), Error
> {
// Final on-disk location: <base>/<snapshot relative path>/<archive_name>.
75 let mut path
= tgt_store
.base_path();
76 path
.push(snapshot
.relative_path());
77 path
.push(archive_name
);
// Download target: same path with a ".tmp" extension.
79 let mut tmp_path
= path
.clone();
80 tmp_path
.set_extension("tmp");
82 worker
.log(format
!("sync archive {}", archive_name
));
83 let tmpfile
= std
::fs
::OpenOptions
::new()
89 let tmpfile
= reader
.download(archive_name
, tmpfile
).await?
;
// Index archives need their referenced chunks pulled; plain blobs are
// already complete after the download above.
91 match archive_type(archive_name
)?
{
92 ArchiveType
::DynamicIndex
=> {
93 let index
= DynamicIndexReader
::new(tmpfile
)
94 .map_err(|err
| format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
))?
;
96 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
98 ArchiveType
::FixedIndex
=> {
99 let index
= FixedIndexReader
::new(tmpfile
)
100 .map_err(|err
| format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
))?
;
102 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
104 ArchiveType
::Blob
=> { /* nothing to do */ }
// Atomically move the fully-downloaded temp file into place.
106 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
107 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
// Sync one snapshot from the remote: download its manifest into a `.tmp`
// file, short-circuit if it matches the existing local manifest, otherwise
// verify each manifest entry against the local files (re-downloading changed
// ones via pull_single_archive — the call itself is in lines not visible
// here), rename the manifest into place and remove stale files.
//
// NOTE(review): truncated fragment — the `worker` parameter, several
// match-arm bodies (notably the per-file download calls around original
// lines 185..199) and closing braces are not visible in this view.
112 async
fn pull_snapshot(
114 reader
: Arc
<BackupReader
>,
115 tgt_store
: Arc
<DataStore
>,
116 snapshot
: &BackupDir
,
117 ) -> Result
<(), Error
> {
// Local manifest path: <base>/<snapshot relative path>/<MANIFEST_BLOB_NAME>.
119 let mut manifest_name
= tgt_store
.base_path();
120 manifest_name
.push(snapshot
.relative_path());
121 manifest_name
.push(MANIFEST_BLOB_NAME
);
// Download target for the fresh manifest.
123 let mut tmp_manifest_name
= manifest_name
.clone();
124 tmp_manifest_name
.set_extension("tmp");
126 let mut tmp_manifest_file
= download_manifest(&reader
, &tmp_manifest_name
).await?
;
127 let tmp_manifest_blob
= DataBlob
::load(&mut tmp_manifest_file
)?
;
128 tmp_manifest_blob
.verify_crc()?
;
// If a local manifest exists and its raw bytes equal the freshly
// downloaded one, the snapshot is unchanged and nothing needs syncing.
130 if manifest_name
.exists() {
131 let manifest_blob
= proxmox
::tools
::try_block
!({
132 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
)
133 .map_err(|err
| format_err
!("unable to open local manifest {:?} - {}", manifest_name
, err
))?
;
135 let manifest_blob
= DataBlob
::load(&mut manifest_file
)?
;
136 manifest_blob
.verify_crc()?
;
138 }).map_err(|err
: Error
| {
139 format_err
!("unable to read local manifest {:?} - {}", manifest_name
, err
)
142 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
143 return Ok(()); // nothing changed
// Parse the new manifest and prepare a chunk reader for re-downloads.
147 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
149 let mut chunk_reader
= RemoteChunkReader
::new(reader
.clone(), None
, HashMap
::new());
// Verify every file listed in the manifest against its local copy;
// checksum mismatches are logged as "detected changed file" (the code
// that then re-downloads such files is not visible in this fragment).
151 for item
in manifest
.files() {
152 let mut path
= tgt_store
.base_path();
153 path
.push(snapshot
.relative_path());
154 path
.push(&item
.filename
);
157 match archive_type(&item
.filename
)?
{
158 ArchiveType
::DynamicIndex
=> {
159 let index
= DynamicIndexReader
::open(&path
)?
;
160 let (csum
, size
) = index
.compute_csum();
161 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
164 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
168 ArchiveType
::FixedIndex
=> {
169 let index
= FixedIndexReader
::open(&path
)?
;
170 let (csum
, size
) = index
.compute_csum();
171 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
174 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
178 ArchiveType
::Blob
=> {
179 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
180 let (csum
, size
) = compute_file_csum(&mut tmpfile
)?
;
181 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
184 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// Atomically activate the new manifest.
201 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
202 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
205 // cleanup - remove stale files
206 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
// Entry point for syncing a single snapshot: create the local backup
// directory, then either sync it for the first time (removing the directory
// again on failure) or re-sync an already-existing snapshot.
//
// NOTE(review): truncated fragment — the `worker` parameter, the `if is_new`
// branch header (original line ~219) and the closing braces / final return
// are not visible here; the two pull_snapshot paths are presumably selected
// by `is_new`.
211 pub async
fn pull_snapshot_from(
213 reader
: Arc
<BackupReader
>,
214 tgt_store
: Arc
<DataStore
>,
215 snapshot
: &BackupDir
,
216 ) -> Result
<(), Error
> {
// Create the snapshot directory; is_new tells whether it already existed.
218 let (_path
, is_new
) = tgt_store
.create_backup_dir(&snapshot
)?
;
221 worker
.log(format
!("sync snapshot {:?}", snapshot
.relative_path()));
// First-time sync: on failure, clean up the freshly created directory so
// no half-synced snapshot is left behind (cleanup errors only logged).
223 if let Err(err
) = pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await
{
224 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
) {
225 worker
.log(format
!("cleanup error - {}", cleanup_err
));
// Re-sync path for an already-present snapshot.
230 worker
.log(format
!("re-sync snapshot {:?}", snapshot
.relative_path()));
231 pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
// Sync one backup group: list its snapshots via the remote datastore API,
// sort them by backup time, skip snapshots not newer than the last
// successful local backup, and pull each remaining snapshot through a
// dedicated reader connection.
//
// NOTE(review): truncated fragment — the `worker`/`client`/`group`
// parameters, the json! args construction around the "backup-type"/
// "backup-id" fields, the per-item loop header, the HttpClient/BackupReader
// argument lists and closing braces are not visible here.
237 pub async
fn pull_group(
240 src_repo
: &BackupRepository
,
241 tgt_store
: Arc
<DataStore
>,
243 ) -> Result
<(), Error
> {
// Remote API endpoint listing the snapshots of this datastore.
245 let path
= format
!("api2/json/admin/datastore/{}/snapshots", src_repo
.store());
// Filter the listing to this group (part of a larger args object whose
// surrounding construction is not visible here).
248 "backup-type": group
.backup_type(),
249 "backup-id": group
.backup_id(),
252 let mut result
= client
.get(&path
, Some(args
)).await?
;
253 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Oldest first, so snapshots are pulled in chronological order.
255 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
// Ticket reused below for the per-snapshot reader connections.
257 let auth_info
= client
.login().await?
;
259 let last_sync
= group
.last_successful_backup(&tgt_store
.base_path())?
;
262 let backup_time
= Utc
.timestamp(item
.backup_time
, 0);
// Skip snapshots we already have (not newer than the last local one).
263 if let Some(last_sync_time
) = last_sync
{
264 if last_sync_time
> backup_time { continue; }
// Fresh connection per snapshot, authenticated with the cached ticket.
267 let new_client
= HttpClient
::new(
270 Some(auth_info
.ticket
.clone())
273 let reader
= BackupReader
::start(
283 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
);
285 pull_snapshot_from(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
;
// Sync a whole datastore: list all backup groups via the remote API, sort
// them by type then id, pull each group, and fail at the end if any group
// failed (so one bad group does not abort the others).
//
// NOTE(review): truncated fragment — the `worker`/`client` parameters, the
// sort closure's else-branch, the per-item loop header, the statement that
// sets `errors = true` and the `if errors` guard before the bail are not
// visible here.
291 pub async
fn pull_store(
294 src_repo
: &BackupRepository
,
295 tgt_store
: Arc
<DataStore
>,
296 ) -> Result
<(), Error
> {
// Remote API endpoint listing all backup groups of the datastore.
298 let path
= format
!("api2/json/admin/datastore/{}/groups", src_repo
.store());
300 let mut result
= client
.get(&path
, None
).await?
;
302 let mut list
: Vec
<GroupListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Deterministic order: primary key backup_type, secondary key backup_id.
304 list
.sort_unstable_by(|a
, b
| {
305 let type_order
= a
.backup_type
.cmp(&b
.backup_type
);
306 if type_order
== std
::cmp
::Ordering
::Equal
{
307 a
.backup_id
.cmp(&b
.backup_id
)
// Collect failures instead of aborting on the first one.
313 let mut errors
= false;
316 let group
= BackupGroup
::new(&item
.backup_type
, &item
.backup_id
);
317 if let Err(err
) = pull_group(worker
, client
, src_repo
, tgt_store
.clone(), &group
).await
{
318 worker
.log(format
!("sync group {}/{} failed - {}", item
.backup_type
, item
.backup_id
, err
));
// Reached when at least one group failed (guard not visible here).
325 bail
!("sync failed with some errors.");
// API endpoint: sync a local datastore from a remote repository. Resolves
// the target datastore, looks up the remote from the remote config, builds
// an authenticated HTTP client (verifying login up front), then spawns a
// WorkerTask that holds a shared chunk-store lock while running pull_store.
// Returns the worker task's UPID string.
//
// NOTE(review): truncated fragment — the #[api] macro header, the parameter
// names attached to the three schema entries (presumably store / remote /
// remote-store), the `store` and `remote` fn parameters, and the task
// closure's tail (Ok result, upid_str return) are not visible here.
335 schema
: DATASTORE_SCHEMA
,
338 schema
: REMOTE_ID_SCHEMA
,
341 schema
: DATASTORE_SCHEMA
,
346 /// Sync store from other repository
350 remote_store
: String
,
352 rpcenv
: &mut dyn RpcEnvironment
,
353 ) -> Result
<String
, Error
> {
355 let username
= rpcenv
.get_user().unwrap();
357 let tgt_store
= DataStore
::lookup_datastore(&store
)?
;
// Load the remote definition from the remotes config file.
359 let (remote_config
, _digest
) = remote
::config()?
;
360 let remote
: remote
::Remote
= remote_config
.lookup("remote", &remote
)?
;
362 let client
= HttpClient
::new(&remote
.host
, &remote
.userid
, Some(remote
.password
.clone()))?
;
// Fail fast with a clear error if the remote rejects our credentials.
363 let _auth_info
= client
.login() // make sure we can auth
365 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote
.host
, err
))?
;
367 let src_repo
= BackupRepository
::new(Some(remote
.userid
), Some(remote
.host
), remote_store
);
369 // fixme: set to_stdout to false?
370 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), &username
.clone(), true, move |worker
| async
move {
372 worker
.log(format
!("sync datastore '{}' start", store
));
374 // explicit create shared lock to prevent GC on newly created chunks
375 let _shared_store_lock
= tgt_store
.try_shared_chunk_store_lock()?
;
377 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone()).await?
;
379 worker
.log(format
!("sync datastore '{}' end", store
));
387 pub const ROUTER
: Router
= Router
::new()
388 .post(&API_METHOD_PULL
);