]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/pull.rs
client/remote: add support to specify port number
[proxmox-backup.git] / src / client / pull.rs
1 //! Sync datastore from remote server
2
3 use anyhow::{bail, format_err, Error};
4 use serde_json::json;
5 use std::convert::TryFrom;
6 use std::sync::{Arc, Mutex};
7 use std::collections::{HashSet, HashMap};
8 use std::io::{Seek, SeekFrom};
9 use std::time::SystemTime;
10 use std::sync::atomic::{AtomicUsize, Ordering};
11
12 use proxmox::api::error::{StatusCode, HttpError};
13 use crate::{
14 tools::{ParallelHandler, compute_file_csum},
15 server::WorkerTask,
16 backup::*,
17 api2::types::*,
18 client::*,
19 };
20
21
22 // fixme: implement filters
23 // fixme: delete vanished groups
24 // Todo: correctly lock backup groups
25
/// Download all chunks referenced by `index` from the remote into `target`.
///
/// `downloaded_chunks` is a digest set shared across all snapshots of one
/// pull run; digests found there are skipped so the same chunk is never
/// fetched twice. Chunks already present in the local store are skipped as
/// well (via a cheap `cond_touch_chunk` probe). Up to 20 downloads run
/// concurrently, while decoding, verification and writing happen on a
/// 4-thread worker pool so network and CPU work overlap.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // Stream of chunk infos, dropping digests already claimed by this run.
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            })
    );

    // Worker pool: verify each unencrypted chunk against its expected digest
    // and size, then insert it into the target datastore.
    let verify_pool = ParallelHandler::new(
        "sync chunk writer", 4,
        |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target.insert_chunk(&chunk, &digest)?;
            Ok(())
        }
    );

    let verify_and_write_channel = verify_pool.channel();

    // Raw bytes actually downloaded (chunks skipped as already-present are
    // not counted).
    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {

            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // The local-existence probe does blocking I/O, so it must not
                // run directly on the async executor thread.
                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
                if chunk_exists {
                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                    return Ok::<_, Error>(());
                }
                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // Drop the last sender so the worker pool's receive loop can terminate.
    drop(verify_and_write_channel);

    // Join the pool and propagate any verify/write error.
    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));

    Ok(())
}
109
110 async fn download_manifest(
111 reader: &BackupReader,
112 filename: &std::path::Path,
113 ) -> Result<std::fs::File, Error> {
114
115 let mut tmp_manifest_file = std::fs::OpenOptions::new()
116 .write(true)
117 .create(true)
118 .truncate(true)
119 .read(true)
120 .open(&filename)?;
121
122 reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
123
124 tmp_manifest_file.seek(SeekFrom::Start(0))?;
125
126 Ok(tmp_manifest_file)
127 }
128
129 fn verify_archive(
130 info: &FileInfo,
131 csum: &[u8; 32],
132 size: u64,
133 ) -> Result<(), Error> {
134 if size != info.size {
135 bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
136 }
137
138 if csum != &info.csum {
139 bail!("wrong checksum for file '{}'", info.filename);
140 }
141
142 Ok(())
143 }
144
145 async fn pull_single_archive(
146 worker: &WorkerTask,
147 reader: &BackupReader,
148 chunk_reader: &mut RemoteChunkReader,
149 tgt_store: Arc<DataStore>,
150 snapshot: &BackupDir,
151 archive_info: &FileInfo,
152 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
153 ) -> Result<(), Error> {
154
155 let archive_name = &archive_info.filename;
156 let mut path = tgt_store.base_path();
157 path.push(snapshot.relative_path());
158 path.push(archive_name);
159
160 let mut tmp_path = path.clone();
161 tmp_path.set_extension("tmp");
162
163 worker.log(format!("sync archive {}", archive_name));
164 let mut tmpfile = std::fs::OpenOptions::new()
165 .write(true)
166 .create(true)
167 .read(true)
168 .open(&tmp_path)?;
169
170 reader.download(archive_name, &mut tmpfile).await?;
171
172 match archive_type(archive_name)? {
173 ArchiveType::DynamicIndex => {
174 let index = DynamicIndexReader::new(tmpfile)
175 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
176 let (csum, size) = index.compute_csum();
177 verify_archive(archive_info, &csum, size)?;
178
179 pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
180 }
181 ArchiveType::FixedIndex => {
182 let index = FixedIndexReader::new(tmpfile)
183 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
184 let (csum, size) = index.compute_csum();
185 verify_archive(archive_info, &csum, size)?;
186
187 pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
188 }
189 ArchiveType::Blob => {
190 let (csum, size) = compute_file_csum(&mut tmpfile)?;
191 verify_archive(archive_info, &csum, size)?;
192 }
193 }
194 if let Err(err) = std::fs::rename(&tmp_path, &path) {
195 bail!("Atomic rename file {:?} failed - {}", path, err);
196 }
197 Ok(())
198 }
199
200 // Note: The client.log.blob is uploaded after the backup, so it is
201 // not mentioned in the manifest.
202 async fn try_client_log_download(
203 worker: &WorkerTask,
204 reader: Arc<BackupReader>,
205 path: &std::path::Path,
206 ) -> Result<(), Error> {
207
208 let mut tmp_path = path.to_owned();
209 tmp_path.set_extension("tmp");
210
211 let tmpfile = std::fs::OpenOptions::new()
212 .write(true)
213 .create(true)
214 .read(true)
215 .open(&tmp_path)?;
216
217 // Note: be silent if there is no log - only log successful download
218 if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
219 if let Err(err) = std::fs::rename(&tmp_path, &path) {
220 bail!("Atomic rename file {:?} failed - {}", path, err);
221 }
222 worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
223 }
224
225 Ok(())
226 }
227
/// Sync one snapshot's contents into the target datastore.
///
/// Downloads the remote manifest first; if it is byte-identical to the local
/// one nothing is transferred (except a missing client log). Otherwise every
/// manifest entry is checked against the local copy and only changed/missing
/// archives are pulled. The manifest is renamed into place last, and stale
/// files no longer referenced by it are cleaned up.
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {

    let mut manifest_name = tgt_store.base_path();
    manifest_name.push(snapshot.relative_path());
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = tgt_store.base_path();
    client_log_name.push(snapshot.relative_path());
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            // A 404 means the snapshot was pruned on the source between
            // listing and download — skip it instead of failing the sync.
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => {
                    match code {
                        &StatusCode::NOT_FOUND => {
                            worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
                            return Ok(());
                        },
                        _ => {
                            bail!("HTTP error {} - {}", code, message);
                        },
                    }
                },
                None => {
                    return Err(err);
                },
            };
        },
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    if manifest_name.exists() {
        let manifest_blob = proxmox::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name)
                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        }).map_err(|err: Error| {
            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
        })?;

        // Identical raw manifest => snapshot is already up to date; only the
        // client log (uploaded after the backup) may still be missing.
        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            worker.log("no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = tgt_store.base_path();
        path.push(snapshot.relative_path());
        path.push(&item.filename);

        // If a local copy exists and still matches the (new) manifest's
        // checksum/size, skip re-downloading that archive.
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            worker.log(format!("detected changed file {:?} - {}", path, err));
                        }
                    }
                }
            }
        }

        // Fresh chunk reader per archive: crypt mode can differ per file.
        let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            tgt_store.clone(),
            snapshot,
            &item,
            downloaded_chunks.clone(),
        ).await?;
    }

    // Install the manifest last — it marks the snapshot as complete.
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // cleanup - remove stale files
    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;

    Ok(())
}
360
361 pub async fn pull_snapshot_from(
362 worker: &WorkerTask,
363 reader: Arc<BackupReader>,
364 tgt_store: Arc<DataStore>,
365 snapshot: &BackupDir,
366 downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
367 ) -> Result<(), Error> {
368
369 let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
370
371 if is_new {
372 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
373
374 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
375 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
376 worker.log(format!("cleanup error - {}", cleanup_err));
377 }
378 return Err(err);
379 }
380 worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
381 } else {
382 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
383 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
384 worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
385 }
386
387 Ok(())
388 }
389
/// Sync all snapshots of one backup group from the source repository.
///
/// Snapshots are pulled oldest-first; in-progress backups (no size yet) and
/// snapshots older than the last successful local backup are skipped. With
/// `delete` set, local snapshots that no longer exist on the source are
/// removed afterwards.
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
) -> Result<(), Error> {

    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // Oldest first, so an interrupted sync leaves a usable prefix.
    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    // Reuse the existing auth ticket/fingerprint for the per-snapshot readers.
    let auth_info = client.login().await?;
    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 65536 chunks (up to 65GB); shared across all snapshots of
    // this group so identical chunks are downloaded only once
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));

    for item in list {
        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;

        // in-progress backups can't be synced
        if let None = item.size {
            worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
            continue;
        }

        let backup_time = snapshot.backup_time();

        remote_snapshots.insert(backup_time);

        // Skip snapshots strictly older than the last successful local one;
        // the equal case falls through so the latest snapshot is re-synced
        // (e.g. to pick up a late client log upload).
        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time { continue; }
        }

        let options = HttpClientOptions::new()
            .password(Some(auth_info.ticket.clone()))
            .fingerprint(fingerprint.clone());

        // Each snapshot gets its own connection for the reader protocol.
        let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.user(), options)?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            snapshot.group().backup_type(),
            snapshot.group().backup_id(),
            backup_time,
            true,
        ).await?;

        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await?;
    }

    if delete {
        // Remove local snapshots the source no longer has.
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) { continue; }
            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
        }
    }

    Ok(())
}
469
/// Sync an entire remote datastore into `tgt_store`.
///
/// Lists all backup groups on the source, pulls each one (skipping groups
/// owned by a different local user), and — with `delete` set — removes local
/// groups that vanished on the source. Per-group failures are logged and
/// accumulated; the function only bails at the end if any error occurred.
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
    userid: Userid,
) -> Result<(), Error> {

    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client.get(&path, None).await?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    // Deterministic order: by type, then by id.
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    // Remember every group present on the source, so vanished local groups
    // can be identified for deletion below.
    let mut new_groups = std::collections::HashSet::new();
    for item in list.iter() {
        new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
    }

    for item in list {
        let group = BackupGroup::new(&item.backup_type, &item.backup_id);

        let (owner, _lock_guard) = tgt_store.create_locked_backup_group(&group, &userid)?;
        // permission check
        if userid != owner { // only the owner is allowed to create additional snapshots
            worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
                               item.backup_type, item.backup_id, userid, owner));
            errors = true;
            continue; // do not stop here, instead continue
        }

        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
            worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
            errors = true;
            continue; // do not stop here, instead continue
        }
    }

    if delete {
        // Remove local groups the source no longer has; individual removal
        // failures are logged and accumulated, not fatal.
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) { continue; }
                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}
547 }