]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/pull.rs
rename src/api2/sync.rs -> src/api2/pull.rs
[proxmox-backup.git] / src / api2 / pull.rs
1 //! Sync datastore from remote server
2
3 use failure::*;
4 use serde_json::json;
5 use std::convert::TryFrom;
6 use std::sync::Arc;
7 use std::collections::HashMap;
8 use std::io::{Seek, SeekFrom};
9 use chrono::{Utc, TimeZone};
10
11 use proxmox::api::api;
12 use proxmox::api::{ApiMethod, Router, RpcEnvironment};
13
14 use crate::server::{WorkerTask};
15 use crate::backup::*;
16 use crate::client::*;
17 use crate::api2::types::*;
18
19 // fixme: implement filters
20 // fixme: delete vanished groups
21 // Todo: correctly lock backup groups
22
23 async fn pull_index_chunks<I: IndexFile>(
24 _worker: &WorkerTask,
25 chunk_reader: &mut RemoteChunkReader,
26 target: Arc<DataStore>,
27 index: I,
28 ) -> Result<(), Error> {
29
30
31 for pos in 0..index.index_count() {
32 let digest = index.index_digest(pos).unwrap();
33 let chunk_exists = target.cond_touch_chunk(digest, false)?;
34 if chunk_exists {
35 //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
36 continue;
37 }
38 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
39 let chunk = chunk_reader.read_raw_chunk(&digest)?;
40
41 target.insert_chunk(&chunk, &digest)?;
42 }
43
44 Ok(())
45 }
46
47 async fn download_manifest(
48 reader: &BackupReader,
49 filename: &std::path::Path,
50 ) -> Result<std::fs::File, Error> {
51
52 let tmp_manifest_file = std::fs::OpenOptions::new()
53 .write(true)
54 .create(true)
55 .read(true)
56 .open(&filename)?;
57
58 let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
59
60 tmp_manifest_file.seek(SeekFrom::Start(0))?;
61
62 Ok(tmp_manifest_file)
63 }
64
65 async fn pull_single_archive(
66 worker: &WorkerTask,
67 reader: &BackupReader,
68 chunk_reader: &mut RemoteChunkReader,
69 tgt_store: Arc<DataStore>,
70 snapshot: &BackupDir,
71 archive_name: &str,
72 ) -> Result<(), Error> {
73
74 let mut path = tgt_store.base_path();
75 path.push(snapshot.relative_path());
76 path.push(archive_name);
77
78 let mut tmp_path = path.clone();
79 tmp_path.set_extension("tmp");
80
81 worker.log(format!("sync archive {}", archive_name));
82 let tmpfile = std::fs::OpenOptions::new()
83 .write(true)
84 .create(true)
85 .read(true)
86 .open(&tmp_path)?;
87
88 let tmpfile = reader.download(archive_name, tmpfile).await?;
89
90 match archive_type(archive_name)? {
91 ArchiveType::DynamicIndex => {
92 let index = DynamicIndexReader::new(tmpfile)
93 .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
94
95 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
96 }
97 ArchiveType::FixedIndex => {
98 let index = FixedIndexReader::new(tmpfile)
99 .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
100
101 pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
102 }
103 ArchiveType::Blob => { /* nothing to do */ }
104 }
105 if let Err(err) = std::fs::rename(&tmp_path, &path) {
106 bail!("Atomic rename file {:?} failed - {}", path, err);
107 }
108 Ok(())
109 }
110
111 async fn pull_snapshot(
112 worker: &WorkerTask,
113 reader: Arc<BackupReader>,
114 tgt_store: Arc<DataStore>,
115 snapshot: &BackupDir,
116 ) -> Result<(), Error> {
117
118 let mut manifest_name = tgt_store.base_path();
119 manifest_name.push(snapshot.relative_path());
120 manifest_name.push(MANIFEST_BLOB_NAME);
121
122 let mut tmp_manifest_name = manifest_name.clone();
123 tmp_manifest_name.set_extension("tmp");
124
125 let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
126 let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
127 tmp_manifest_blob.verify_crc()?;
128
129 if manifest_name.exists() {
130 let manifest_blob = proxmox::tools::try_block!({
131 let mut manifest_file = std::fs::File::open(&manifest_name)
132 .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
133
134 let manifest_blob = DataBlob::load(&mut manifest_file)?;
135 manifest_blob.verify_crc()?;
136 Ok(manifest_blob)
137 }).map_err(|err: Error| {
138 format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
139 })?;
140
141 if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
142 return Ok(()); // nothing changed
143 }
144 }
145
146 let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
147
148 let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
149
150 for item in manifest.files() {
151 let mut path = tgt_store.base_path();
152 path.push(snapshot.relative_path());
153 path.push(&item.filename);
154
155 if path.exists() {
156 match archive_type(&item.filename)? {
157 ArchiveType::DynamicIndex => {
158 let index = DynamicIndexReader::open(&path)?;
159 let (csum, size) = index.compute_csum();
160 match manifest.verify_file(&item.filename, &csum, size) {
161 Ok(_) => continue,
162 Err(err) => {
163 worker.log(format!("detected changed file {:?} - {}", path, err));
164 }
165 }
166 }
167 ArchiveType::FixedIndex => {
168 let index = FixedIndexReader::open(&path)?;
169 let (csum, size) = index.compute_csum();
170 match manifest.verify_file(&item.filename, &csum, size) {
171 Ok(_) => continue,
172 Err(err) => {
173 worker.log(format!("detected changed file {:?} - {}", path, err));
174 }
175 }
176 }
177 ArchiveType::Blob => {
178 let mut tmpfile = std::fs::File::open(&path)?;
179 let (csum, size) = compute_file_csum(&mut tmpfile)?;
180 match manifest.verify_file(&item.filename, &csum, size) {
181 Ok(_) => continue,
182 Err(err) => {
183 worker.log(format!("detected changed file {:?} - {}", path, err));
184 }
185 }
186 }
187 }
188 }
189
190 pull_single_archive(
191 worker,
192 &reader,
193 &mut chunk_reader,
194 tgt_store.clone(),
195 snapshot,
196 &item.filename,
197 ).await?;
198 }
199
200 if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
201 bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
202 }
203
204 // cleanup - remove stale files
205 tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
206
207 Ok(())
208 }
209
210 pub async fn pull_snapshot_from(
211 worker: &WorkerTask,
212 reader: Arc<BackupReader>,
213 tgt_store: Arc<DataStore>,
214 snapshot: &BackupDir,
215 ) -> Result<(), Error> {
216
217 let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
218
219 if is_new {
220 worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
221
222 if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
223 if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
224 worker.log(format!("cleanup error - {}", cleanup_err));
225 }
226 return Err(err);
227 }
228 } else {
229 worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
230 pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
231 }
232
233 Ok(())
234 }
235
236 pub async fn pull_group(
237 worker: &WorkerTask,
238 client: &HttpClient,
239 src_repo: &BackupRepository,
240 tgt_store: Arc<DataStore>,
241 group: &BackupGroup,
242 ) -> Result<(), Error> {
243
244 let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
245
246 let args = json!({
247 "backup-type": group.backup_type(),
248 "backup-id": group.backup_id(),
249 });
250
251 let mut result = client.get(&path, Some(args)).await?;
252 let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
253
254 list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
255
256 let auth_info = client.login().await?;
257
258 let last_sync = group.last_successful_backup(&tgt_store.base_path())?;
259
260 for item in list {
261 let backup_time = Utc.timestamp(item.backup_time, 0);
262 if let Some(last_sync_time) = last_sync {
263 if last_sync_time > backup_time { continue; }
264 }
265
266 let new_client = HttpClient::new(
267 src_repo.host(),
268 src_repo.user(),
269 Some(auth_info.ticket.clone())
270 )?;
271
272 let reader = BackupReader::start(
273 new_client,
274 None,
275 src_repo.store(),
276 &item.backup_type,
277 &item.backup_id,
278 backup_time,
279 true,
280 ).await?;
281
282 let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
283
284 pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
285 }
286
287 Ok(())
288 }
289
290 pub async fn pull_store(
291 worker: &WorkerTask,
292 client: &HttpClient,
293 src_repo: &BackupRepository,
294 tgt_store: Arc<DataStore>,
295 ) -> Result<(), Error> {
296
297 let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
298
299 let mut result = client.get(&path, None).await?;
300
301 let list = result["data"].as_array_mut().unwrap();
302
303 list.sort_unstable_by(|a, b| {
304 let a_id = a["backup-id"].as_str().unwrap();
305 let a_backup_type = a["backup-type"].as_str().unwrap();
306 let b_id = b["backup-id"].as_str().unwrap();
307 let b_backup_type = b["backup-type"].as_str().unwrap();
308
309 let type_order = a_backup_type.cmp(b_backup_type);
310 if type_order == std::cmp::Ordering::Equal {
311 a_id.cmp(b_id)
312 } else {
313 type_order
314 }
315 });
316
317 let mut errors = false;
318
319 for item in list {
320
321 let id = item["backup-id"].as_str().unwrap();
322 let btype = item["backup-type"].as_str().unwrap();
323
324 let group = BackupGroup::new(btype, id);
325 if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group).await {
326 worker.log(format!("sync group {}/{} failed - {}", btype, id, err));
327 errors = true;
328 // continue
329 }
330 }
331
332 if errors {
333 bail!("sync failed with some errors.");
334 }
335
336 Ok(())
337 }
338
339 #[api(
340 input: {
341 properties: {
342 store: {
343 schema: DATASTORE_SCHEMA,
344 },
345 "remote-host": {
346 description: "Remote host", // TODO: use predefined type: host or IP
347 type: String,
348 },
349 "remote-store": {
350 schema: DATASTORE_SCHEMA,
351 },
352 "remote-user": {
353 description: "Remote user name.", // TODO: use predefined typed
354 type: String,
355 },
356 "remote-password": {
357 description: "Remote passsword.",
358 type: String,
359 },
360 },
361 },
362 )]
363 /// Sync store from other repository
364 async fn pull (
365 store: String,
366 remote_host: String,
367 remote_store: String,
368 remote_user: String,
369 remote_password: String,
370 _info: &ApiMethod,
371 rpcenv: &mut dyn RpcEnvironment,
372 ) -> Result<String, Error> {
373
374 let username = rpcenv.get_user().unwrap();
375
376 let tgt_store = DataStore::lookup_datastore(&store)?;
377
378 let client = HttpClient::new(&remote_host, &remote_user, Some(remote_password))?;
379 let _auth_info = client.login() // make sure we can auth
380 .await
381 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote_host, err))?;
382
383 let src_repo = BackupRepository::new(Some(remote_user), Some(remote_host), remote_store);
384
385 // fixme: set to_stdout to false?
386 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
387
388 worker.log(format!("sync datastore '{}' start", store));
389
390 // explicit create shared lock to prevent GC on newly created chunks
391 let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
392
393 pull_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
394
395 worker.log(format!("sync datastore '{}' end", store));
396
397 Ok(())
398 })?;
399
400 Ok(upid_str)
401 }
402
403 pub const ROUTER: Router = Router::new()
404 .post(&API_METHOD_PULL);