]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/pull.rs
switch from failure to anyhow
[proxmox-backup.git] / src / api2 / pull.rs
CommitLineData
eb506c83
DM
1//! Sync datastore from remote server
2
f7d4e4b5 3use anyhow::{bail, format_err, Error};
de8ec041
DM
4use serde_json::json;
5use std::convert::TryFrom;
6use std::sync::Arc;
7use std::collections::HashMap;
8use std::io::{Seek, SeekFrom};
9use chrono::{Utc, TimeZone};
10
11use proxmox::api::api;
404d78c4 12use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
de8ec041
DM
13
14use crate::server::{WorkerTask};
15use crate::backup::*;
16use crate::client::*;
f357390c 17use crate::config::remote;
de8ec041 18use crate::api2::types::*;
404d78c4 19use crate::config::acl::PRIV_DATASTORE_ALLOCATE_SPACE;
de8ec041
DM
20
21// fixme: implement filters
eb506c83 22// fixme: delete vanished groups
de8ec041
DM
23// Todo: correctly lock backup groups
24
eb506c83 25async fn pull_index_chunks<I: IndexFile>(
de8ec041
DM
26 _worker: &WorkerTask,
27 chunk_reader: &mut RemoteChunkReader,
28 target: Arc<DataStore>,
29 index: I,
30) -> Result<(), Error> {
31
32
33 for pos in 0..index.index_count() {
34 let digest = index.index_digest(pos).unwrap();
35 let chunk_exists = target.cond_touch_chunk(digest, false)?;
36 if chunk_exists {
37 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
38 continue;
39 }
40 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
41 let chunk = chunk_reader.read_raw_chunk(&digest)?;
42
43 target.insert_chunk(&chunk, &digest)?;
44 }
45
46 Ok(())
47}
48
49async fn download_manifest(
50 reader: &BackupReader,
51 filename: &std::path::Path,
52) -> Result<std::fs::File, Error> {
53
54 let tmp_manifest_file = std::fs::OpenOptions::new()
55 .write(true)
56 .create(true)
57 .read(true)
58 .open(&filename)?;
59
60 let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
61
62 tmp_manifest_file.seek(SeekFrom::Start(0))?;
63
64 Ok(tmp_manifest_file)
65}
66
eb506c83 67async fn pull_single_archive(
de8ec041
DM
68 worker: &WorkerTask,
69 reader: &BackupReader,
70 chunk_reader: &mut RemoteChunkReader,
71 tgt_store: Arc<DataStore>,
72 snapshot: &BackupDir,
73 archive_name: &str,
74) -> Result<(), Error> {
75
76 let mut path = tgt_store.base_path();
77 path.push(snapshot.relative_path());
78 path.push(archive_name);
79
80 let mut tmp_path = path.clone();
81 tmp_path.set_extension("tmp");
82
83 worker.log(format!("sync archive {}", archive_name));
84 let tmpfile = std::fs::OpenOptions::new()
85 .write(true)
86 .create(true)
87 .read(true)
88 .open(&tmp_path)?;
89
90 let tmpfile = reader.download(archive_name, tmpfile).await?;
91
92 match archive_type(archive_name)? {
93 ArchiveType::DynamicIndex => {
94 let index = DynamicIndexReader::new(tmpfile)
95 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
96
eb506c83 97 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
de8ec041
DM
98 }
99 ArchiveType::FixedIndex => {
100 let index = FixedIndexReader::new(tmpfile)
101 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
102
eb506c83 103 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
de8ec041
DM
104 }
105 ArchiveType::Blob => { /* nothing to do */ }
106 }
107 if let Err(err) = std::fs::rename(&tmp_path, &path) {
108 bail!("Atomic rename file {:?} failed - {}", path, err);
109 }
110 Ok(())
111}
112
eb506c83 113async fn pull_snapshot(
de8ec041
DM
114 worker: &WorkerTask,
115 reader: Arc<BackupReader>,
116 tgt_store: Arc<DataStore>,
117 snapshot: &BackupDir,
118) -> Result<(), Error> {
119
120 let mut manifest_name = tgt_store.base_path();
121 manifest_name.push(snapshot.relative_path());
122 manifest_name.push(MANIFEST_BLOB_NAME);
123
124 let mut tmp_manifest_name = manifest_name.clone();
125 tmp_manifest_name.set_extension("tmp");
126
127 let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
128 let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
129 tmp_manifest_blob.verify_crc()?;
130
131 if manifest_name.exists() {
9ea4bce4 132 let manifest_blob = proxmox::try_block!({
de8ec041
DM
133 let mut manifest_file = std::fs::File::open(&manifest_name)
134 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
135
136 let manifest_blob = DataBlob::load(&mut manifest_file)?;
137 manifest_blob.verify_crc()?;
138 Ok(manifest_blob)
139 }).map_err(|err: Error| {
140 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
141 })?;
142
143 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
144 return Ok(()); // nothing changed
145 }
146 }
147
148 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
149
150 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
151
152 for item in manifest.files() {
153 let mut path = tgt_store.base_path();
154 path.push(snapshot.relative_path());
155 path.push(&item.filename);
156
157 if path.exists() {
158 match archive_type(&item.filename)? {
159 ArchiveType::DynamicIndex => {
160 let index = DynamicIndexReader::open(&path)?;
161 let (csum, size) = index.compute_csum();
162 match manifest.verify_file(&item.filename, &csum, size) {
163 Ok(_) => continue,
164 Err(err) => {
165 worker.log(format!("detected changed file {:?} - {}", path, err));
166 }
167 }
168 }
169 ArchiveType::FixedIndex => {
170 let index = FixedIndexReader::open(&path)?;
171 let (csum, size) = index.compute_csum();
172 match manifest.verify_file(&item.filename, &csum, size) {
173 Ok(_) => continue,
174 Err(err) => {
175 worker.log(format!("detected changed file {:?} - {}", path, err));
176 }
177 }
178 }
179 ArchiveType::Blob => {
180 let mut tmpfile = std::fs::File::open(&path)?;
181 let (csum, size) = compute_file_csum(&mut tmpfile)?;
182 match manifest.verify_file(&item.filename, &csum, size) {
183 Ok(_) => continue,
184 Err(err) => {
185 worker.log(format!("detected changed file {:?} - {}", path, err));
186 }
187 }
188 }
189 }
190 }
191
eb506c83 192 pull_single_archive(
de8ec041
DM
193 worker,
194 &reader,
195 &mut chunk_reader,
196 tgt_store.clone(),
197 snapshot,
198 &item.filename,
199 ).await?;
200 }
201
202 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
203 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
204 }
205
206 // cleanup - remove stale files
207 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
208
209 Ok(())
210}
211
eb506c83 212pub async fn pull_snapshot_from(
de8ec041
DM
213 worker: &WorkerTask,
214 reader: Arc<BackupReader>,
215 tgt_store: Arc<DataStore>,
216 snapshot: &BackupDir,
217) -> Result<(), Error> {
218
219 let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
220
221 if is_new {
222 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
223
eb506c83 224 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
de8ec041
DM
225 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
226 worker.log(format!("cleanup error - {}", cleanup_err));
227 }
228 return Err(err);
229 }
230 } else {
231 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
eb506c83 232 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
de8ec041
DM
233 }
234
235 Ok(())
236}
237
eb506c83 238pub async fn pull_group(
de8ec041
DM
239 worker: &WorkerTask,
240 client: &HttpClient,
241 src_repo: &BackupRepository,
242 tgt_store: Arc<DataStore>,
243 group: &BackupGroup,
4b4eba0b 244 delete: bool,
de8ec041
DM
245) -> Result<(), Error> {
246
247 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
248
249 let args = json!({
250 "backup-type": group.backup_type(),
251 "backup-id": group.backup_id(),
252 });
253
254 let mut result = client.get(&path, Some(args)).await?;
255 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
256
257 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
258
259 let auth_info = client.login().await?;
d59dbeca 260 let fingerprint = client.fingerprint();
de8ec041 261
18cc66ee 262 let last_sync = tgt_store.last_successful_backup(group)?;
de8ec041 263
c425bdc9
DM
264 let mut remote_snapshots = std::collections::HashSet::new();
265
de8ec041
DM
266 for item in list {
267 let backup_time = Utc.timestamp(item.backup_time, 0);
c425bdc9
DM
268 remote_snapshots.insert(backup_time);
269
de8ec041
DM
270 if let Some(last_sync_time) = last_sync {
271 if last_sync_time > backup_time { continue; }
272 }
273
d59dbeca
DM
274 let options = HttpClientOptions::new()
275 .password(Some(auth_info.ticket.clone()))
276 .fingerprint(fingerprint.clone());
277
278 let new_client = HttpClient::new(src_repo.host(), src_repo.user(), options)?;
de8ec041
DM
279
280 let reader = BackupReader::start(
281 new_client,
282 None,
283 src_repo.store(),
284 &item.backup_type,
285 &item.backup_id,
286 backup_time,
287 true,
288 ).await?;
289
290 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
291
eb506c83 292 pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
de8ec041
DM
293 }
294
4b4eba0b 295 if delete {
c425bdc9
DM
296 let local_list = group.list_backups(&tgt_store.base_path())?;
297 for info in local_list {
298 let backup_time = info.backup_dir.backup_time();
299 if remote_snapshots.contains(&backup_time) { continue; }
300 worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
301 tgt_store.remove_backup_dir(&info.backup_dir)?;
302 }
4b4eba0b
DM
303 }
304
de8ec041
DM
305 Ok(())
306}
307
eb506c83 308pub async fn pull_store(
de8ec041
DM
309 worker: &WorkerTask,
310 client: &HttpClient,
311 src_repo: &BackupRepository,
312 tgt_store: Arc<DataStore>,
4b4eba0b 313 delete: bool,
de8ec041
DM
314) -> Result<(), Error> {
315
316 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
317
318 let mut result = client.get(&path, None).await?;
319
b31c8019 320 let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
de8ec041
DM
321
322 list.sort_unstable_by(|a, b| {
b31c8019 323 let type_order = a.backup_type.cmp(&b.backup_type);
de8ec041 324 if type_order == std::cmp::Ordering::Equal {
b31c8019 325 a.backup_id.cmp(&b.backup_id)
de8ec041
DM
326 } else {
327 type_order
328 }
329 });
330
331 let mut errors = false;
332
4b4eba0b
DM
333 let mut new_groups = std::collections::HashSet::new();
334
de8ec041 335 for item in list {
b31c8019 336 let group = BackupGroup::new(&item.backup_type, &item.backup_id);
4b4eba0b 337 if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
b31c8019 338 worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
de8ec041 339 errors = true;
4b4eba0b 340 // do not stop here, instead continue
de8ec041 341 }
4b4eba0b
DM
342 new_groups.insert(group);
343 }
344
345 if delete {
9ea4bce4 346 let result: Result<(), Error> = proxmox::try_block!({
4b4eba0b
DM
347 let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
348 for local_group in local_groups {
349 if new_groups.contains(&local_group) { continue; }
350 worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
351 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
c425bdc9 352 worker.log(err.to_string());
4b4eba0b
DM
353 errors = true;
354 }
355 }
356 Ok(())
357 });
358 if let Err(err) = result {
359 worker.log(format!("error during cleanup: {}", err));
360 errors = true;
361 };
de8ec041
DM
362 }
363
364 if errors {
365 bail!("sync failed with some errors.");
366 }
367
368 Ok(())
369}
370
371#[api(
372 input: {
373 properties: {
374 store: {
375 schema: DATASTORE_SCHEMA,
376 },
94609e23
DM
377 remote: {
378 schema: REMOTE_ID_SCHEMA,
de8ec041
DM
379 },
380 "remote-store": {
381 schema: DATASTORE_SCHEMA,
382 },
4b4eba0b
DM
383 delete: {
384 description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
385 type: Boolean,
386 optional: true,
387 default: true,
388 },
de8ec041
DM
389 },
390 },
404d78c4
DM
391 access: {
392 permission: &Permission::And(&[
393 &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false),
394 &Permission::Privilege(&["remote", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false),
395 ]),
396 },
de8ec041 397)]
eb506c83
DM
398/// Sync store from other repository
399async fn pull (
de8ec041 400 store: String,
94609e23 401 remote: String,
de8ec041 402 remote_store: String,
4b4eba0b 403 delete: Option<bool>,
de8ec041
DM
404 _info: &ApiMethod,
405 rpcenv: &mut dyn RpcEnvironment,
406) -> Result<String, Error> {
407
408 let username = rpcenv.get_user().unwrap();
409
4b4eba0b
DM
410 let delete = delete.unwrap_or(true);
411
de8ec041
DM
412 let tgt_store = DataStore::lookup_datastore(&store)?;
413
f357390c
DM
414 let (remote_config, _digest) = remote::config()?;
415 let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
94609e23 416
d59dbeca
DM
417 let options = HttpClientOptions::new()
418 .password(Some(remote.password.clone()))
419 .fingerprint(remote.fingerprint.clone());
420
421 let client = HttpClient::new(&remote.host, &remote.userid, options)?;
de8ec041
DM
422 let _auth_info = client.login() // make sure we can auth
423 .await
94609e23 424 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
de8ec041 425
94609e23 426 let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
de8ec041
DM
427
428 // fixme: set to_stdout to false?
429 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
430
431 worker.log(format!("sync datastore '{}' start", store));
432
433 // explicit create shared lock to prevent GC on newly created chunks
434 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
435
4b4eba0b 436 pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?;
de8ec041
DM
437
438 worker.log(format!("sync datastore '{}' end", store));
439
440 Ok(())
441 })?;
442
443 Ok(upid_str)
444}
445
446pub const ROUTER: Router = Router::new()
eb506c83 447 .post(&API_METHOD_PULL);