]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/sync.rs
src/api2/sync.rs: implement remote sync
[proxmox-backup.git] / src / api2 / sync.rs
CommitLineData
de8ec041
DM
1use failure::*;
2use serde_json::json;
3use std::convert::TryFrom;
4use std::sync::Arc;
5use std::collections::HashMap;
6use std::io::{Seek, SeekFrom};
7use chrono::{Utc, TimeZone};
8
9use proxmox::api::api;
10use proxmox::api::{ApiMethod, Router, RpcEnvironment};
11
12use crate::server::{WorkerTask};
13use crate::backup::*;
14use crate::client::*;
15use crate::api2::types::*;
16
17// fixme: implement filters
18// Todo: correctly lock backup groups
19
20async fn sync_index_chunks<I: IndexFile>(
21 _worker: &WorkerTask,
22 chunk_reader: &mut RemoteChunkReader,
23 target: Arc<DataStore>,
24 index: I,
25) -> Result<(), Error> {
26
27
28 for pos in 0..index.index_count() {
29 let digest = index.index_digest(pos).unwrap();
30 let chunk_exists = target.cond_touch_chunk(digest, false)?;
31 if chunk_exists {
32 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
33 continue;
34 }
35 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
36 let chunk = chunk_reader.read_raw_chunk(&digest)?;
37
38 target.insert_chunk(&chunk, &digest)?;
39 }
40
41 Ok(())
42}
43
44async fn download_manifest(
45 reader: &BackupReader,
46 filename: &std::path::Path,
47) -> Result<std::fs::File, Error> {
48
49 let tmp_manifest_file = std::fs::OpenOptions::new()
50 .write(true)
51 .create(true)
52 .read(true)
53 .open(&filename)?;
54
55 let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
56
57 tmp_manifest_file.seek(SeekFrom::Start(0))?;
58
59 Ok(tmp_manifest_file)
60}
61
62async fn sync_single_archive(
63 worker: &WorkerTask,
64 reader: &BackupReader,
65 chunk_reader: &mut RemoteChunkReader,
66 tgt_store: Arc<DataStore>,
67 snapshot: &BackupDir,
68 archive_name: &str,
69) -> Result<(), Error> {
70
71 let mut path = tgt_store.base_path();
72 path.push(snapshot.relative_path());
73 path.push(archive_name);
74
75 let mut tmp_path = path.clone();
76 tmp_path.set_extension("tmp");
77
78 worker.log(format!("sync archive {}", archive_name));
79 let tmpfile = std::fs::OpenOptions::new()
80 .write(true)
81 .create(true)
82 .read(true)
83 .open(&tmp_path)?;
84
85 let tmpfile = reader.download(archive_name, tmpfile).await?;
86
87 match archive_type(archive_name)? {
88 ArchiveType::DynamicIndex => {
89 let index = DynamicIndexReader::new(tmpfile)
90 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
91
92 sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
93 }
94 ArchiveType::FixedIndex => {
95 let index = FixedIndexReader::new(tmpfile)
96 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
97
98 sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
99 }
100 ArchiveType::Blob => { /* nothing to do */ }
101 }
102 if let Err(err) = std::fs::rename(&tmp_path, &path) {
103 bail!("Atomic rename file {:?} failed - {}", path, err);
104 }
105 Ok(())
106}
107
108async fn sync_snapshot(
109 worker: &WorkerTask,
110 reader: Arc<BackupReader>,
111 tgt_store: Arc<DataStore>,
112 snapshot: &BackupDir,
113) -> Result<(), Error> {
114
115 let mut manifest_name = tgt_store.base_path();
116 manifest_name.push(snapshot.relative_path());
117 manifest_name.push(MANIFEST_BLOB_NAME);
118
119 let mut tmp_manifest_name = manifest_name.clone();
120 tmp_manifest_name.set_extension("tmp");
121
122 let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
123 let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
124 tmp_manifest_blob.verify_crc()?;
125
126 if manifest_name.exists() {
127 let manifest_blob = proxmox::tools::try_block!({
128 let mut manifest_file = std::fs::File::open(&manifest_name)
129 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
130
131 let manifest_blob = DataBlob::load(&mut manifest_file)?;
132 manifest_blob.verify_crc()?;
133 Ok(manifest_blob)
134 }).map_err(|err: Error| {
135 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
136 })?;
137
138 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
139 return Ok(()); // nothing changed
140 }
141 }
142
143 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
144
145 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
146
147 for item in manifest.files() {
148 let mut path = tgt_store.base_path();
149 path.push(snapshot.relative_path());
150 path.push(&item.filename);
151
152 if path.exists() {
153 match archive_type(&item.filename)? {
154 ArchiveType::DynamicIndex => {
155 let index = DynamicIndexReader::open(&path)?;
156 let (csum, size) = index.compute_csum();
157 match manifest.verify_file(&item.filename, &csum, size) {
158 Ok(_) => continue,
159 Err(err) => {
160 worker.log(format!("detected changed file {:?} - {}", path, err));
161 }
162 }
163 }
164 ArchiveType::FixedIndex => {
165 let index = FixedIndexReader::open(&path)?;
166 let (csum, size) = index.compute_csum();
167 match manifest.verify_file(&item.filename, &csum, size) {
168 Ok(_) => continue,
169 Err(err) => {
170 worker.log(format!("detected changed file {:?} - {}", path, err));
171 }
172 }
173 }
174 ArchiveType::Blob => {
175 let mut tmpfile = std::fs::File::open(&path)?;
176 let (csum, size) = compute_file_csum(&mut tmpfile)?;
177 match manifest.verify_file(&item.filename, &csum, size) {
178 Ok(_) => continue,
179 Err(err) => {
180 worker.log(format!("detected changed file {:?} - {}", path, err));
181 }
182 }
183 }
184 }
185 }
186
187 sync_single_archive(
188 worker,
189 &reader,
190 &mut chunk_reader,
191 tgt_store.clone(),
192 snapshot,
193 &item.filename,
194 ).await?;
195 }
196
197 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
198 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
199 }
200
201 // cleanup - remove stale files
202 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
203
204 Ok(())
205}
206
207pub async fn sync_snapshot_from(
208 worker: &WorkerTask,
209 reader: Arc<BackupReader>,
210 tgt_store: Arc<DataStore>,
211 snapshot: &BackupDir,
212) -> Result<(), Error> {
213
214 let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
215
216 if is_new {
217 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
218
219 if let Err(err) = sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
220 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
221 worker.log(format!("cleanup error - {}", cleanup_err));
222 }
223 return Err(err);
224 }
225 } else {
226 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
227 sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
228 }
229
230 Ok(())
231}
232
233pub async fn sync_group(
234 worker: &WorkerTask,
235 client: &HttpClient,
236 src_repo: &BackupRepository,
237 tgt_store: Arc<DataStore>,
238 group: &BackupGroup,
239) -> Result<(), Error> {
240
241 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
242
243 let args = json!({
244 "backup-type": group.backup_type(),
245 "backup-id": group.backup_id(),
246 });
247
248 let mut result = client.get(&path, Some(args)).await?;
249 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
250
251 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
252
253 let auth_info = client.login().await?;
254
255 let last_sync = group.last_successful_backup(&tgt_store.base_path())?;
256
257 for item in list {
258 let backup_time = Utc.timestamp(item.backup_time, 0);
259 if let Some(last_sync_time) = last_sync {
260 if last_sync_time > backup_time { continue; }
261 }
262
263 let new_client = HttpClient::new(
264 src_repo.host(),
265 src_repo.user(),
266 Some(auth_info.ticket.clone())
267 )?;
268
269 let reader = BackupReader::start(
270 new_client,
271 None,
272 src_repo.store(),
273 &item.backup_type,
274 &item.backup_id,
275 backup_time,
276 true,
277 ).await?;
278
279 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
280
281 sync_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
282 }
283
284 Ok(())
285}
286
287pub async fn sync_store(
288 worker: &WorkerTask,
289 client: &HttpClient,
290 src_repo: &BackupRepository,
291 tgt_store: Arc<DataStore>,
292) -> Result<(), Error> {
293
294 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
295
296 let mut result = client.get(&path, None).await?;
297
298 let list = result["data"].as_array_mut().unwrap();
299
300 list.sort_unstable_by(|a, b| {
301 let a_id = a["backup-id"].as_str().unwrap();
302 let a_backup_type = a["backup-type"].as_str().unwrap();
303 let b_id = b["backup-id"].as_str().unwrap();
304 let b_backup_type = b["backup-type"].as_str().unwrap();
305
306 let type_order = a_backup_type.cmp(b_backup_type);
307 if type_order == std::cmp::Ordering::Equal {
308 a_id.cmp(b_id)
309 } else {
310 type_order
311 }
312 });
313
314 let mut errors = false;
315
316 for item in list {
317
318 let id = item["backup-id"].as_str().unwrap();
319 let btype = item["backup-type"].as_str().unwrap();
320
321 let group = BackupGroup::new(btype, id);
322 if let Err(err) = sync_group(worker, client, src_repo, tgt_store.clone(), &group).await {
323 worker.log(format!("sync group {}/{} failed - {}", btype, id, err));
324 errors = true;
325 // continue
326 }
327 }
328
329 if errors {
330 bail!("sync failed with some errors.");
331 }
332
333 Ok(())
334}
335
336#[api(
337 input: {
338 properties: {
339 store: {
340 schema: DATASTORE_SCHEMA,
341 },
342 "remote-host": {
343 description: "Remote host", // TODO: use predefined type: host or IP
344 type: String,
345 },
346 "remote-store": {
347 schema: DATASTORE_SCHEMA,
348 },
349 "remote-user": {
350 description: "Remote user name.", // TODO: use predefined typed
351 type: String,
352 },
353 "remote-password": {
354 description: "Remote passsword.",
355 type: String,
356 },
357 },
358 },
359)]
360/// Sync store from otherrepository
361async fn sync_from (
362 store: String,
363 remote_host: String,
364 remote_store: String,
365 remote_user: String,
366 remote_password: String,
367 _info: &ApiMethod,
368 rpcenv: &mut dyn RpcEnvironment,
369) -> Result<String, Error> {
370
371 let username = rpcenv.get_user().unwrap();
372
373 let tgt_store = DataStore::lookup_datastore(&store)?;
374
375 let client = HttpClient::new(&remote_host, &remote_user, Some(remote_password))?;
376 let _auth_info = client.login() // make sure we can auth
377 .await
378 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote_host, err))?;
379
380 let src_repo = BackupRepository::new(Some(remote_user), Some(remote_host), remote_store);
381
382 // fixme: set to_stdout to false?
383 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
384
385 worker.log(format!("sync datastore '{}' start", store));
386
387 // explicit create shared lock to prevent GC on newly created chunks
388 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
389
390 sync_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
391
392 worker.log(format!("sync datastore '{}' end", store));
393
394 Ok(())
395 })?;
396
397 Ok(upid_str)
398}
399
/// API router for this module: POST triggers a sync from a remote repository.
pub const ROUTER: Router = Router::new()
    .post(&API_METHOD_SYNC_FROM);