]>
Commit | Line | Data |
---|---|---|
eb506c83 DM |
1 | //! Sync datastore from remote server |
2 | ||
f7d4e4b5 | 3 | use anyhow::{bail, format_err, Error}; |
de8ec041 DM |
4 | use serde_json::json; |
5 | use std::convert::TryFrom; | |
6 | use std::sync::Arc; | |
7 | use std::collections::HashMap; | |
8 | use std::io::{Seek, SeekFrom}; | |
9 | use chrono::{Utc, TimeZone}; | |
10 | ||
11 | use proxmox::api::api; | |
404d78c4 | 12 | use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission}; |
de8ec041 DM |
13 | |
14 | use crate::server::{WorkerTask}; | |
15 | use crate::backup::*; | |
16 | use crate::client::*; | |
f357390c | 17 | use crate::config::remote; |
de8ec041 | 18 | use crate::api2::types::*; |
404d78c4 | 19 | use crate::config::acl::PRIV_DATASTORE_ALLOCATE_SPACE; |
de8ec041 DM |
20 | |
21 | // fixme: implement filters | |
eb506c83 | 22 | // fixme: delete vanished groups |
de8ec041 DM |
23 | // Todo: correctly lock backup groups |
24 | ||
/// Pull every chunk referenced by `index` from the remote into `target`.
///
/// Chunks that already exist locally are skipped; `cond_touch_chunk`
/// presumably also updates the chunk's atime so GC keeps it — TODO confirm.
async fn pull_index_chunks<I: IndexFile>(
    _worker: &WorkerTask,
    chunk_reader: &mut RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
) -> Result<(), Error> {

    for pos in 0..index.index_count() {
        let digest = index.index_digest(pos).unwrap();
        // returns true if the chunk is already present in the local store
        let chunk_exists = target.cond_touch_chunk(digest, false)?;
        if chunk_exists {
            //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
            continue;
        }
        //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
        let chunk = chunk_reader.read_raw_chunk(&digest)?;

        target.insert_chunk(&chunk, &digest)?;
    }

    Ok(())
}
48 | ||
/// Download the remote manifest blob (MANIFEST_BLOB_NAME) into `filename`.
///
/// Returns the open file handle, rewound to the start so the caller can
/// load the blob directly from it.
async fn download_manifest(
    reader: &BackupReader,
    filename: &std::path::Path,
) -> Result<std::fs::File, Error> {

    // read+write: we write the download, then read it back for parsing
    let tmp_manifest_file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&filename)?;

    let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;

    // rewind so the caller reads from the beginning of what we just wrote
    tmp_manifest_file.seek(SeekFrom::Start(0))?;

    Ok(tmp_manifest_file)
}
66 | ||
/// Download one archive file of a snapshot into the target datastore.
///
/// The archive is downloaded to `<name>.tmp` first; for index archives all
/// referenced chunks are pulled as well, then the tmp file is renamed into
/// its final place.
async fn pull_single_archive(
    worker: &WorkerTask,
    reader: &BackupReader,
    chunk_reader: &mut RemoteChunkReader,
    tgt_store: Arc<DataStore>,
    snapshot: &BackupDir,
    archive_name: &str,
) -> Result<(), Error> {

    let mut path = tgt_store.base_path();
    path.push(snapshot.relative_path());
    path.push(archive_name);

    // download into a '.tmp' sibling, rename only once everything is complete
    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    worker.log(format!("sync archive {}", archive_name));
    let tmpfile = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .read(true)
        .open(&tmp_path)?;

    let tmpfile = reader.download(archive_name, tmpfile).await?;

    // index archives need their chunks pulled; blobs are self-contained
    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;

            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile)
                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;

            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
        }
        ArchiveType::Blob => { /* nothing to do */ }
    }
    // NOTE(review): on an error above the '.tmp' file is left behind —
    // confirm whether the caller's snapshot cleanup removes it
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}
112 | ||
eb506c83 | 113 | async fn pull_snapshot( |
de8ec041 DM |
114 | worker: &WorkerTask, |
115 | reader: Arc<BackupReader>, | |
116 | tgt_store: Arc<DataStore>, | |
117 | snapshot: &BackupDir, | |
118 | ) -> Result<(), Error> { | |
119 | ||
120 | let mut manifest_name = tgt_store.base_path(); | |
121 | manifest_name.push(snapshot.relative_path()); | |
122 | manifest_name.push(MANIFEST_BLOB_NAME); | |
123 | ||
124 | let mut tmp_manifest_name = manifest_name.clone(); | |
125 | tmp_manifest_name.set_extension("tmp"); | |
126 | ||
127 | let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?; | |
128 | let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?; | |
129 | tmp_manifest_blob.verify_crc()?; | |
130 | ||
131 | if manifest_name.exists() { | |
9ea4bce4 | 132 | let manifest_blob = proxmox::try_block!({ |
de8ec041 DM |
133 | let mut manifest_file = std::fs::File::open(&manifest_name) |
134 | .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; | |
135 | ||
136 | let manifest_blob = DataBlob::load(&mut manifest_file)?; | |
137 | manifest_blob.verify_crc()?; | |
138 | Ok(manifest_blob) | |
139 | }).map_err(|err: Error| { | |
140 | format_err!("unable to read local manifest {:?} - {}", manifest_name, err) | |
141 | })?; | |
142 | ||
143 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
144 | return Ok(()); // nothing changed | |
145 | } | |
146 | } | |
147 | ||
148 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
149 | ||
150 | let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new()); | |
151 | ||
152 | for item in manifest.files() { | |
153 | let mut path = tgt_store.base_path(); | |
154 | path.push(snapshot.relative_path()); | |
155 | path.push(&item.filename); | |
156 | ||
157 | if path.exists() { | |
158 | match archive_type(&item.filename)? { | |
159 | ArchiveType::DynamicIndex => { | |
160 | let index = DynamicIndexReader::open(&path)?; | |
161 | let (csum, size) = index.compute_csum(); | |
162 | match manifest.verify_file(&item.filename, &csum, size) { | |
163 | Ok(_) => continue, | |
164 | Err(err) => { | |
165 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
166 | } | |
167 | } | |
168 | } | |
169 | ArchiveType::FixedIndex => { | |
170 | let index = FixedIndexReader::open(&path)?; | |
171 | let (csum, size) = index.compute_csum(); | |
172 | match manifest.verify_file(&item.filename, &csum, size) { | |
173 | Ok(_) => continue, | |
174 | Err(err) => { | |
175 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
176 | } | |
177 | } | |
178 | } | |
179 | ArchiveType::Blob => { | |
180 | let mut tmpfile = std::fs::File::open(&path)?; | |
181 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
182 | match manifest.verify_file(&item.filename, &csum, size) { | |
183 | Ok(_) => continue, | |
184 | Err(err) => { | |
185 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
186 | } | |
187 | } | |
188 | } | |
189 | } | |
190 | } | |
191 | ||
eb506c83 | 192 | pull_single_archive( |
de8ec041 DM |
193 | worker, |
194 | &reader, | |
195 | &mut chunk_reader, | |
196 | tgt_store.clone(), | |
197 | snapshot, | |
198 | &item.filename, | |
199 | ).await?; | |
200 | } | |
201 | ||
202 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
203 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
204 | } | |
205 | ||
206 | // cleanup - remove stale files | |
207 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
208 | ||
209 | Ok(()) | |
210 | } | |
211 | ||
eb506c83 | 212 | pub async fn pull_snapshot_from( |
de8ec041 DM |
213 | worker: &WorkerTask, |
214 | reader: Arc<BackupReader>, | |
215 | tgt_store: Arc<DataStore>, | |
216 | snapshot: &BackupDir, | |
217 | ) -> Result<(), Error> { | |
218 | ||
219 | let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?; | |
220 | ||
221 | if is_new { | |
222 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
223 | ||
eb506c83 | 224 | if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { |
de8ec041 DM |
225 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) { |
226 | worker.log(format!("cleanup error - {}", cleanup_err)); | |
227 | } | |
228 | return Err(err); | |
229 | } | |
230 | } else { | |
231 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
eb506c83 | 232 | pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await? |
de8ec041 DM |
233 | } |
234 | ||
235 | Ok(()) | |
236 | } | |
237 | ||
/// Pull all snapshots of one backup group from the remote.
///
/// Lists the remote snapshots, syncs them in chronological order (skipping
/// those strictly older than the last successful local backup), and — when
/// `delete` is set — removes local snapshots that vanished on the remote.
pub async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    group: &BackupGroup,
    delete: bool,
) -> Result<(), Error> {

    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());

    let args = json!({
        "backup-type": group.backup_type(),
        "backup-id": group.backup_id(),
    });

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // sync oldest first
    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));

    // reuse the authenticated ticket/fingerprint for the per-snapshot
    // reader connections created below
    let auth_info = client.login().await?;
    let fingerprint = client.fingerprint();

    let last_sync = tgt_store.last_successful_backup(group)?;

    // every snapshot time seen on the remote - drives the vanished check below
    let mut remote_snapshots = std::collections::HashSet::new();

    for item in list {
        let backup_time = Utc.timestamp(item.backup_time, 0);
        remote_snapshots.insert(backup_time);

        // skip snapshots strictly older than the last successful local
        // backup; the one equal to it is pulled again (re-sync)
        if let Some(last_sync_time) = last_sync {
            if last_sync_time > backup_time { continue; }
        }

        // fresh client per snapshot: BackupReader::start consumes it
        let options = HttpClientOptions::new()
            .password(Some(auth_info.ticket.clone()))
            .fingerprint(fingerprint.clone());

        let new_client = HttpClient::new(src_repo.host(), src_repo.user(), options)?;

        let reader = BackupReader::start(
            new_client,
            None,
            src_repo.store(),
            &item.backup_type,
            &item.backup_id,
            backup_time,
            true,
        ).await?;

        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);

        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
    }

    if delete {
        // remove local snapshots whose backup time no longer exists remotely
        let local_list = group.list_backups(&tgt_store.base_path())?;
        for info in local_list {
            let backup_time = info.backup_dir.backup_time();
            if remote_snapshots.contains(&backup_time) { continue; }
            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
            tgt_store.remove_backup_dir(&info.backup_dir)?;
        }
    }

    Ok(())
}
307 | ||
/// Pull all backup groups of a datastore from the remote.
///
/// Groups are pulled in a deterministic order (type, then id). A failing
/// group is logged and skipped so one broken group does not abort the whole
/// sync; if anything failed an error is returned at the end. With `delete`
/// set, local groups not present on the remote are removed.
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    src_repo: &BackupRepository,
    tgt_store: Arc<DataStore>,
    delete: bool,
) -> Result<(), Error> {

    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());

    let mut result = client.get(&path, None).await?;

    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;

    // deterministic order: by backup type, then by backup id
    list.sort_unstable_by(|a, b| {
        let type_order = a.backup_type.cmp(&b.backup_type);
        if type_order == std::cmp::Ordering::Equal {
            a.backup_id.cmp(&b.backup_id)
        } else {
            type_order
        }
    });

    let mut errors = false;

    // every group seen on the remote - used below to detect vanished groups
    let mut new_groups = std::collections::HashSet::new();

    for item in list {
        let group = BackupGroup::new(&item.backup_type, &item.backup_id);
        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
            worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
            errors = true;
            // do not stop here, instead continue
        }
        // inserted even on failure, so a partially synced group is not
        // treated as vanished and deleted below
        new_groups.insert(group);
    }

    if delete {
        let result: Result<(), Error> = proxmox::try_block!({
            let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
            for local_group in local_groups {
                if new_groups.contains(&local_group) { continue; }
                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
                if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                    worker.log(err.to_string());
                    errors = true;
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            worker.log(format!("error during cleanup: {}", err));
            errors = true;
        };
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}
370 | ||
371 | #[api( | |
372 | input: { | |
373 | properties: { | |
374 | store: { | |
375 | schema: DATASTORE_SCHEMA, | |
376 | }, | |
94609e23 DM |
377 | remote: { |
378 | schema: REMOTE_ID_SCHEMA, | |
de8ec041 DM |
379 | }, |
380 | "remote-store": { | |
381 | schema: DATASTORE_SCHEMA, | |
382 | }, | |
4b4eba0b DM |
383 | delete: { |
384 | description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.", | |
385 | type: Boolean, | |
386 | optional: true, | |
387 | default: true, | |
388 | }, | |
de8ec041 DM |
389 | }, |
390 | }, | |
404d78c4 DM |
391 | access: { |
392 | permission: &Permission::And(&[ | |
393 | &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false), | |
394 | &Permission::Privilege(&["remote", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false), | |
395 | ]), | |
396 | }, | |
de8ec041 | 397 | )] |
eb506c83 DM |
398 | /// Sync store from other repository |
399 | async fn pull ( | |
de8ec041 | 400 | store: String, |
94609e23 | 401 | remote: String, |
de8ec041 | 402 | remote_store: String, |
4b4eba0b | 403 | delete: Option<bool>, |
de8ec041 DM |
404 | _info: &ApiMethod, |
405 | rpcenv: &mut dyn RpcEnvironment, | |
406 | ) -> Result<String, Error> { | |
407 | ||
408 | let username = rpcenv.get_user().unwrap(); | |
409 | ||
4b4eba0b DM |
410 | let delete = delete.unwrap_or(true); |
411 | ||
de8ec041 DM |
412 | let tgt_store = DataStore::lookup_datastore(&store)?; |
413 | ||
f357390c DM |
414 | let (remote_config, _digest) = remote::config()?; |
415 | let remote: remote::Remote = remote_config.lookup("remote", &remote)?; | |
94609e23 | 416 | |
d59dbeca DM |
417 | let options = HttpClientOptions::new() |
418 | .password(Some(remote.password.clone())) | |
419 | .fingerprint(remote.fingerprint.clone()); | |
420 | ||
421 | let client = HttpClient::new(&remote.host, &remote.userid, options)?; | |
de8ec041 DM |
422 | let _auth_info = client.login() // make sure we can auth |
423 | .await | |
94609e23 | 424 | .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?; |
de8ec041 | 425 | |
94609e23 | 426 | let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store); |
de8ec041 DM |
427 | |
428 | // fixme: set to_stdout to false? | |
429 | let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move { | |
430 | ||
431 | worker.log(format!("sync datastore '{}' start", store)); | |
432 | ||
433 | // explicit create shared lock to prevent GC on newly created chunks | |
434 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
435 | ||
4b4eba0b | 436 | pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?; |
de8ec041 DM |
437 | |
438 | worker.log(format!("sync datastore '{}' end", store)); | |
439 | ||
440 | Ok(()) | |
441 | })?; | |
442 | ||
443 | Ok(upid_str) | |
444 | } | |
445 | ||
/// Router for this module: POST dispatches to the `pull` handler above
/// (API_METHOD_PULL is generated by the `#[api]` macro).
pub const ROUTER: Router = Router::new()
    .post(&API_METHOD_PULL);