//! Sync datastore from remote server
f7d4e4b5 3use anyhow::{bail, format_err, Error};
de8ec041
DM
4use serde_json::json;
5use std::convert::TryFrom;
6use std::sync::Arc;
7use std::collections::HashMap;
8use std::io::{Seek, SeekFrom};
9use chrono::{Utc, TimeZone};
10
11use proxmox::api::api;
404d78c4 12use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
de8ec041
DM
13
14use crate::server::{WorkerTask};
15use crate::backup::*;
16use crate::client::*;
f357390c 17use crate::config::remote;
de8ec041 18use crate::api2::types::*;
d00e1a21 19use crate::config::acl::{PRIV_DATASTORE_CREATE_BACKUP, PRIV_DATASTORE_READ};
365f0f72 20use crate::config::cached_user_info::CachedUserInfo;
de8ec041
DM
21
// fixme: implement filters
// fixme: delete vanished groups
// Todo: correctly lock backup groups
eb506c83 26async fn pull_index_chunks<I: IndexFile>(
de8ec041
DM
27 _worker: &WorkerTask,
28 chunk_reader: &mut RemoteChunkReader,
29 target: Arc<DataStore>,
30 index: I,
31) -> Result<(), Error> {
32
33
34 for pos in 0..index.index_count() {
35 let digest = index.index_digest(pos).unwrap();
36 let chunk_exists = target.cond_touch_chunk(digest, false)?;
37 if chunk_exists {
38 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
39 continue;
40 }
41 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
42 let chunk = chunk_reader.read_raw_chunk(&digest)?;
43
44 target.insert_chunk(&chunk, &digest)?;
45 }
46
47 Ok(())
48}
49
50async fn download_manifest(
51 reader: &BackupReader,
52 filename: &std::path::Path,
53) -> Result<std::fs::File, Error> {
54
55 let tmp_manifest_file = std::fs::OpenOptions::new()
56 .write(true)
57 .create(true)
58 .read(true)
59 .open(&filename)?;
60
61 let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
62
63 tmp_manifest_file.seek(SeekFrom::Start(0))?;
64
65 Ok(tmp_manifest_file)
66}
67
eb506c83 68async fn pull_single_archive(
de8ec041
DM
69 worker: &WorkerTask,
70 reader: &BackupReader,
71 chunk_reader: &mut RemoteChunkReader,
72 tgt_store: Arc<DataStore>,
73 snapshot: &BackupDir,
74 archive_name: &str,
75) -> Result<(), Error> {
76
77 let mut path = tgt_store.base_path();
78 path.push(snapshot.relative_path());
79 path.push(archive_name);
80
81 let mut tmp_path = path.clone();
82 tmp_path.set_extension("tmp");
83
84 worker.log(format!("sync archive {}", archive_name));
85 let tmpfile = std::fs::OpenOptions::new()
86 .write(true)
87 .create(true)
88 .read(true)
89 .open(&tmp_path)?;
90
91 let tmpfile = reader.download(archive_name, tmpfile).await?;
92
93 match archive_type(archive_name)? {
94 ArchiveType::DynamicIndex => {
95 let index = DynamicIndexReader::new(tmpfile)
96 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
97
eb506c83 98 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
de8ec041
DM
99 }
100 ArchiveType::FixedIndex => {
101 let index = FixedIndexReader::new(tmpfile)
102 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
103
eb506c83 104 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
de8ec041
DM
105 }
106 ArchiveType::Blob => { /* nothing to do */ }
107 }
108 if let Err(err) = std::fs::rename(&tmp_path, &path) {
109 bail!("Atomic rename file {:?} failed - {}", path, err);
110 }
111 Ok(())
112}
113
eb506c83 114async fn pull_snapshot(
de8ec041
DM
115 worker: &WorkerTask,
116 reader: Arc<BackupReader>,
117 tgt_store: Arc<DataStore>,
118 snapshot: &BackupDir,
119) -> Result<(), Error> {
120
121 let mut manifest_name = tgt_store.base_path();
122 manifest_name.push(snapshot.relative_path());
123 manifest_name.push(MANIFEST_BLOB_NAME);
124
125 let mut tmp_manifest_name = manifest_name.clone();
126 tmp_manifest_name.set_extension("tmp");
127
128 let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
129 let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
130 tmp_manifest_blob.verify_crc()?;
131
132 if manifest_name.exists() {
9ea4bce4 133 let manifest_blob = proxmox::try_block!({
de8ec041
DM
134 let mut manifest_file = std::fs::File::open(&manifest_name)
135 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
136
137 let manifest_blob = DataBlob::load(&mut manifest_file)?;
138 manifest_blob.verify_crc()?;
139 Ok(manifest_blob)
140 }).map_err(|err: Error| {
141 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
142 })?;
143
144 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
145 return Ok(()); // nothing changed
146 }
147 }
148
149 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
150
151 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
152
153 for item in manifest.files() {
154 let mut path = tgt_store.base_path();
155 path.push(snapshot.relative_path());
156 path.push(&item.filename);
157
158 if path.exists() {
159 match archive_type(&item.filename)? {
160 ArchiveType::DynamicIndex => {
161 let index = DynamicIndexReader::open(&path)?;
162 let (csum, size) = index.compute_csum();
163 match manifest.verify_file(&item.filename, &csum, size) {
164 Ok(_) => continue,
165 Err(err) => {
166 worker.log(format!("detected changed file {:?} - {}", path, err));
167 }
168 }
169 }
170 ArchiveType::FixedIndex => {
171 let index = FixedIndexReader::open(&path)?;
172 let (csum, size) = index.compute_csum();
173 match manifest.verify_file(&item.filename, &csum, size) {
174 Ok(_) => continue,
175 Err(err) => {
176 worker.log(format!("detected changed file {:?} - {}", path, err));
177 }
178 }
179 }
180 ArchiveType::Blob => {
181 let mut tmpfile = std::fs::File::open(&path)?;
182 let (csum, size) = compute_file_csum(&mut tmpfile)?;
183 match manifest.verify_file(&item.filename, &csum, size) {
184 Ok(_) => continue,
185 Err(err) => {
186 worker.log(format!("detected changed file {:?} - {}", path, err));
187 }
188 }
189 }
190 }
191 }
192
eb506c83 193 pull_single_archive(
de8ec041
DM
194 worker,
195 &reader,
196 &mut chunk_reader,
197 tgt_store.clone(),
198 snapshot,
199 &item.filename,
200 ).await?;
201 }
202
203 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
204 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
205 }
206
207 // cleanup - remove stale files
208 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
209
210 Ok(())
211}
212
eb506c83 213pub async fn pull_snapshot_from(
de8ec041
DM
214 worker: &WorkerTask,
215 reader: Arc<BackupReader>,
216 tgt_store: Arc<DataStore>,
217 snapshot: &BackupDir,
218) -> Result<(), Error> {
219
220 let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
221
222 if is_new {
223 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
224
eb506c83 225 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
de8ec041
DM
226 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
227 worker.log(format!("cleanup error - {}", cleanup_err));
228 }
229 return Err(err);
230 }
231 } else {
232 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
eb506c83 233 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
de8ec041
DM
234 }
235
236 Ok(())
237}
238
eb506c83 239pub async fn pull_group(
de8ec041
DM
240 worker: &WorkerTask,
241 client: &HttpClient,
242 src_repo: &BackupRepository,
243 tgt_store: Arc<DataStore>,
244 group: &BackupGroup,
4b4eba0b 245 delete: bool,
de8ec041
DM
246) -> Result<(), Error> {
247
248 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
249
250 let args = json!({
251 "backup-type": group.backup_type(),
252 "backup-id": group.backup_id(),
253 });
254
255 let mut result = client.get(&path, Some(args)).await?;
256 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
257
258 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
259
260 let auth_info = client.login().await?;
d59dbeca 261 let fingerprint = client.fingerprint();
de8ec041 262
18cc66ee 263 let last_sync = tgt_store.last_successful_backup(group)?;
de8ec041 264
c425bdc9
DM
265 let mut remote_snapshots = std::collections::HashSet::new();
266
de8ec041
DM
267 for item in list {
268 let backup_time = Utc.timestamp(item.backup_time, 0);
c425bdc9
DM
269 remote_snapshots.insert(backup_time);
270
de8ec041
DM
271 if let Some(last_sync_time) = last_sync {
272 if last_sync_time > backup_time { continue; }
273 }
274
d59dbeca
DM
275 let options = HttpClientOptions::new()
276 .password(Some(auth_info.ticket.clone()))
277 .fingerprint(fingerprint.clone());
278
279 let new_client = HttpClient::new(src_repo.host(), src_repo.user(), options)?;
de8ec041
DM
280
281 let reader = BackupReader::start(
282 new_client,
283 None,
284 src_repo.store(),
285 &item.backup_type,
286 &item.backup_id,
287 backup_time,
288 true,
289 ).await?;
290
291 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
292
eb506c83 293 pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
de8ec041
DM
294 }
295
4b4eba0b 296 if delete {
c425bdc9
DM
297 let local_list = group.list_backups(&tgt_store.base_path())?;
298 for info in local_list {
299 let backup_time = info.backup_dir.backup_time();
300 if remote_snapshots.contains(&backup_time) { continue; }
301 worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
302 tgt_store.remove_backup_dir(&info.backup_dir)?;
303 }
4b4eba0b
DM
304 }
305
de8ec041
DM
306 Ok(())
307}
308
eb506c83 309pub async fn pull_store(
de8ec041
DM
310 worker: &WorkerTask,
311 client: &HttpClient,
312 src_repo: &BackupRepository,
313 tgt_store: Arc<DataStore>,
4b4eba0b 314 delete: bool,
de8ec041
DM
315) -> Result<(), Error> {
316
317 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
318
319 let mut result = client.get(&path, None).await?;
320
b31c8019 321 let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
de8ec041
DM
322
323 list.sort_unstable_by(|a, b| {
b31c8019 324 let type_order = a.backup_type.cmp(&b.backup_type);
de8ec041 325 if type_order == std::cmp::Ordering::Equal {
b31c8019 326 a.backup_id.cmp(&b.backup_id)
de8ec041
DM
327 } else {
328 type_order
329 }
330 });
331
332 let mut errors = false;
333
4b4eba0b
DM
334 let mut new_groups = std::collections::HashSet::new();
335
de8ec041 336 for item in list {
b31c8019 337 let group = BackupGroup::new(&item.backup_type, &item.backup_id);
4b4eba0b 338 if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group, delete).await {
b31c8019 339 worker.log(format!("sync group {}/{} failed - {}", item.backup_type, item.backup_id, err));
de8ec041 340 errors = true;
4b4eba0b 341 // do not stop here, instead continue
de8ec041 342 }
4b4eba0b
DM
343 new_groups.insert(group);
344 }
345
346 if delete {
9ea4bce4 347 let result: Result<(), Error> = proxmox::try_block!({
4b4eba0b
DM
348 let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?;
349 for local_group in local_groups {
350 if new_groups.contains(&local_group) { continue; }
351 worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
352 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
c425bdc9 353 worker.log(err.to_string());
4b4eba0b
DM
354 errors = true;
355 }
356 }
357 Ok(())
358 });
359 if let Err(err) = result {
360 worker.log(format!("error during cleanup: {}", err));
361 errors = true;
362 };
de8ec041
DM
363 }
364
365 if errors {
366 bail!("sync failed with some errors.");
367 }
368
369 Ok(())
370}
371
372#[api(
373 input: {
374 properties: {
375 store: {
376 schema: DATASTORE_SCHEMA,
377 },
94609e23
DM
378 remote: {
379 schema: REMOTE_ID_SCHEMA,
de8ec041
DM
380 },
381 "remote-store": {
382 schema: DATASTORE_SCHEMA,
383 },
4b4eba0b
DM
384 delete: {
385 description: "Delete vanished backups. This remove the local copy if the remote backup was deleted.",
386 type: Boolean,
387 optional: true,
388 default: true,
389 },
de8ec041
DM
390 },
391 },
404d78c4 392 access: {
365f0f72 393 // Note: used parameters are no uri parameters, so we need to test inside function body
d00e1a21 394 description: "The user needs Datastore.CreateBackup privilege on '/datastore/{store}' and Datastore.Read on '/remote/{remote}/{remote-store}'.",
365f0f72 395 permission: &Permission::Anybody,
404d78c4 396 },
de8ec041 397)]
eb506c83
DM
398/// Sync store from other repository
399async fn pull (
de8ec041 400 store: String,
94609e23 401 remote: String,
de8ec041 402 remote_store: String,
4b4eba0b 403 delete: Option<bool>,
de8ec041
DM
404 _info: &ApiMethod,
405 rpcenv: &mut dyn RpcEnvironment,
406) -> Result<String, Error> {
407
365f0f72
DM
408 let user_info = CachedUserInfo::new()?;
409
de8ec041 410 let username = rpcenv.get_user().unwrap();
d00e1a21
DM
411 user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_CREATE_BACKUP, false)?;
412 user_info.check_privs(&username, &["remote", &remote, &remote_store], PRIV_DATASTORE_READ, false)?;
de8ec041 413
4b4eba0b
DM
414 let delete = delete.unwrap_or(true);
415
de8ec041
DM
416 let tgt_store = DataStore::lookup_datastore(&store)?;
417
f357390c
DM
418 let (remote_config, _digest) = remote::config()?;
419 let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
94609e23 420
d59dbeca
DM
421 let options = HttpClientOptions::new()
422 .password(Some(remote.password.clone()))
423 .fingerprint(remote.fingerprint.clone());
424
425 let client = HttpClient::new(&remote.host, &remote.userid, options)?;
de8ec041
DM
426 let _auth_info = client.login() // make sure we can auth
427 .await
94609e23 428 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
de8ec041 429
94609e23 430 let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
de8ec041
DM
431
432 // fixme: set to_stdout to false?
433 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
434
435 worker.log(format!("sync datastore '{}' start", store));
436
437 // explicit create shared lock to prevent GC on newly created chunks
438 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
439
4b4eba0b 440 pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete).await?;
de8ec041
DM
441
442 worker.log(format!("sync datastore '{}' end", store));
443
444 Ok(())
445 })?;
446
447 Ok(upid_str)
448}
449
450pub const ROUTER: Router = Router::new()
eb506c83 451 .post(&API_METHOD_PULL);