1 //! Sync datastore from remote server
5 use std
::convert
::TryFrom
;
7 use std
::collections
::HashMap
;
8 use std
::io
::{Seek, SeekFrom}
;
9 use chrono
::{Utc, TimeZone}
;
11 use proxmox
::api
::api
;
12 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment}
;
14 use crate::server
::{WorkerTask}
;
17 use crate::config
::remote
;
18 use crate::api2
::types
::*;
20 // fixme: implement filters
21 // fixme: delete vanished groups -- NOTE(review): vanished-group removal appears implemented in pull_store below (gated on `delete`?); confirm and drop this fixme
22 // Todo: correctly lock backup groups
// Copy every chunk referenced by an index file from the remote server into
// the local datastore: for each index slot, resolve the chunk digest, ask
// the target store (cond_touch_chunk) whether the chunk is already present,
// and download + insert the raw chunk otherwise.
//
// NOTE(review): this extract is incomplete — the visible parameter list
// lacks the `index: I` argument (and, judging by the commented-out log
// calls, probably a `worker` one), and the lines between the `chunk_exists`
// check and the download (presumably a `continue` for existing chunks) as
// well as the loop/function closers are missing from this view.
24 async
fn pull_index_chunks
<I
: IndexFile
>(
26 chunk_reader
: &mut RemoteChunkReader
,
27 target
: Arc
<DataStore
>,
29 ) -> Result
<(), Error
> {
// Walk all chunk slots of the index.
32 for pos
in 0..index
.index_count() {
33 let digest
= index
.index_digest(pos
).unwrap();
// cond_touch_chunk returns whether the chunk already exists locally; the
// meaning of the `false` flag is not visible here — TODO confirm.
34 let chunk_exists
= target
.cond_touch_chunk(digest
, false)?
;
36 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
39 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
// Fetch the raw (still encoded) chunk from the remote and store it locally.
40 let chunk
= chunk_reader
.read_raw_chunk(&digest
)?
;
42 target
.insert_chunk(&chunk
, &digest
)?
;
// Download the backup manifest blob from the remote into a temp file at
// `filename` and rewind the handle so the caller can read it from offset 0.
//
// NOTE(review): incomplete extract — the OpenOptions builder chain that
// actually creates the file (original lines 54-58) and the final return
// (presumably `Ok(tmp_manifest_file)`) are not visible here.
48 async
fn download_manifest(
49 reader
: &BackupReader
,
50 filename
: &std
::path
::Path
,
51 ) -> Result
<std
::fs
::File
, Error
> {
53 let tmp_manifest_file
= std
::fs
::OpenOptions
::new()
// Stream MANIFEST_BLOB_NAME from the remote into the freshly created file;
// `download` hands the file handle back on success.
59 let mut tmp_manifest_file
= reader
.download(MANIFEST_BLOB_NAME
, tmp_manifest_file
).await?
;
// Seek back to the start so the returned handle is readable from the top.
61 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
// Download one archive file of a snapshot into `<path>.tmp`, validate it
// (dynamic/fixed index archives are parsed and their referenced chunks are
// pulled via pull_index_chunks; plain blobs need no extra work), then
// atomically rename the temp file onto the final path.
//
// NOTE(review): incomplete extract — parameters such as `snapshot`,
// `archive_name` and `worker` (all used below) are missing from the visible
// signature, as are the OpenOptions chain creating `tmpfile`, several
// closing braces and the final Ok(()).
66 async
fn pull_single_archive(
68 reader
: &BackupReader
,
69 chunk_reader
: &mut RemoteChunkReader
,
70 tgt_store
: Arc
<DataStore
>,
73 ) -> Result
<(), Error
> {
// Final on-disk location: <datastore base>/<snapshot dir>/<archive name>.
75 let mut path
= tgt_store
.base_path();
76 path
.push(snapshot
.relative_path());
77 path
.push(archive_name
);
// Download goes to a sibling ".tmp" file first.
79 let mut tmp_path
= path
.clone();
80 tmp_path
.set_extension("tmp");
82 worker
.log(format
!("sync archive {}", archive_name
));
83 let tmpfile
= std
::fs
::OpenOptions
::new()
89 let tmpfile
= reader
.download(archive_name
, tmpfile
).await?
;
// Index archives are parsed so their chunk references can be fetched.
91 match archive_type(archive_name
)?
{
92 ArchiveType
::DynamicIndex
=> {
93 let index
= DynamicIndexReader
::new(tmpfile
)
94 .map_err(|err
| format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
))?
;
96 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
98 ArchiveType
::FixedIndex
=> {
99 let index
= FixedIndexReader
::new(tmpfile
)
100 .map_err(|err
| format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
))?
;
102 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
104 ArchiveType
::Blob
=> { /* nothing to do */ }
// Activate the download atomically: tmp file replaces the final path.
106 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
107 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
// Sync one snapshot from the remote: download its manifest to a ".tmp"
// file, return early if it is byte-identical to the local manifest, then
// verify every file listed in the manifest against any local copy
// (checksum + size) and re-fetch what is missing or changed, and finally
// rename the new manifest into place and clean up stale files.
//
// NOTE(review): incomplete extract — a `worker` parameter (used for
// logging), the match arms that presumably call pull_single_archive on
// verification failure, and various closing braces are not visible here.
112 async
fn pull_snapshot(
114 reader
: Arc
<BackupReader
>,
115 tgt_store
: Arc
<DataStore
>,
116 snapshot
: &BackupDir
,
117 ) -> Result
<(), Error
> {
// Local manifest path: <base>/<snapshot dir>/<manifest blob name>.
119 let mut manifest_name
= tgt_store
.base_path();
120 manifest_name
.push(snapshot
.relative_path());
121 manifest_name
.push(MANIFEST_BLOB_NAME
);
123 let mut tmp_manifest_name
= manifest_name
.clone();
124 tmp_manifest_name
.set_extension("tmp");
// Fetch the remote manifest and check its CRC before trusting it.
126 let mut tmp_manifest_file
= download_manifest(&reader
, &tmp_manifest_name
).await?
;
127 let tmp_manifest_blob
= DataBlob
::load(&mut tmp_manifest_file
)?
;
128 tmp_manifest_blob
.verify_crc()?
;
// If a local manifest exists and equals the remote one, there is nothing
// to sync for this snapshot.
130 if manifest_name
.exists() {
131 let manifest_blob
= proxmox
::try_block
!({
132 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
)
133 .map_err(|err
| format_err
!("unable to open local manifest {:?} - {}", manifest_name
, err
))?
;
135 let manifest_blob
= DataBlob
::load(&mut manifest_file
)?
;
136 manifest_blob
.verify_crc()?
;
138 }).map_err(|err
: Error
| {
139 format_err
!("unable to read local manifest {:?} - {}", manifest_name
, err
)
142 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
143 return Ok(()); // nothing changed
147 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
149 let mut chunk_reader
= RemoteChunkReader
::new(reader
.clone(), None
, HashMap
::new());
// Verify each manifest entry against the local copy; the per-type arms
// compute the local checksum and log files detected as changed.
151 for item
in manifest
.files() {
152 let mut path
= tgt_store
.base_path();
153 path
.push(snapshot
.relative_path());
154 path
.push(&item
.filename
);
157 match archive_type(&item
.filename
)?
{
158 ArchiveType
::DynamicIndex
=> {
159 let index
= DynamicIndexReader
::open(&path
)?
;
160 let (csum
, size
) = index
.compute_csum();
161 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
164 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
168 ArchiveType
::FixedIndex
=> {
169 let index
= FixedIndexReader
::open(&path
)?
;
170 let (csum
, size
) = index
.compute_csum();
171 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
174 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
178 ArchiveType
::Blob
=> {
179 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
180 let (csum
, size
) = compute_file_csum(&mut tmpfile
)?
;
181 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
184 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// Atomically activate the freshly downloaded manifest.
201 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
202 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
205 // cleanup - remove stale files
206 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
// Pull a single snapshot into the local store, creating its backup
// directory first. A failed pull into a freshly created directory removes
// that directory again (best-effort, the cleanup error is only logged);
// an already-known snapshot is "re-synced" in place.
//
// NOTE(review): incomplete extract — the `worker` parameter, the branch
// (presumably on `is_new`) separating the two pull_snapshot calls, the
// propagation of the original pull error, and the final Ok(()) are not
// visible here.
211 pub async
fn pull_snapshot_from(
213 reader
: Arc
<BackupReader
>,
214 tgt_store
: Arc
<DataStore
>,
215 snapshot
: &BackupDir
,
216 ) -> Result
<(), Error
> {
218 let (_path
, is_new
) = tgt_store
.create_backup_dir(&snapshot
)?
;
221 worker
.log(format
!("sync snapshot {:?}", snapshot
.relative_path()));
// First-time sync: roll back the just-created directory on failure.
223 if let Err(err
) = pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await
{
224 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
) {
225 worker
.log(format
!("cleanup error - {}", cleanup_err
));
// Snapshot directory already existed: refresh it in place.
230 worker
.log(format
!("re-sync snapshot {:?}", snapshot
.relative_path()));
231 pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
// Sync one backup group: list the group's snapshots on the remote, sort
// them by backup time, skip snapshots older than the last successful local
// backup, pull each remaining snapshot through a dedicated reader
// connection, and afterwards delete local snapshots that no longer exist
// on the remote.
//
// NOTE(review): incomplete extract — the `worker`/`client`/`group`/`delete`
// parameters (all referenced below), the json! construction of `args`, the
// loop header iterating `list`, the HttpClient::new / BackupReader::start
// argument lists, and the gating of the vanished-snapshot cleanup
// (presumably on `delete`) are not visible here.
237 pub async
fn pull_group(
240 src_repo
: &BackupRepository
,
241 tgt_store
: Arc
<DataStore
>,
244 ) -> Result
<(), Error
> {
// Query the remote snapshot list for this datastore, filtered by group.
246 let path
= format
!("api2/json/admin/datastore/{}/snapshots", src_repo
.store());
249 "backup-type": group
.backup_type(),
250 "backup-id": group
.backup_id(),
253 let mut result
= client
.get(&path
, Some(args
)).await?
;
254 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Oldest first, so interrupted syncs resume in chronological order.
256 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
258 let auth_info
= client
.login().await?
;
260 let last_sync
= tgt_store
.last_successful_backup(group
)?
;
// Track which remote snapshots exist so vanished local ones can be removed.
262 let mut remote_snapshots
= std
::collections
::HashSet
::new();
265 let backup_time
= Utc
.timestamp(item
.backup_time
, 0);
266 remote_snapshots
.insert(backup_time
);
// Anything older than our newest successful backup is already synced.
268 if let Some(last_sync_time
) = last_sync
{
269 if last_sync_time
> backup_time { continue; }
// Each snapshot gets its own client/reader, re-using the login ticket.
272 let new_client
= HttpClient
::new(
275 Some(auth_info
.ticket
.clone())
278 let reader
= BackupReader
::start(
288 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
);
290 pull_snapshot_from(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
;
// Remove local snapshots of this group that the remote no longer has.
294 let local_list
= group
.list_backups(&tgt_store
.base_path())?
;
295 for info
in local_list
{
296 let backup_time
= info
.backup_dir
.backup_time();
297 if remote_snapshots
.contains(&backup_time
) { continue; }
298 worker
.log(format
!("delete vanished snapshot {:?}", info
.backup_dir
.relative_path()));
299 tgt_store
.remove_backup_dir(&info
.backup_dir
)?
;
// Sync the whole datastore: fetch the remote group list, sort it by backup
// type then id, pull each group (logging failures and continuing instead
// of aborting), then delete local groups that vanished on the remote, and
// finally bail if any group failed.
//
// NOTE(review): incomplete extract — the `worker`/`client`/`delete`
// parameters (referenced below), the loop header iterating `list`, the
// `errors = true` assignment paths, the gating of the vanished-group
// cleanup (presumably on `delete`), and several closing braces are not
// visible here.
306 pub async
fn pull_store(
309 src_repo
: &BackupRepository
,
310 tgt_store
: Arc
<DataStore
>,
312 ) -> Result
<(), Error
> {
314 let path
= format
!("api2/json/admin/datastore/{}/groups", src_repo
.store());
316 let mut result
= client
.get(&path
, None
).await?
;
318 let mut list
: Vec
<GroupListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Deterministic order: by backup type, then by backup id.
320 list
.sort_unstable_by(|a
, b
| {
321 let type_order
= a
.backup_type
.cmp(&b
.backup_type
);
322 if type_order
== std
::cmp
::Ordering
::Equal
{
323 a
.backup_id
.cmp(&b
.backup_id
)
329 let mut errors
= false;
// Track the groups seen on the remote so vanished ones can be removed.
331 let mut new_groups
= std
::collections
::HashSet
::new();
334 let group
= BackupGroup
::new(&item
.backup_type
, &item
.backup_id
);
// A failing group is logged but does not abort the remaining groups.
335 if let Err(err
) = pull_group(worker
, client
, src_repo
, tgt_store
.clone(), &group
, delete
).await
{
336 worker
.log(format
!("sync group {}/{} failed - {}", item
.backup_type
, item
.backup_id
, err
));
338 // do not stop here, instead continue
340 new_groups
.insert(group
);
// Cleanup pass: drop local groups the remote no longer has; removal errors
// are logged per group rather than aborting the cleanup.
344 let result
: Result
<(), Error
> = proxmox
::try_block
!({
345 let local_groups
= BackupGroup
::list_groups(&tgt_store
.base_path())?
;
346 for local_group
in local_groups
{
347 if new_groups
.contains(&local_group
) { continue; }
348 worker
.log(format
!("delete vanished group '{}/{}'", local_group
.backup_type(), local_group
.backup_id()));
349 if let Err(err
) = tgt_store
.remove_backup_group(&local_group
) {
350 worker
.log(err
.to_string());
356 if let Err(err
) = result
{
357 worker
.log(format
!("error during cleanup: {}", err
));
363 bail
!("sync failed with some errors.");
// #[api] macro fragments plus the pull API handler: resolve the target
// datastore and the configured remote, verify the remote login works
// before starting, then spawn a "sync" WorkerTask that takes a shared
// chunk-store lock (the comment says: to prevent GC on newly created
// chunks) and runs pull_store.
//
// NOTE(review): heavily truncated extract — the macro's parameter names,
// the handler's own name and its `store`/`remote` parameters (both used
// below), and the task's tail (presumably returning `upid_str`) are not
// visible; the comments below only describe the visible statements.
373 schema
: DATASTORE_SCHEMA
,
376 schema
: REMOTE_ID_SCHEMA
,
379 schema
: DATASTORE_SCHEMA
,
382 description
: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
390 /// Sync store from other repository
394 remote_store
: String
,
395 delete
: Option
<bool
>,
397 rpcenv
: &mut dyn RpcEnvironment
,
398 ) -> Result
<String
, Error
> {
400 let username
= rpcenv
.get_user().unwrap();
// Deleting vanished backups defaults to on.
402 let delete
= delete
.unwrap_or(true);
404 let tgt_store
= DataStore
::lookup_datastore(&store
)?
;
// Look up the remote's connection settings from the remote config.
406 let (remote_config
, _digest
) = remote
::config()?
;
407 let remote
: remote
::Remote
= remote_config
.lookup("remote", &remote
)?
;
409 let client
= HttpClient
::new(&remote
.host
, &remote
.userid
, Some(remote
.password
.clone()))?
;
// Fail fast with a clear error if the remote credentials do not work.
410 let _auth_info
= client
.login() // make sure we can auth
412 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote
.host
, err
))?
;
414 let src_repo
= BackupRepository
::new(Some(remote
.userid
), Some(remote
.host
), remote_store
);
416 // fixme: set to_stdout to false?
// The `true` flag here is presumably `to_stdout` (see fixme above).
417 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), &username
.clone(), true, move |worker
| async
move {
419 worker
.log(format
!("sync datastore '{}' start", store
));
421 // explicit create shared lock to prevent GC on newly created chunks
422 let _shared_store_lock
= tgt_store
.try_shared_chunk_store_lock()?
;
424 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone(), delete
).await?
;
426 worker
.log(format
!("sync datastore '{}' end", store
));
// Module router: POST dispatches to the pull API method (API_METHOD_PULL,
// presumably generated from the #[api] block above).
434 pub const ROUTER
: Router
= Router
::new()
435 .post(&API_METHOD_PULL
);