// proxmox-backup — src/api2/pull.rs
// api2/pull: extend do_sync_job to also handle schedule and jobstate
1 //! Sync datastore from remote server
2 use std::sync::{Arc};
3
4 use anyhow::{format_err, Error};
5 use futures::{select, future::FutureExt};
6
7 use proxmox::api::api;
8 use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
9
10 use crate::server::{WorkerTask};
11 use crate::backup::DataStore;
12 use crate::client::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store};
13 use crate::api2::types::*;
14 use crate::config::{
15 remote,
16 sync::SyncJobConfig,
17 jobstate::Job,
18 acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ},
19 cached_user_info::CachedUserInfo,
20 };
21
22
23 pub fn check_pull_privs(
24 userid: &Userid,
25 store: &str,
26 remote: &str,
27 remote_store: &str,
28 delete: bool,
29 ) -> Result<(), Error> {
30
31 let user_info = CachedUserInfo::new()?;
32
33 user_info.check_privs(userid, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
34 user_info.check_privs(userid, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
35
36 if delete {
37 user_info.check_privs(userid, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
38 }
39
40 Ok(())
41 }
42
43 pub async fn get_pull_parameters(
44 store: &str,
45 remote: &str,
46 remote_store: &str,
47 ) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
48
49 let tgt_store = DataStore::lookup_datastore(store)?;
50
51 let (remote_config, _digest) = remote::config()?;
52 let remote: remote::Remote = remote_config.lookup("remote", remote)?;
53
54 let options = HttpClientOptions::new()
55 .password(Some(remote.password.clone()))
56 .fingerprint(remote.fingerprint.clone());
57
58 let client = HttpClient::new(&remote.host, &remote.userid, options)?;
59 let _auth_info = client.login() // make sure we can auth
60 .await
61 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
62
63 let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store.to_string());
64
65 Ok((client, src_repo, tgt_store))
66 }
67
68 pub fn do_sync_job(
69 id: &str,
70 sync_job: SyncJobConfig,
71 userid: &Userid,
72 schedule: Option<String>,
73 mut job: Job,
74 ) -> Result<String, Error> {
75
76 let job_id = id.to_string();
77 let worker_type = "syncjob";
78
79 let upid_str = WorkerTask::spawn(
80 worker_type,
81 Some(id.to_string()),
82 userid.clone(),
83 false,
84 move |worker| async move {
85
86 job.start(&worker.upid().to_string())?;
87
88 let worker2 = worker.clone();
89
90 let worker_future = async move {
91
92 let delete = sync_job.remove_vanished.unwrap_or(true);
93 let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
94
95 worker.log(format!("Starting datastore sync job '{}'", job_id));
96 if let Some(event_str) = schedule {
97 worker.log(format!("task triggered by schedule '{}'", event_str));
98 }
99 worker.log(format!("Sync datastore '{}' from '{}/{}'",
100 sync_job.store, sync_job.remote, sync_job.remote_store));
101
102 crate::client::pull::pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, Userid::backup_userid().clone()).await?;
103
104 worker.log(format!("sync job '{}' end", &job_id));
105
106 Ok(())
107 };
108
109 let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
110
111 let res = select!{
112 worker = worker_future.fuse() => worker,
113 abort = abort_future => abort,
114 };
115
116 let status = worker2.create_state(&res);
117
118 match job.finish(status) {
119 Ok(_) => {},
120 Err(err) => {
121 eprintln!("could not finish job state: {}", err);
122 }
123 }
124
125 res
126 })?;
127
128 Ok(upid_str)
129 }
130
#[api(
    input: {
        properties: {
            store: {
                schema: DATASTORE_SCHEMA,
            },
            remote: {
                schema: REMOTE_ID_SCHEMA,
            },
            "remote-store": {
                schema: DATASTORE_SCHEMA,
            },
            "remove-vanished": {
                schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
                optional: true,
            },
        },
    },
    access: {
        // Note: used parameters are no uri parameters, so we need to test inside function body
        description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
"###,
        permission: &Permission::Anybody,
    },
)]
/// Sync store from other repository
async fn pull (
    store: String,
    remote: String,
    remote_store: String,
    remove_vanished: Option<bool>,
    _info: &ApiMethod,
    rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {

    let userid: Userid = rpcenv.get_user().unwrap().parse()?;
    // default is to also remove snapshots that vanished on the remote
    let delete = remove_vanished.unwrap_or(true);

    // permission is Anybody above, so the real privilege checks happen here
    check_pull_privs(&userid, &store, &remote, &remote_store, delete)?;

    let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;

    // fixme: set to_stdout to false?
    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), userid.clone(), true, move |worker| async move {

        worker.log(format!("sync datastore '{}' start", store));

        // manual pulls run with the identity of the requesting user
        pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, userid).await?;

        worker.log(format!("sync datastore '{}' end", store));

        Ok(())
    })?;

    Ok(upid_str)
}
189
/// API route table for this module: POST triggers a manual pull.
pub const ROUTER: Router = Router::new()
    .post(&API_METHOD_PULL);