]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/pull.rs
src/api2/pull.rs: delete vanished snapshots
[proxmox-backup.git] / src / api2 / pull.rs
1 //! Sync datastore from remote server
2
3 use failure::*;
4 use serde_json::json;
5 use std::convert::TryFrom;
6 use std::sync::Arc;
7 use std::collections::HashMap;
8 use std::io::{Seek, SeekFrom};
9 use chrono::{Utc, TimeZone};
10
11 use proxmox::api::api;
12 use proxmox::api::{ApiMethod, Router, RpcEnvironment};
13
14 use crate::server::{WorkerTask};
15 use crate::backup::*;
16 use crate::client::*;
17 use crate::config::remote;
18 use crate::api2::types::*;
19
20 // fixme: implement filters
21 // fixme: delete vanished groups
22 // Todo: correctly lock backup groups
23
24 async fn pull_index_chunks<I: IndexFile>(
25 _worker: &WorkerTask,
26 chunk_reader: &mut RemoteChunkReader,
27 target: Arc<DataStore>,
28 index: I,
29 ) -> Result<(), Error> {
30
31
32 for pos in 0..index.index_count() {
33 let digest = index.index_digest(pos).unwrap();
34 let chunk_exists = target.cond_touch_chunk(digest, false)?;
35 if chunk_exists {
36 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
37 continue;
38 }
39 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
40 let chunk = chunk_reader.read_raw_chunk(&digest)?;
41
42 target.insert_chunk(&chunk, &digest)?;
43 }
44
45 Ok(())
46 }
47
48 async fn download_manifest(
49 reader: &BackupReader,
50 filename: &std::path::Path,
51 ) -> Result<std::fs::File, Error> {
52
53 let tmp_manifest_file = std::fs::OpenOptions::new()
54 .write(true)
55 .create(true)
56 .read(true)
57 .open(&filename)?;
58
59 let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
60
61 tmp_manifest_file.seek(SeekFrom::Start(0))?;
62
63 Ok(tmp_manifest_file)
64 }
65
66 async fn pull_single_archive(
67 worker: &WorkerTask,
68 reader: &BackupReader,
69 chunk_reader: &mut RemoteChunkReader,
70 tgt_store: Arc<DataStore>,
71 snapshot: &BackupDir,
72 archive_name: &str,
73 ) -> Result<(), Error> {
74
75 let mut path = tgt_store.base_path();
76 path.push(snapshot.relative_path());
77 path.push(archive_name);
78
79 let mut tmp_path = path.clone();
80 tmp_path.set_extension("tmp");
81
82 worker.log(format!("sync archive {}", archive_name));
83 let tmpfile = std::fs::OpenOptions::new()
84 .write(true)
85 .create(true)
86 .read(true)
87 .open(&tmp_path)?;
88
89 let tmpfile = reader.download(archive_name, tmpfile).await?;
90
91 match archive_type(archive_name)? {
92 ArchiveType::DynamicIndex => {
93 let index = DynamicIndexReader::new(tmpfile)
94 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
95
96 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
97 }
98 ArchiveType::FixedIndex => {
99 let index = FixedIndexReader::new(tmpfile)
100 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
101
102 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
103 }
104 ArchiveType::Blob => { /* nothing to do */ }
105 }
106 if let Err(err) = std::fs::rename(&tmp_path, &path) {
107 bail!("Atomic rename file {:?} failed - {}", path, err);
108 }
109 Ok(())
110 }
111
112 async fn pull_snapshot(
113 worker: &WorkerTask,
114 reader: Arc<BackupReader>,
115 tgt_store: Arc<DataStore>,
116 snapshot: &BackupDir,
117 ) -> Result<(), Error> {
118
119 let mut manifest_name = tgt_store.base_path();
120 manifest_name.push(snapshot.relative_path());
121 manifest_name.push(MANIFEST_BLOB_NAME);
122
123 let mut tmp_manifest_name = manifest_name.clone();
124 tmp_manifest_name.set_extension("tmp");
125
126 let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
127 let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
128 tmp_manifest_blob.verify_crc()?;
129
130 if manifest_name.exists() {
131 let manifest_blob = proxmox::try_block!({
132 let mut manifest_file = std::fs::File::open(&manifest_name)
133 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
134
135 let manifest_blob = DataBlob::load(&mut manifest_file)?;
136 manifest_blob.verify_crc()?;
137 Ok(manifest_blob)
138 }).map_err(|err: Error| {
139 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
140 })?;
141
142 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
143 return Ok(()); // nothing changed
144 }
145 }
146
147 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
148
149 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
150
151 for item in manifest.files() {
152 let mut path = tgt_store.base_path();
153 path.push(snapshot.relative_path());
154 path.push(&item.filename);
155
156 if path.exists() {
157 match archive_type(&item.filename)? {
158 ArchiveType::DynamicIndex => {
159 let index = DynamicIndexReader::open(&path)?;
160 let (csum, size) = index.compute_csum();
161 match manifest.verify_file(&item.filename, &csum, size) {
162 Ok(_) => continue,
163 Err(err) => {
164 worker.log(format!("detected changed file {:?} - {}", path, err));
165 }
166 }
167 }
168 ArchiveType::FixedIndex => {
169 let index = FixedIndexReader::open(&path)?;
170 let (csum, size) = index.compute_csum();
171 match manifest.verify_file(&item.filename, &csum, size) {
172 Ok(_) => continue,
173 Err(err) => {
174 worker.log(format!("detected changed file {:?} - {}", path, err));
175 }
176 }
177 }
178 ArchiveType::Blob => {
179 let mut tmpfile = std::fs::File::open(&path)?;
180 let (csum, size) = compute_file_csum(&mut tmpfile)?;
181 match manifest.verify_file(&item.filename, &csum, size) {
182 Ok(_) => continue,
183 Err(err) => {
184 worker.log(format!("detected changed file {:?} - {}", path, err));
185 }
186 }
187 }
188 }
189 }
190
191 pull_single_archive(
192 worker,
193 &reader,
194 &mut chunk_reader,
195 tgt_store.clone(),
196 snapshot,
197 &item.filename,
198 ).await?;
199 }
200
201 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
202 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
203 }
204
205 // cleanup - remove stale files
206 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
207
208 Ok(())
209 }
210
211 pub async fn pull_snapshot_from(
212 worker: &WorkerTask,
213 reader: Arc<BackupReader>,
214 tgt_store: Arc<DataStore>,
215 snapshot: &BackupDir,
216 ) -> Result<(), Error> {
217
218 let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
219
220 if is_new {
221 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
222
223 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
224 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
225 worker.log(format!("cleanup error - {}", cleanup_err));
226 }
227 return Err(err);
228 }
229 } else {
230 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
231 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
232 }
233
234 Ok(())
235 }
236
237 pub async fn pull_group(
238 worker: &WorkerTask,
239 client: &HttpClient,
240 src_repo: &BackupRepository,
241 tgt_store: Arc<DataStore>,
242 group: &BackupGroup,
243 delete: bool,
244 ) -> Result<(), Error> {
245
246 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
247
248 let args = json!({
249 "backup-type": group.backup_type(),
250 "backup-id": group.backup_id(),
251 });
252
253 let mut result = client.get(&path, Some(args)).await?;
254 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
255
256 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
257
258 let auth_info = client.login().await?;
259
260 let last_sync = tgt_store.last_successful_backup(group)?;
261
262 let mut remote_snapshots = std::collections::HashSet::new();
263
264 for item in list {
265 let backup_time = Utc.timestamp(item.backup_time, 0);
266 remote_snapshots.insert(backup_time);
267
268 if let Some(last_sync_time) = last_sync {
269 if last_sync_time > backup_time { continue; }
270 }
271
272 let new_client = HttpClient::new(
273 src_repo.host(),
274 src_repo.user(),
275 Some(auth_info.ticket.clone())
276 )?;
277
278 let reader = BackupReader::start(
279 new_client,
280 None,
281 src_repo.store(),
282 &item.backup_type,
283 &item.backup_id,
284 backup_time,
285 true,
286 ).await?;
287
288 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
289
290 pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
291 }
292
293 if delete {
294 let local_list = group.list_backups(&tgt_store.base_path())?;
295 for info in local_list {
296 let backup_time = info.backup_dir.backup_time();
297 if remote_snapshots.contains(&backup_time) { continue; }
298 worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
299 tgt_store.remove_backup_dir(&info.backup_dir)?;
300 }
301 }
302
303 Ok(())
304 }
305
306 pub async fn pull_store(
307 worker: &WorkerTask,
308 client: &HttpClient,
309 src_repo: &BackupRepository,
310 tgt_store: Arc<DataStore>,
311 delete: bool,
312 ) -> Result<(), Error> {
313
314 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
315
316 let mut result = client.get(&path, None).await?;
317
318 let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
319
320 list.sort_unstable_by(|a, b| {
321 let type_order = a.backup_type.cmp(&b.backup_type);
322 if type_order == std::cmp::Ordering::Equal {
323 a.backup_id.cmp(&b.backup_id)
324 } else {
325 type_order
326 }
327 });
328
329 let mut errors = false;
330
331 let mut new_groups = std::collections::HashSet::new();
332
333 for item in list {
334 let group = BackupGroup::new(&item.backup_type, &item.backup_id);
335 if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
336 worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
337 errors = true;
338 // do not stop here, instead continue
339 }
340 new_groups.insert(group);
341 }
342
343 if delete {
344 let result: Result<(), Error> = proxmox::try_block!({
345 let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
346 for local_group in local_groups {
347 if new_groups.contains(&local_group) { continue; }
348 worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
349 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
350 worker.log(err.to_string());
351 errors = true;
352 }
353 }
354 Ok(())
355 });
356 if let Err(err) = result {
357 worker.log(format!("error during cleanup: {}", err));
358 errors = true;
359 };
360 }
361
362 if errors {
363 bail!("sync failed with some errors.");
364 }
365
366 Ok(())
367 }
368
369 #[api(
370 input: {
371 properties: {
372 store: {
373 schema: DATASTORE_SCHEMA,
374 },
375 remote: {
376 schema: REMOTE_ID_SCHEMA,
377 },
378 "remote-store": {
379 schema: DATASTORE_SCHEMA,
380 },
381 delete: {
382 description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
383 type: Boolean,
384 optional: true,
385 default: true,
386 },
387 },
388 },
389 )]
390 /// Sync store from other repository
391 async fn pull (
392 store: String,
393 remote: String,
394 remote_store: String,
395 delete: Option<bool>,
396 _info: &ApiMethod,
397 rpcenv: &mut dyn RpcEnvironment,
398 ) -> Result<String, Error> {
399
400 let username = rpcenv.get_user().unwrap();
401
402 let delete = delete.unwrap_or(true);
403
404 let tgt_store = DataStore::lookup_datastore(&store)?;
405
406 let (remote_config, _digest) = remote::config()?;
407 let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
408
409 let client = HttpClient::new(&remote.host, &remote.userid, Some(remote.password.clone()))?;
410 let _auth_info = client.login() // make sure we can auth
411 .await
412 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
413
414 let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
415
416 // fixme: set to_stdout to false?
417 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
418
419 worker.log(format!("sync datastore '{}' start", store));
420
421 // explicit create shared lock to prevent GC on newly created chunks
422 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
423
424 pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?;
425
426 worker.log(format!("sync datastore '{}' end", store));
427
428 Ok(())
429 })?;
430
431 Ok(upid_str)
432 }
433
434 pub const ROUTER: Router = Router::new()
435 .post(&API_METHOD_PULL);