]>
Commit | Line | Data |
---|---|---|
eb506c83 DM |
1 | //! Sync datastore from remote server |
2 | ||
f7d4e4b5 | 3 | use anyhow::{bail, format_err, Error}; |
de8ec041 DM |
4 | use serde_json::json; |
5 | use std::convert::TryFrom; | |
6 | use std::sync::Arc; | |
7 | use std::collections::HashMap; | |
8 | use std::io::{Seek, SeekFrom}; | |
9 | use chrono::{Utc, TimeZone}; | |
10 | ||
11 | use proxmox::api::api; | |
404d78c4 | 12 | use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission}; |
de8ec041 DM |
13 | |
14 | use crate::server::{WorkerTask}; | |
15 | use crate::backup::*; | |
16 | use crate::client::*; | |
f357390c | 17 | use crate::config::remote; |
de8ec041 | 18 | use crate::api2::types::*; |
d00e1a21 | 19 | use crate::config::acl::{PRIV_DATASTORE_CREATE_BACKUP, PRIV_DATASTORE_READ}; |
365f0f72 | 20 | use crate::config::cached_user_info::CachedUserInfo; |
de8ec041 DM |
21 | |
22 | // fixme: implement filters | |
eb506c83 | 23 | // note: vanished groups and snapshots are deleted when the `delete` option is set |
de8ec041 DM |
24 | // Todo: correctly lock backup groups |
25 | ||
eb506c83 | 26 | async fn pull_index_chunks<I: IndexFile>( |
de8ec041 DM |
27 | _worker: &WorkerTask, |
28 | chunk_reader: &mut RemoteChunkReader, | |
29 | target: Arc<DataStore>, | |
30 | index: I, | |
31 | ) -> Result<(), Error> { | |
32 | ||
33 | ||
34 | for pos in 0..index.index_count() { | |
35 | let digest = index.index_digest(pos).unwrap(); | |
36 | let chunk_exists = target.cond_touch_chunk(digest, false)?; | |
37 | if chunk_exists { | |
38 | //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); | |
39 | continue; | |
40 | } | |
41 | //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); | |
42 | let chunk = chunk_reader.read_raw_chunk(&digest)?; | |
43 | ||
44 | target.insert_chunk(&chunk, &digest)?; | |
45 | } | |
46 | ||
47 | Ok(()) | |
48 | } | |
49 | ||
50 | async fn download_manifest( | |
51 | reader: &BackupReader, | |
52 | filename: &std::path::Path, | |
53 | ) -> Result<std::fs::File, Error> { | |
54 | ||
55 | let tmp_manifest_file = std::fs::OpenOptions::new() | |
56 | .write(true) | |
57 | .create(true) | |
58 | .read(true) | |
59 | .open(&filename)?; | |
60 | ||
61 | let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?; | |
62 | ||
63 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
64 | ||
65 | Ok(tmp_manifest_file) | |
66 | } | |
67 | ||
eb506c83 | 68 | async fn pull_single_archive( |
de8ec041 DM |
69 | worker: &WorkerTask, |
70 | reader: &BackupReader, | |
71 | chunk_reader: &mut RemoteChunkReader, | |
72 | tgt_store: Arc<DataStore>, | |
73 | snapshot: &BackupDir, | |
74 | archive_name: &str, | |
75 | ) -> Result<(), Error> { | |
76 | ||
77 | let mut path = tgt_store.base_path(); | |
78 | path.push(snapshot.relative_path()); | |
79 | path.push(archive_name); | |
80 | ||
81 | let mut tmp_path = path.clone(); | |
82 | tmp_path.set_extension("tmp"); | |
83 | ||
84 | worker.log(format!("sync archive {}", archive_name)); | |
85 | let tmpfile = std::fs::OpenOptions::new() | |
86 | .write(true) | |
87 | .create(true) | |
88 | .read(true) | |
89 | .open(&tmp_path)?; | |
90 | ||
91 | let tmpfile = reader.download(archive_name, tmpfile).await?; | |
92 | ||
93 | match archive_type(archive_name)? { | |
94 | ArchiveType::DynamicIndex => { | |
95 | let index = DynamicIndexReader::new(tmpfile) | |
96 | .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?; | |
97 | ||
eb506c83 | 98 | pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; |
de8ec041 DM |
99 | } |
100 | ArchiveType::FixedIndex => { | |
101 | let index = FixedIndexReader::new(tmpfile) | |
102 | .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?; | |
103 | ||
eb506c83 | 104 | pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; |
de8ec041 DM |
105 | } |
106 | ArchiveType::Blob => { /* nothing to do */ } | |
107 | } | |
108 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
109 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
110 | } | |
111 | Ok(()) | |
112 | } | |
113 | ||
eb506c83 | 114 | async fn pull_snapshot( |
de8ec041 DM |
115 | worker: &WorkerTask, |
116 | reader: Arc<BackupReader>, | |
117 | tgt_store: Arc<DataStore>, | |
118 | snapshot: &BackupDir, | |
119 | ) -> Result<(), Error> { | |
120 | ||
121 | let mut manifest_name = tgt_store.base_path(); | |
122 | manifest_name.push(snapshot.relative_path()); | |
123 | manifest_name.push(MANIFEST_BLOB_NAME); | |
124 | ||
125 | let mut tmp_manifest_name = manifest_name.clone(); | |
126 | tmp_manifest_name.set_extension("tmp"); | |
127 | ||
128 | let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?; | |
129 | let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?; | |
130 | tmp_manifest_blob.verify_crc()?; | |
131 | ||
132 | if manifest_name.exists() { | |
9ea4bce4 | 133 | let manifest_blob = proxmox::try_block!({ |
de8ec041 DM |
134 | let mut manifest_file = std::fs::File::open(&manifest_name) |
135 | .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; | |
136 | ||
137 | let manifest_blob = DataBlob::load(&mut manifest_file)?; | |
138 | manifest_blob.verify_crc()?; | |
139 | Ok(manifest_blob) | |
140 | }).map_err(|err: Error| { | |
141 | format_err!("unable to read local manifest {:?} - {}", manifest_name, err) | |
142 | })?; | |
143 | ||
144 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
145 | return Ok(()); // nothing changed | |
146 | } | |
147 | } | |
148 | ||
149 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
150 | ||
151 | let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new()); | |
152 | ||
153 | for item in manifest.files() { | |
154 | let mut path = tgt_store.base_path(); | |
155 | path.push(snapshot.relative_path()); | |
156 | path.push(&item.filename); | |
157 | ||
158 | if path.exists() { | |
159 | match archive_type(&item.filename)? { | |
160 | ArchiveType::DynamicIndex => { | |
161 | let index = DynamicIndexReader::open(&path)?; | |
162 | let (csum, size) = index.compute_csum(); | |
163 | match manifest.verify_file(&item.filename, &csum, size) { | |
164 | Ok(_) => continue, | |
165 | Err(err) => { | |
166 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
167 | } | |
168 | } | |
169 | } | |
170 | ArchiveType::FixedIndex => { | |
171 | let index = FixedIndexReader::open(&path)?; | |
172 | let (csum, size) = index.compute_csum(); | |
173 | match manifest.verify_file(&item.filename, &csum, size) { | |
174 | Ok(_) => continue, | |
175 | Err(err) => { | |
176 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
177 | } | |
178 | } | |
179 | } | |
180 | ArchiveType::Blob => { | |
181 | let mut tmpfile = std::fs::File::open(&path)?; | |
182 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
183 | match manifest.verify_file(&item.filename, &csum, size) { | |
184 | Ok(_) => continue, | |
185 | Err(err) => { | |
186 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
187 | } | |
188 | } | |
189 | } | |
190 | } | |
191 | } | |
192 | ||
eb506c83 | 193 | pull_single_archive( |
de8ec041 DM |
194 | worker, |
195 | &reader, | |
196 | &mut chunk_reader, | |
197 | tgt_store.clone(), | |
198 | snapshot, | |
199 | &item.filename, | |
200 | ).await?; | |
201 | } | |
202 | ||
203 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
204 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
205 | } | |
206 | ||
207 | // cleanup - remove stale files | |
208 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
209 | ||
210 | Ok(()) | |
211 | } | |
212 | ||
eb506c83 | 213 | pub async fn pull_snapshot_from( |
de8ec041 DM |
214 | worker: &WorkerTask, |
215 | reader: Arc<BackupReader>, | |
216 | tgt_store: Arc<DataStore>, | |
217 | snapshot: &BackupDir, | |
218 | ) -> Result<(), Error> { | |
219 | ||
220 | let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?; | |
221 | ||
222 | if is_new { | |
223 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
224 | ||
eb506c83 | 225 | if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { |
de8ec041 DM |
226 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) { |
227 | worker.log(format!("cleanup error - {}", cleanup_err)); | |
228 | } | |
229 | return Err(err); | |
230 | } | |
231 | } else { | |
232 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
eb506c83 | 233 | pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await? |
de8ec041 DM |
234 | } |
235 | ||
236 | Ok(()) | |
237 | } | |
238 | ||
eb506c83 | 239 | pub async fn pull_group( |
de8ec041 DM |
240 | worker: &WorkerTask, |
241 | client: &HttpClient, | |
242 | src_repo: &BackupRepository, | |
243 | tgt_store: Arc<DataStore>, | |
244 | group: &BackupGroup, | |
4b4eba0b | 245 | delete: bool, |
de8ec041 DM |
246 | ) -> Result<(), Error> { |
247 | ||
248 | let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); | |
249 | ||
250 | let args = json!({ | |
251 | "backup-type": group.backup_type(), | |
252 | "backup-id": group.backup_id(), | |
253 | }); | |
254 | ||
255 | let mut result = client.get(&path, Some(args)).await?; | |
256 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
257 | ||
258 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
259 | ||
260 | let auth_info = client.login().await?; | |
d59dbeca | 261 | let fingerprint = client.fingerprint(); |
de8ec041 | 262 | |
18cc66ee | 263 | let last_sync = tgt_store.last_successful_backup(group)?; |
de8ec041 | 264 | |
c425bdc9 DM |
265 | let mut remote_snapshots = std::collections::HashSet::new(); |
266 | ||
de8ec041 DM |
267 | for item in list { |
268 | let backup_time = Utc.timestamp(item.backup_time, 0); | |
c425bdc9 DM |
269 | remote_snapshots.insert(backup_time); |
270 | ||
de8ec041 DM |
271 | if let Some(last_sync_time) = last_sync { |
272 | if last_sync_time > backup_time { continue; } | |
273 | } | |
274 | ||
d59dbeca DM |
275 | let options = HttpClientOptions::new() |
276 | .password(Some(auth_info.ticket.clone())) | |
277 | .fingerprint(fingerprint.clone()); | |
278 | ||
279 | let new_client = HttpClient::new(src_repo.host(), src_repo.user(), options)?; | |
de8ec041 DM |
280 | |
281 | let reader = BackupReader::start( | |
282 | new_client, | |
283 | None, | |
284 | src_repo.store(), | |
285 | &item.backup_type, | |
286 | &item.backup_id, | |
287 | backup_time, | |
288 | true, | |
289 | ).await?; | |
290 | ||
291 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time); | |
292 | ||
eb506c83 | 293 | pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?; |
de8ec041 DM |
294 | } |
295 | ||
4b4eba0b | 296 | if delete { |
c425bdc9 DM |
297 | let local_list = group.list_backups(&tgt_store.base_path())?; |
298 | for info in local_list { | |
299 | let backup_time = info.backup_dir.backup_time(); | |
300 | if remote_snapshots.contains(&backup_time) { continue; } | |
301 | worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path())); | |
302 | tgt_store.remove_backup_dir(&info.backup_dir)?; | |
303 | } | |
4b4eba0b DM |
304 | } |
305 | ||
de8ec041 DM |
306 | Ok(()) |
307 | } | |
308 | ||
eb506c83 | 309 | pub async fn pull_store( |
de8ec041 DM |
310 | worker: &WorkerTask, |
311 | client: &HttpClient, | |
312 | src_repo: &BackupRepository, | |
313 | tgt_store: Arc<DataStore>, | |
4b4eba0b | 314 | delete: bool, |
de8ec041 DM |
315 | ) -> Result<(), Error> { |
316 | ||
317 | let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); | |
318 | ||
319 | let mut result = client.get(&path, None).await?; | |
320 | ||
b31c8019 | 321 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; |
de8ec041 DM |
322 | |
323 | list.sort_unstable_by(|a, b| { | |
b31c8019 | 324 | let type_order = a.backup_type.cmp(&b.backup_type); |
de8ec041 | 325 | if type_order == std::cmp::Ordering::Equal { |
b31c8019 | 326 | a.backup_id.cmp(&b.backup_id) |
de8ec041 DM |
327 | } else { |
328 | type_order | |
329 | } | |
330 | }); | |
331 | ||
332 | let mut errors = false; | |
333 | ||
4b4eba0b DM |
334 | let mut new_groups = std::collections::HashSet::new(); |
335 | ||
de8ec041 | 336 | for item in list { |
b31c8019 | 337 | let group = BackupGroup::new(&item.backup_type, &item.backup_id); |
4b4eba0b | 338 | if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await { |
b31c8019 | 339 | worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err)); |
de8ec041 | 340 | errors = true; |
4b4eba0b | 341 | // do not stop here, instead continue |
de8ec041 | 342 | } |
4b4eba0b DM |
343 | new_groups.insert(group); |
344 | } | |
345 | ||
346 | if delete { | |
9ea4bce4 | 347 | let result: Result<(), Error> = proxmox::try_block!({ |
4b4eba0b DM |
348 | let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?; |
349 | for local_group in local_groups { | |
350 | if new_groups.contains(&local_group) { continue; } | |
351 | worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id())); | |
352 | if let Err(err) = tgt_store.remove_backup_group(&local_group) { | |
c425bdc9 | 353 | worker.log(err.to_string()); |
4b4eba0b DM |
354 | errors = true; |
355 | } | |
356 | } | |
357 | Ok(()) | |
358 | }); | |
359 | if let Err(err) = result { | |
360 | worker.log(format!("error during cleanup: {}", err)); | |
361 | errors = true; | |
362 | }; | |
de8ec041 DM |
363 | } |
364 | ||
365 | if errors { | |
366 | bail!("sync failed with some errors."); | |
367 | } | |
368 | ||
369 | Ok(()) | |
370 | } | |
371 | ||
372 | #[api( | |
373 | input: { | |
374 | properties: { | |
375 | store: { | |
376 | schema: DATASTORE_SCHEMA, | |
377 | }, | |
94609e23 DM |
378 | remote: { |
379 | schema: REMOTE_ID_SCHEMA, | |
de8ec041 DM |
380 | }, |
381 | "remote-store": { | |
382 | schema: DATASTORE_SCHEMA, | |
383 | }, | |
4b4eba0b DM |
384 | delete: { |
385 | description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.", | |
386 | type: Boolean, | |
387 | optional: true, | |
388 | default: true, | |
389 | }, | |
de8ec041 DM |
390 | }, |
391 | }, | |
404d78c4 | 392 | access: { |
365f0f72 | 393 | // Note: used parameters are no uri parameters, so we need to test inside function body |
d00e1a21 | 394 | description: "The user needs Datastore.CreateBackup privilege on '/datastore/{store}' and Datastore.Read on '/remote/{remote}/{remote-store}'.", |
365f0f72 | 395 | permission: &Permission::Anybody, |
404d78c4 | 396 | }, |
de8ec041 | 397 | )] |
eb506c83 DM |
398 | /// Sync store from other repository |
399 | async fn pull ( | |
de8ec041 | 400 | store: String, |
94609e23 | 401 | remote: String, |
de8ec041 | 402 | remote_store: String, |
4b4eba0b | 403 | delete: Option<bool>, |
de8ec041 DM |
404 | _info: &ApiMethod, |
405 | rpcenv: &mut dyn RpcEnvironment, | |
406 | ) -> Result<String, Error> { | |
407 | ||
365f0f72 DM |
408 | let user_info = CachedUserInfo::new()?; |
409 | ||
de8ec041 | 410 | let username = rpcenv.get_user().unwrap(); |
d00e1a21 DM |
411 | user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_CREATE_BACKUP, false)?; |
412 | user_info.check_privs(&username, &["remote", &remote, &remote_store], PRIV_DATASTORE_READ, false)?; | |
de8ec041 | 413 | |
4b4eba0b DM |
414 | let delete = delete.unwrap_or(true); |
415 | ||
de8ec041 DM |
416 | let tgt_store = DataStore::lookup_datastore(&store)?; |
417 | ||
f357390c DM |
418 | let (remote_config, _digest) = remote::config()?; |
419 | let remote: remote::Remote = remote_config.lookup("remote", &remote)?; | |
94609e23 | 420 | |
d59dbeca DM |
421 | let options = HttpClientOptions::new() |
422 | .password(Some(remote.password.clone())) | |
423 | .fingerprint(remote.fingerprint.clone()); | |
424 | ||
425 | let client = HttpClient::new(&remote.host, &remote.userid, options)?; | |
de8ec041 DM |
426 | let _auth_info = client.login() // make sure we can auth |
427 | .await | |
94609e23 | 428 | .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?; |
de8ec041 | 429 | |
94609e23 | 430 | let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store); |
de8ec041 DM |
431 | |
432 | // fixme: set to_stdout to false? | |
433 | let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move { | |
434 | ||
435 | worker.log(format!("sync datastore '{}' start", store)); | |
436 | ||
437 | // explicit create shared lock to prevent GC on newly created chunks | |
438 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
439 | ||
4b4eba0b | 440 | pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?; |
de8ec041 DM |
441 | |
442 | worker.log(format!("sync datastore '{}' end", store)); | |
443 | ||
444 | Ok(()) | |
445 | })?; | |
446 | ||
447 | Ok(upid_str) | |
448 | } | |
449 | ||
450 | pub const ROUTER: Router = Router::new() | |
eb506c83 | 451 | .post(&API_METHOD_PULL); |