1 //! Sync datastore from remote server
3 use anyhow
::{bail, format_err, Error}
;
5 use std
::convert
::TryFrom
;
7 use std
::collections
::HashMap
;
8 use std
::io
::{Seek, SeekFrom}
;
9 use chrono
::{Utc, TimeZone}
;
11 use proxmox
::api
::api
;
12 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment, Permission}
;
14 use crate::server
::{WorkerTask}
;
17 use crate::config
::remote
;
18 use crate::api2
::types
::*;
19 use crate::config
::acl
::{PRIV_DATASTORE_CREATE_BACKUP, PRIV_DATASTORE_READ}
;
20 use crate::config
::cached_user_info
::CachedUserInfo
;
22 // fixme: implement filters
23 // fixme: delete vanished groups
24 // Todo: correctly lock backup groups
// Fetch every chunk referenced by `index` from the remote chunk reader and
// insert it into the local `target` datastore.
// NOTE(review): this extract is incomplete — the embedded original line
// numbers jump (27, 30, 32–33, … are absent), so the `worker` and `index`
// parameters and the function's closing brace are not visible here.
26 async
fn pull_index_chunks
<I
: IndexFile
>(
28 chunk_reader
: &mut RemoteChunkReader
,
29 target
: Arc
<DataStore
>,
31 ) -> Result
<(), Error
> {
// Walk every chunk slot recorded in the index.
34 for pos
in 0..index
.index_count() {
35 let digest
= index
.index_digest(pos
).unwrap();
// Probe the local store for this chunk; the `false` flag's meaning is not
// visible in this extract — presumably "do not assert existence"; TODO
// confirm against DataStore::cond_touch_chunk.
36 let chunk_exists
= target
.cond_touch_chunk(digest
, false)?
;
38 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
41 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
// Download the raw (still-encoded) chunk data from the remote.
42 let chunk
= chunk_reader
.read_raw_chunk(&digest
)?
;
// Store the downloaded chunk locally under its digest.
44 target
.insert_chunk(&chunk
, &digest
)?
;
// Download the remote manifest blob (MANIFEST_BLOB_NAME) into a temporary
// local file and rewind it so the caller can read it from the start.
// NOTE(review): original lines 54, 56–60, 62 and 64+ are missing from this
// extract (the OpenOptions configuration, where `filename` is used, and the
// final return of the file handle).
50 async
fn download_manifest(
51 reader
: &BackupReader
,
52 filename
: &std
::path
::Path
,
53 ) -> Result
<std
::fs
::File
, Error
> {
// Create/open the temporary manifest file (option chain truncated here).
55 let tmp_manifest_file
= std
::fs
::OpenOptions
::new()
// Stream the manifest blob from the remote into the temp file; `download`
// consumes and returns the file handle.
61 let mut tmp_manifest_file
= reader
.download(MANIFEST_BLOB_NAME
, tmp_manifest_file
).await?
;
// Rewind so the freshly written content is read from offset 0.
63 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
// Download a single archive file of a snapshot into the target store:
// write it to "<archive>.tmp", pull referenced chunks for index archives,
// then atomically rename the temp file into place.
// NOTE(review): incomplete extract — the `worker`, `snapshot` and
// `archive_name` parameters, the OpenOptions chain, the match-arm closing
// braces and the function's tail are on lines missing from this view.
68 async
fn pull_single_archive(
70 reader
: &BackupReader
,
71 chunk_reader
: &mut RemoteChunkReader
,
72 tgt_store
: Arc
<DataStore
>,
75 ) -> Result
<(), Error
> {
// Final on-disk path: <base>/<snapshot>/<archive_name>.
77 let mut path
= tgt_store
.base_path();
78 path
.push(snapshot
.relative_path());
79 path
.push(archive_name
);
// Download destination: same path with a ".tmp" extension.
81 let mut tmp_path
= path
.clone();
82 tmp_path
.set_extension("tmp");
84 worker
.log(format
!("sync archive {}", archive_name
));
85 let tmpfile
= std
::fs
::OpenOptions
::new()
// Fetch the archive body from the remote into the temp file.
91 let tmpfile
= reader
.download(archive_name
, tmpfile
).await?
;
// Index archives additionally require pulling every referenced chunk;
// plain blobs are self-contained.
93 match archive_type(archive_name
)?
{
94 ArchiveType
::DynamicIndex
=> {
95 let index
= DynamicIndexReader
::new(tmpfile
)
96 .map_err(|err
| format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
))?
;
98 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
100 ArchiveType
::FixedIndex
=> {
101 let index
= FixedIndexReader
::new(tmpfile
)
102 .map_err(|err
| format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
))?
;
104 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
106 ArchiveType
::Blob
=> { /* nothing to do */ }
// Atomically move the fully downloaded file into its final location.
108 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
109 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
// Sync one snapshot: download the remote manifest to "<manifest>.tmp",
// compare it against the local manifest (skip if unchanged), re-download
// files whose local checksum no longer matches, atomically install the new
// manifest, and clean up stale files.
// NOTE(review): incomplete extract — the `worker` parameter, the code that
// calls pull_single_archive for changed files (original ~lines 187–201),
// the match Ok arms and several closing braces are on missing lines.
114 async
fn pull_snapshot(
116 reader
: Arc
<BackupReader
>,
117 tgt_store
: Arc
<DataStore
>,
118 snapshot
: &BackupDir
,
119 ) -> Result
<(), Error
> {
// Local manifest path: <base>/<snapshot>/<MANIFEST_BLOB_NAME>.
121 let mut manifest_name
= tgt_store
.base_path();
122 manifest_name
.push(snapshot
.relative_path());
123 manifest_name
.push(MANIFEST_BLOB_NAME
);
125 let mut tmp_manifest_name
= manifest_name
.clone();
126 tmp_manifest_name
.set_extension("tmp");
// Download the remote manifest and verify the blob's CRC before use.
128 let mut tmp_manifest_file
= download_manifest(&reader
, &tmp_manifest_name
).await?
;
129 let tmp_manifest_blob
= DataBlob
::load(&mut tmp_manifest_file
)?
;
130 tmp_manifest_blob
.verify_crc()?
;
// If we already have a manifest locally, load it so we can detect a no-op.
132 if manifest_name
.exists() {
133 let manifest_blob
= proxmox
::try_block
!({
134 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
)
135 .map_err(|err
| format_err
!("unable to open local manifest {:?} - {}", manifest_name
, err
))?
;
137 let manifest_blob
= DataBlob
::load(&mut manifest_file
)?
;
138 manifest_blob
.verify_crc()?
;
140 }).map_err(|err
: Error
| {
141 format_err
!("unable to read local manifest {:?} - {}", manifest_name
, err
)
// Identical raw manifest data means the snapshot is already up to date.
144 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
145 return Ok(()); // nothing changed
// Parse the downloaded manifest and prepare a chunk reader for re-syncs.
149 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
151 let mut chunk_reader
= RemoteChunkReader
::new(reader
.clone(), None
, HashMap
::new());
// Verify every file listed in the manifest against its local copy; a
// checksum mismatch marks it as changed (the re-download step is on lines
// missing from this extract).
153 for item
in manifest
.files() {
154 let mut path
= tgt_store
.base_path();
155 path
.push(snapshot
.relative_path());
156 path
.push(&item
.filename
);
159 match archive_type(&item
.filename
)?
{
160 ArchiveType
::DynamicIndex
=> {
161 let index
= DynamicIndexReader
::open(&path
)?
;
162 let (csum
, size
) = index
.compute_csum();
163 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
166 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
170 ArchiveType
::FixedIndex
=> {
171 let index
= FixedIndexReader
::open(&path
)?
;
172 let (csum
, size
) = index
.compute_csum();
173 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
176 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
180 ArchiveType
::Blob
=> {
181 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
182 let (csum
, size
) = compute_file_csum(&mut tmpfile
)?
;
183 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
186 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// Install the new manifest atomically once all files are in place.
203 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
204 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
207 // cleanup - remove stale files
208 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
// Sync a snapshot into the target store, creating its directory first.
// For a newly created snapshot dir a failed pull removes the dir again;
// an existing dir is re-synced in place.
// NOTE(review): incomplete extract — the `worker` parameter, the
// `is_new` branching structure and the error-propagation lines around the
// cleanup (original ~lines 221–231, 234+) are missing from this view.
213 pub async
fn pull_snapshot_from(
215 reader
: Arc
<BackupReader
>,
216 tgt_store
: Arc
<DataStore
>,
217 snapshot
: &BackupDir
,
218 ) -> Result
<(), Error
> {
// Create the snapshot directory; `is_new` tells us whether it existed.
220 let (_path
, is_new
) = tgt_store
.create_backup_dir(&snapshot
)?
;
223 worker
.log(format
!("sync snapshot {:?}", snapshot
.relative_path()));
// On failure, try to remove the (freshly created) snapshot dir so we do
// not leave a half-synced snapshot behind; log but do not mask the
// cleanup error.
225 if let Err(err
) = pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await
{
226 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
) {
227 worker
.log(format
!("cleanup error - {}", cleanup_err
));
// Existing snapshot dir: re-sync in place instead.
232 worker
.log(format
!("re-sync snapshot {:?}", snapshot
.relative_path()));
233 pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
// Sync one backup group: list the remote snapshots of the group, pull
// every snapshot newer than the last locally completed one, and (when
// deletion is enabled — parameter not visible here) remove local
// snapshots that vanished on the remote.
// NOTE(review): incomplete extract — the `worker`, `client`, `group` and
// `delete` parameters, the snapshot loop header, the BackupReader::start
// argument list (original ~lines 282–290) and several closing braces are
// on missing lines.
239 pub async
fn pull_group(
242 src_repo
: &BackupRepository
,
243 tgt_store
: Arc
<DataStore
>,
246 ) -> Result
<(), Error
> {
// Remote API endpoint for listing this store's snapshots.
248 let path
= format
!("api2/json/admin/datastore/{}/snapshots", src_repo
.store());
// Filter arguments restricting the listing to this backup group.
251 "backup-type": group
.backup_type(),
252 "backup-id": group
.backup_id(),
255 let mut result
= client
.get(&path
, Some(args
)).await?
;
256 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Process snapshots oldest-first.
258 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
// Re-login to obtain a fresh ticket + fingerprint for the per-snapshot
// reader connections created below.
260 let auth_info
= client
.login().await?
;
261 let fingerprint
= client
.fingerprint();
263 let last_sync
= tgt_store
.last_successful_backup(group
)?
;
// Remember every remote snapshot time so the vanished-snapshot sweep
// below knows what still exists remotely.
265 let mut remote_snapshots
= std
::collections
::HashSet
::new();
268 let backup_time
= Utc
.timestamp(item
.backup_time
, 0);
269 remote_snapshots
.insert(backup_time
);
// Skip snapshots at or before the last successful local backup.
271 if let Some(last_sync_time
) = last_sync
{
272 if last_sync_time
> backup_time { continue; }
// Build a dedicated client for the snapshot reader, reusing the ticket.
275 let options
= HttpClientOptions
::new()
276 .password(Some(auth_info
.ticket
.clone()))
277 .fingerprint(fingerprint
.clone());
279 let new_client
= HttpClient
::new(src_repo
.host(), src_repo
.user(), options
)?
;
281 let reader
= BackupReader
::start(
291 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
);
293 pull_snapshot_from(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
;
// Sweep: delete local snapshots that no longer exist on the remote
// (guarded by the delete flag on a line missing from this extract).
297 let local_list
= group
.list_backups(&tgt_store
.base_path())?
;
298 for info
in local_list
{
299 let backup_time
= info
.backup_dir
.backup_time();
300 if remote_snapshots
.contains(&backup_time
) { continue; }
301 worker
.log(format
!("delete vanished snapshot {:?}", info
.backup_dir
.relative_path()));
302 tgt_store
.remove_backup_dir(&info
.backup_dir
)?
;
// Sync a whole datastore: list the remote backup groups, pull each group
// (continuing past per-group failures), then delete local groups that
// vanished on the remote, and finally fail if any group had errors.
// NOTE(review): incomplete extract — the `worker`, `client` and `delete`
// parameters, the group loop header, the `errors = true` assignments and
// several closing braces are on missing lines.
309 pub async
fn pull_store(
312 src_repo
: &BackupRepository
,
313 tgt_store
: Arc
<DataStore
>,
315 ) -> Result
<(), Error
> {
// Remote API endpoint listing all groups of the source store.
317 let path
= format
!("api2/json/admin/datastore/{}/groups", src_repo
.store());
319 let mut result
= client
.get(&path
, None
).await?
;
321 let mut list
: Vec
<GroupListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Deterministic order: by backup type, then by backup id.
323 list
.sort_unstable_by(|a
, b
| {
324 let type_order
= a
.backup_type
.cmp(&b
.backup_type
);
325 if type_order
== std
::cmp
::Ordering
::Equal
{
326 a
.backup_id
.cmp(&b
.backup_id
)
// Track whether any group failed so we can bail at the end.
332 let mut errors
= false;
// Remember which groups exist remotely for the vanished-group sweep.
334 let mut new_groups
= std
::collections
::HashSet
::new();
337 let group
= BackupGroup
::new(&item
.backup_type
, &item
.backup_id
);
// A failed group is logged but does not abort the whole store sync.
338 if let Err(err
) = pull_group(worker
, client
, src_repo
, tgt_store
.clone(), &group
, delete
).await
{
339 worker
.log(format
!("sync group {}/{} failed - {}", item
.backup_type
, item
.backup_id
, err
));
341 // do not stop here, instead continue
343 new_groups
.insert(group
);
// Sweep: remove local groups that are no longer present remotely;
// removal errors are logged, not propagated.
347 let result
: Result
<(), Error
> = proxmox
::try_block
!({
348 let local_groups
= BackupGroup
::list_groups(&tgt_store
.base_path())?
;
349 for local_group
in local_groups
{
350 if new_groups
.contains(&local_group
) { continue; }
351 worker
.log(format
!("delete vanished group '{}/{}'", local_group
.backup_type(), local_group
.backup_id()));
352 if let Err(err
) = tgt_store
.remove_backup_group(&local_group
) {
353 worker
.log(err
.to_string());
359 if let Err(err
) = result
{
360 worker
.log(format
!("error during cleanup: {}", err
));
// Surface per-group failures collected above as an overall error.
366 bail
!("sync failed with some errors.");
// #[api] schema + access declaration and the `pull` API handler: checks
// the caller's privileges, connects to the configured remote, and spawns
// a "sync" worker task that runs pull_store.
// NOTE(review): incomplete extract — the surrounding #[api(...)] macro
// structure, the `async fn pull(` line itself (original ~lines 400–401),
// the `store`/`remote` parameters and the worker closure's tail are on
// missing lines.
376 schema
: DATASTORE_SCHEMA
,
379 schema
: REMOTE_ID_SCHEMA
,
382 schema
: DATASTORE_SCHEMA
,
385 description
: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
393 // Note: the parameters used here are not URI parameters, so we need to check them inside the function body
394 description
: "The user needs Datastore.CreateBackup privilege on '/datastore/{store}' and Datastore.Read on '/remote/{remote}/{remote-store}'.",
395 permission
: &Permission
::Anybody
,
398 /// Sync store from other repository
402 remote_store
: String
,
403 delete
: Option
<bool
>,
405 rpcenv
: &mut dyn RpcEnvironment
,
406 ) -> Result
<String
, Error
> {
408 let user_info
= CachedUserInfo
::new()?
;
// Access is declared Anybody above, so enforce the real permissions here:
// CreateBackup on the target datastore, Read on the source remote store.
410 let username
= rpcenv
.get_user().unwrap();
411 user_info
.check_privs(&username
, &["datastore", &store
], PRIV_DATASTORE_CREATE_BACKUP
, false)?
;
412 user_info
.check_privs(&username
, &["remote", &remote
, &remote_store
], PRIV_DATASTORE_READ
, false)?
;
// Deleting vanished backups defaults to true when not specified.
414 let delete
= delete
.unwrap_or(true);
416 let tgt_store
= DataStore
::lookup_datastore(&store
)?
;
// Shadow the `remote` id string with the full remote config entry.
418 let (remote_config
, _digest
) = remote
::config()?
;
419 let remote
: remote
::Remote
= remote_config
.lookup("remote", &remote
)?
;
421 let options
= HttpClientOptions
::new()
422 .password(Some(remote
.password
.clone()))
423 .fingerprint(remote
.fingerprint
.clone());
425 let client
= HttpClient
::new(&remote
.host
, &remote
.userid
, options
)?
;
// Fail fast with a clear message if we cannot authenticate at all.
426 let _auth_info
= client
.login() // make sure we can auth
428 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote
.host
, err
))?
;
430 let src_repo
= BackupRepository
::new(Some(remote
.userid
), Some(remote
.host
), remote_store
);
432 // fixme: set to_stdout to false?
// Run the actual sync as a background worker task; its UPID string is
// returned to the caller (return statement not visible in this extract).
433 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), &username
.clone(), true, move |worker
| async
move {
435 worker
.log(format
!("sync datastore '{}' start", store
));
437 // explicitly create a shared lock to prevent GC from removing newly created chunks
438 let _shared_store_lock
= tgt_store
.try_shared_chunk_store_lock()?
;
440 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
).await?
;
442 worker
.log(format
!("sync datastore '{}' end", store
));
// Router for this module: POST dispatches to the `pull` API method above.
450 pub const ROUTER
: Router
= Router
::new()
451 .post(&API_METHOD_PULL
);