1 //! Sync datastore from remote server
5 use std
::convert
::TryFrom
;
7 use std
::collections
::HashMap
;
8 use std
::io
::{Seek, SeekFrom}
;
9 use chrono
::{Utc, TimeZone}
;
11 use proxmox
::api
::api
;
12 use proxmox
::api
::{ApiMethod, Router, RpcEnvironment}
;
14 use crate::server
::{WorkerTask}
;
17 use crate::api2
::types
::*;
19 // fixme: implement filters
20 // fixme: delete vanished groups
21 // Todo: correctly lock backup groups
// NOTE(review): this extract is line-mangled and elided — original source
// line numbers are embedded in the text, and several statements (the
// `worker`/`index` parameters and the `if chunk_exists { continue; }`
// branch implied by the kept log comments) are missing from this view.
// Code tokens are left byte-identical; only comments were added.

/// Fetch every chunk referenced by `index` from the remote `chunk_reader`
/// and insert it into the local `target` datastore, skipping chunks the
/// store already holds.
23 async
fn pull_index_chunks
<I
: IndexFile
>(
25 chunk_reader
: &mut RemoteChunkReader
,
26 target
: Arc
<DataStore
>,
28 ) -> Result
<(), Error
> {
// Walk every chunk slot of the index.
31 for pos
in 0..index
.index_count() {
// `unwrap()` assumes every `pos < index_count()` yields a digest —
// presumably guaranteed by the `IndexFile` contract; TODO confirm.
32 let digest
= index
.index_digest(pos
).unwrap();
// `cond_touch_chunk(digest, false)` reports whether the chunk already
// exists locally (meaning of the `false` flag not visible here —
// presumably "do not fail when missing"; confirm against DataStore).
33 let chunk_exists
= target
.cond_touch_chunk(digest
, false)?
;
// (elided: when the chunk exists, the loop skips the download)
35 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
38 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
// Download the raw chunk from the remote…
39 let chunk
= chunk_reader
.read_raw_chunk(&digest
)?
;
// …and store it locally under the same digest.
41 target
.insert_chunk(&chunk
, &digest
)?
;
/// Download the remote manifest blob (`MANIFEST_BLOB_NAME`) into the file
/// at `filename`, rewind it to offset 0, and return the open handle so the
/// caller can parse it.
///
/// NOTE(review): the `OpenOptions` builder chain (original lines 53-57) is
/// elided from this view — presumably it creates/truncates the temp file
/// read-write at `filename`; TODO confirm against the full source.
47 async
fn download_manifest(
48 reader
: &BackupReader
,
49 filename
: &std
::path
::Path
,
50 ) -> Result
<std
::fs
::File
, Error
> {
52 let tmp_manifest_file
= std
::fs
::OpenOptions
::new()
// Stream the manifest blob from the remote reader into the temp file;
// `download` returns the file handle back (rebound as `mut` for the seek).
58 let mut tmp_manifest_file
= reader
.download(MANIFEST_BLOB_NAME
, tmp_manifest_file
).await?
;
// Rewind so the caller reads the blob from the beginning.
60 tmp_manifest_file
.seek(SeekFrom
::Start(0))?
;
/// Download one archive file of a snapshot into `tgt_store`: write the
/// download to `<path>.tmp`, for index archives read the index and pull all
/// referenced chunks via `pull_index_chunks`, then atomically rename the
/// temp file into its final place.
///
/// NOTE(review): elided from this view — the `worker`, `snapshot` and
/// `archive_name` parameters, the `OpenOptions` chain after original line
/// 82, the closing braces of the match arms, and the function tail
/// (presumably `Ok(())`). Code tokens left byte-identical.
65 async
fn pull_single_archive(
67 reader
: &BackupReader
,
68 chunk_reader
: &mut RemoteChunkReader
,
69 tgt_store
: Arc
<DataStore
>,
72 ) -> Result
<(), Error
> {
// Final on-disk location: <store base>/<snapshot dir>/<archive name>.
74 let mut path
= tgt_store
.base_path();
75 path
.push(snapshot
.relative_path());
76 path
.push(archive_name
);
// Download lands in `<path>.tmp` so a failed transfer never clobbers
// an existing archive.
78 let mut tmp_path
= path
.clone();
79 tmp_path
.set_extension("tmp");
81 worker
.log(format
!("sync archive {}", archive_name
));
82 let tmpfile
= std
::fs
::OpenOptions
::new()
// Download the archive body into the temp file.
88 let tmpfile
= reader
.download(archive_name
, tmpfile
).await?
;
// Dispatch on the archive kind derived from the file name.
90 match archive_type(archive_name
)?
{
91 ArchiveType
::DynamicIndex
=> {
// Parse the freshly downloaded dynamic index, then pull every chunk
// it references into the target store.
92 let index
= DynamicIndexReader
::new(tmpfile
)
93 .map_err(|err
| format_err
!("unable to read dynamic index {:?} - {}", tmp_path
, err
))?
;
95 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
// Same procedure for fixed (block-based) indexes.
97 ArchiveType
::FixedIndex
=> {
98 let index
= FixedIndexReader
::new(tmpfile
)
99 .map_err(|err
| format_err
!("unable to read fixed index '{:?}' - {}", tmp_path
, err
))?
;
101 pull_index_chunks(worker
, chunk_reader
, tgt_store
.clone(), index
).await?
;
// Blobs are self-contained — the downloaded file is already complete.
103 ArchiveType
::Blob
=> { /* nothing to do */ }
// Atomically move the verified download into place; on failure, report
// both paths' context via the error message below.
105 if let Err(err
) = std
::fs
::rename(&tmp_path
, &path
) {
106 bail
!("Atomic rename file {:?} failed - {}", path
, err
);
/// Sync one snapshot: download the remote manifest to `<manifest>.tmp`,
/// short-circuit when it is byte-identical to the local manifest, otherwise
/// verify each file listed in the manifest against its local copy
/// (re-downloading on mismatch — the download calls themselves are elided
/// from this view, presumably `pull_single_archive`), then atomically
/// rename the temp manifest into place and remove stale files.
///
/// NOTE(review): the `worker` parameter, the match-arm `Ok(_)` branches,
/// and the function tail (presumably `Ok(())`) are elided from this view.
111 async
fn pull_snapshot(
113 reader
: Arc
<BackupReader
>,
114 tgt_store
: Arc
<DataStore
>,
115 snapshot
: &BackupDir
,
116 ) -> Result
<(), Error
> {
// Local manifest path: <store base>/<snapshot dir>/<MANIFEST_BLOB_NAME>.
118 let mut manifest_name
= tgt_store
.base_path();
119 manifest_name
.push(snapshot
.relative_path());
120 manifest_name
.push(MANIFEST_BLOB_NAME
);
// The remote manifest is staged next to it as `…​.tmp` and only renamed
// into place at the very end, once all files were synced.
122 let mut tmp_manifest_name
= manifest_name
.clone();
123 tmp_manifest_name
.set_extension("tmp");
125 let mut tmp_manifest_file
= download_manifest(&reader
, &tmp_manifest_name
).await?
;
// Parse and CRC-check the downloaded manifest before trusting it.
126 let tmp_manifest_blob
= DataBlob
::load(&mut tmp_manifest_file
)?
;
127 tmp_manifest_blob
.verify_crc()?
;
// If we already have a local manifest, load it so we can detect a no-op
// sync (identical manifests ⇒ nothing changed on the remote side).
129 if manifest_name
.exists() {
130 let manifest_blob
= proxmox
::tools
::try_block
!({
131 let mut manifest_file
= std
::fs
::File
::open(&manifest_name
)
132 .map_err(|err
| format_err
!("unable to open local manifest {:?} - {}", manifest_name
, err
))?
;
134 let manifest_blob
= DataBlob
::load(&mut manifest_file
)?
;
135 manifest_blob
.verify_crc()?
;
// Wrap any failure while reading the local manifest with path context.
137 }).map_err(|err
: Error
| {
138 format_err
!("unable to read local manifest {:?} - {}", manifest_name
, err
)
// Byte-identical manifests: the snapshot is already in sync.
141 if manifest_blob
.raw_data() == tmp_manifest_blob
.raw_data() {
142 return Ok(()); // nothing changed
// Decode the remote manifest into its structured form.
146 let manifest
= BackupManifest
::try_from(tmp_manifest_blob
)?
;
// One chunk reader is shared across all archives of this snapshot;
// the HashMap presumably acts as a chunk cache — TODO confirm.
148 let mut chunk_reader
= RemoteChunkReader
::new(reader
.clone(), None
, HashMap
::new());
// Check every file the manifest lists against the local copy.
150 for item
in manifest
.files() {
151 let mut path
= tgt_store
.base_path();
152 path
.push(snapshot
.relative_path());
153 path
.push(&item
.filename
);
156 match archive_type(&item
.filename
)?
{
157 ArchiveType
::DynamicIndex
=> {
// Recompute the local index checksum and compare with the manifest;
// a mismatch means the file changed remotely (elided: the branch
// presumably re-downloads it).
158 let index
= DynamicIndexReader
::open(&path
)?
;
159 let (csum
, size
) = index
.compute_csum();
160 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
163 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// Same verification for fixed indexes.
167 ArchiveType
::FixedIndex
=> {
168 let index
= FixedIndexReader
::open(&path
)?
;
169 let (csum
, size
) = index
.compute_csum();
170 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
173 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// Blobs are checksummed over the raw file contents.
177 ArchiveType
::Blob
=> {
178 let mut tmpfile
= std
::fs
::File
::open(&path
)?
;
179 let (csum
, size
) = compute_file_csum(&mut tmpfile
)?
;
180 match manifest
.verify_file(&item
.filename
, &csum
, size
) {
183 worker
.log(format
!("detected changed file {:?} - {}", path
, err
));
// All files synced — commit by renaming the temp manifest into place.
// Until this point a crash leaves the old manifest authoritative.
200 if let Err(err
) = std
::fs
::rename(&tmp_manifest_name
, &manifest_name
) {
201 bail
!("Atomic rename file {:?} failed - {}", manifest_name
, err
);
204 // cleanup - remove stale files
205 tgt_store
.cleanup_backup_dir(snapshot
, &manifest
)?
;
/// Pull one snapshot into `tgt_store`, creating the snapshot directory
/// first. When the directory is newly created and the pull fails, the
/// partial directory is removed again (cleanup failures are only logged,
/// the original pull error presumably still propagates — elided). For a
/// pre-existing directory the snapshot is re-synced in place.
///
/// NOTE(review): the `worker` parameter, the `if is_new { … } else { … }`
/// skeleton, and the function tail are elided from this view.
210 pub async
fn pull_snapshot_from(
212 reader
: Arc
<BackupReader
>,
213 tgt_store
: Arc
<DataStore
>,
214 snapshot
: &BackupDir
,
215 ) -> Result
<(), Error
> {
// `is_new` distinguishes first-time sync from re-sync of an existing dir.
217 let (_path
, is_new
) = tgt_store
.create_backup_dir(&snapshot
)?
;
220 worker
.log(format
!("sync snapshot {:?}", snapshot
.relative_path()));
// First-time sync: on failure, roll back the freshly created directory
// so we never leave a half-populated snapshot behind.
222 if let Err(err
) = pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await
{
223 if let Err(cleanup_err
) = tgt_store
.remove_backup_dir(&snapshot
) {
// Best-effort rollback — log, don't mask the original error.
224 worker
.log(format
!("cleanup error - {}", cleanup_err
));
// Re-sync path: directory already existed, just pull on top of it.
229 worker
.log(format
!("re-sync snapshot {:?}", snapshot
.relative_path()));
230 pull_snapshot(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
/// Pull all snapshots of one backup group from the remote repository,
/// oldest first. Snapshots strictly older than the last successful local
/// backup of the group are skipped; a snapshot with a timestamp equal to
/// the last sync is re-pulled (presumably an intentional re-sync — TODO
/// confirm the `>` vs `>=` choice).
///
/// NOTE(review): elided from this view — the `worker`, `client` and
/// `group` parameters, the `json!` args wrapper, the snapshot loop header,
/// most `HttpClient::new`/`BackupReader::start` argument lists, and the
/// function tail.
236 pub async
fn pull_group(
239 src_repo
: &BackupRepository
,
240 tgt_store
: Arc
<DataStore
>,
242 ) -> Result
<(), Error
> {
// Remote API endpoint listing all snapshots of the source datastore.
244 let path
= format
!("api2/json/admin/datastore/{}/snapshots", src_repo
.store());
// (part of an elided `json!` args object: restrict the listing to this
//  backup group)
247 "backup-type": group
.backup_type(),
248 "backup-id": group
.backup_id(),
251 let mut result
= client
.get(&path
, Some(args
)).await?
;
// Deserialize the "data" payload; `take()` avoids cloning the JSON value.
252 let mut list
: Vec
<SnapshotListItem
> = serde_json
::from_value(result
["data"].take())?
;
// Sync in chronological order so the "last successful backup" watermark
// advances monotonically.
254 list
.sort_unstable_by(|a
, b
| a
.backup_time
.cmp(&b
.backup_time
));
// Re-login once; the ticket is reused for the per-snapshot readers below.
256 let auth_info
= client
.login().await?
;
// Watermark: timestamp of the newest complete local backup of this group.
258 let last_sync
= group
.last_successful_backup(&tgt_store
.base_path())?
;
// (inside the elided per-snapshot loop)
// Interpret the epoch timestamp as UTC.
261 let backup_time
= Utc
.timestamp(item
.backup_time
, 0);
// Skip snapshots we already have (strictly older than the watermark).
262 if let Some(last_sync_time
) = last_sync
{
263 if last_sync_time
> backup_time { continue; }
// A dedicated client per snapshot, authenticated with the cached ticket.
266 let new_client
= HttpClient
::new(
269 Some(auth_info
.ticket
.clone())
// Open a reader session for this specific snapshot (args elided).
272 let reader
= BackupReader
::start(
282 let snapshot
= BackupDir
::new(item
.backup_type
, item
.backup_id
, item
.backup_time
);
// Any snapshot failure aborts the whole group via `?`.
284 pull_snapshot_from(worker
, reader
, tgt_store
.clone(), &snapshot
).await?
;
/// Pull every backup group of the remote datastore. Groups are listed via
/// the remote API, sorted by backup type then id for a deterministic sync
/// order, and pulled one by one. A failing group is logged and remembered
/// in `errors` instead of aborting, so all groups get a chance to sync;
/// one summary error is raised at the end if anything failed.
///
/// NOTE(review): elided from this view — the `worker`/`client` parameters,
/// the comparator tail (presumably `a_id.cmp(b_id)` in the `Equal` branch,
/// else `type_order`), the group loop header, `errors = true;` in the
/// failure branch, and the function tail.
290 pub async
fn pull_store(
293 src_repo
: &BackupRepository
,
294 tgt_store
: Arc
<DataStore
>,
295 ) -> Result
<(), Error
> {
// Remote API endpoint listing all backup groups of the source store.
297 let path
= format
!("api2/json/admin/datastore/{}/groups", src_repo
.store());
299 let mut result
= client
.get(&path
, None
).await?
;
// `unwrap()` assumes the API always returns a JSON array under "data" —
// presumably guaranteed by the remote API schema; TODO confirm.
301 let list
= result
["data"].as_array_mut().unwrap();
// Deterministic order: sort by backup type first, then by id.
303 list
.sort_unstable_by(|a
, b
| {
304 let a_id
= a
["backup-id"].as_str().unwrap();
305 let a_backup_type
= a
["backup-type"].as_str().unwrap();
306 let b_id
= b
["backup-id"].as_str().unwrap();
307 let b_backup_type
= b
["backup-type"].as_str().unwrap();
309 let type_order
= a_backup_type
.cmp(b_backup_type
);
310 if type_order
== std
::cmp
::Ordering
::Equal
{
// Track per-group failures without aborting the overall sync.
317 let mut errors
= false;
// (inside the elided per-group loop)
321 let id
= item
["backup-id"].as_str().unwrap();
322 let btype
= item
["backup-type"].as_str().unwrap();
324 let group
= BackupGroup
::new(btype
, id
);
// Log-and-continue on group failure (elided: presumably sets `errors`).
325 if let Err(err
) = pull_group(worker
, client
, src_repo
, tgt_store
.clone(), &group
).await
{
326 worker
.log(format
!("sync group {}/{} failed - {}", btype
, id
, err
));
// After all groups: surface a single summary error if any group failed.
333 bail
!("sync failed with some errors.");
// NOTE(review): the `#[api(...)]` attribute opening and the schema property
// names are elided from this view; the fragments below are the remaining
// schema entries (target store, remote host, remote store, remote user,
// remote password).
// NOTE(review): typo in the user-facing schema description below —
// "passsword" should read "password". String literals are deliberately
// left untouched in this documentation pass; fix in a code change.
343 schema
: DATASTORE_SCHEMA
,
346 description
: "Remote host", // TODO: use predefined type: host or IP
350 schema
: DATASTORE_SCHEMA
,
353 description
: "Remote user name.", // TODO: use predefined type
357 description
: "Remote passsword.",
363 /// Sync store from other repository
// NOTE(review): the fn signature opening (name and the `store`,
// `remote_host`, `remote_user` parameters), the closing braces, and the
// final `Ok(upid_str)` are elided from this view.
367 remote_store
: String
,
369 remote_password
: String
,
371 rpcenv
: &mut dyn RpcEnvironment
,
372 ) -> Result
<String
, Error
> {
// Caller identity from the RPC environment; `unwrap()` presumably relies
// on the API layer always setting the user — TODO confirm.
374 let username
= rpcenv
.get_user().unwrap();
// Resolve the local target datastore by name.
376 let tgt_store
= DataStore
::lookup_datastore(&store
)?
;
// Connect to the remote and authenticate up-front so configuration
// errors fail fast, before a worker task is spawned.
378 let client
= HttpClient
::new(&remote_host
, &remote_user
, Some(remote_password
))?
;
379 let _auth_info
= client
.login() // make sure we can auth
381 .map_err(|err
| format_err
!("remote connection to '{}' failed - {}", remote_host
, err
))?
;
383 let src_repo
= BackupRepository
::new(Some(remote_user
), Some(remote_host
), remote_store
);
// Run the actual sync as a background worker task and return its UPID.
385 // fixme: set to_stdout to false?
386 let upid_str
= WorkerTask
::spawn("sync", Some(store
.clone()), &username
.clone(), true, move |worker
| async
move {
388 worker
.log(format
!("sync datastore '{}' start", store
));
390 // explicitly create a shared lock to prevent GC from removing newly created chunks
391 let _shared_store_lock
= tgt_store
.try_shared_chunk_store_lock()?
;
393 pull_store(&worker
, &client
, &src_repo
, tgt_store
.clone()).await?
;
395 worker
.log(format
!("sync datastore '{}' end", store
));
403 pub const ROUTER
: Router
= Router
::new()
404 .post(&API_METHOD_PULL
);