git.proxmox.com Git - proxmox-backup.git/blob - src/api2/pull.rs
move jobstate to server
[proxmox-backup.git] / src / api2 / pull.rs
1 //! Sync datastore from remote server
2 use std::sync::{Arc};
3
4 use anyhow::{format_err, Error};
5 use futures::{select, future::FutureExt};
6
7 use proxmox::api::api;
8 use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
9
10 use crate::server::{WorkerTask, jobstate::Job};
11 use crate::backup::DataStore;
12 use crate::client::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store};
13 use crate::api2::types::*;
14 use crate::config::{
15 remote,
16 sync::SyncJobConfig,
17 acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ},
18 cached_user_info::CachedUserInfo,
19 };
20
21
22 pub fn check_pull_privs(
23 userid: &Userid,
24 store: &str,
25 remote: &str,
26 remote_store: &str,
27 delete: bool,
28 ) -> Result<(), Error> {
29
30 let user_info = CachedUserInfo::new()?;
31
32 user_info.check_privs(userid, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
33 user_info.check_privs(userid, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
34
35 if delete {
36 user_info.check_privs(userid, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
37 }
38
39 Ok(())
40 }
41
42 pub async fn get_pull_parameters(
43 store: &str,
44 remote: &str,
45 remote_store: &str,
46 ) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
47
48 let tgt_store = DataStore::lookup_datastore(store)?;
49
50 let (remote_config, _digest) = remote::config()?;
51 let remote: remote::Remote = remote_config.lookup("remote", remote)?;
52
53 let options = HttpClientOptions::new()
54 .password(Some(remote.password.clone()))
55 .fingerprint(remote.fingerprint.clone());
56
57 let src_repo = BackupRepository::new(Some(remote.userid.clone()), Some(remote.host.clone()), remote.port, remote_store.to_string());
58
59 let client = HttpClient::new(&src_repo.host(), src_repo.port(), &src_repo.user(), options)?;
60 let _auth_info = client.login() // make sure we can auth
61 .await
62 .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
63
64
65 Ok((client, src_repo, tgt_store))
66 }
67
68 pub fn do_sync_job(
69 mut job: Job,
70 sync_job: SyncJobConfig,
71 userid: &Userid,
72 schedule: Option<String>,
73 ) -> Result<String, Error> {
74
75 let job_id = job.jobname().to_string();
76 let worker_type = job.jobtype().to_string();
77
78 let upid_str = WorkerTask::spawn(
79 &worker_type,
80 Some(job.jobname().to_string()),
81 userid.clone(),
82 false,
83 move |worker| async move {
84
85 job.start(&worker.upid().to_string())?;
86
87 let worker2 = worker.clone();
88
89 let worker_future = async move {
90
91 let delete = sync_job.remove_vanished.unwrap_or(true);
92 let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
93
94 worker.log(format!("Starting datastore sync job '{}'", job_id));
95 if let Some(event_str) = schedule {
96 worker.log(format!("task triggered by schedule '{}'", event_str));
97 }
98 worker.log(format!("Sync datastore '{}' from '{}/{}'",
99 sync_job.store, sync_job.remote, sync_job.remote_store));
100
101 crate::client::pull::pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, Userid::backup_userid().clone()).await?;
102
103 worker.log(format!("sync job '{}' end", &job_id));
104
105 Ok(())
106 };
107
108 let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
109
110 let res = select!{
111 worker = worker_future.fuse() => worker,
112 abort = abort_future => abort,
113 };
114
115 let status = worker2.create_state(&res);
116
117 match job.finish(status) {
118 Ok(_) => {},
119 Err(err) => {
120 eprintln!("could not finish job state: {}", err);
121 }
122 }
123
124 res
125 })?;
126
127 Ok(upid_str)
128 }
129
130 #[api(
131 input: {
132 properties: {
133 store: {
134 schema: DATASTORE_SCHEMA,
135 },
136 remote: {
137 schema: REMOTE_ID_SCHEMA,
138 },
139 "remote-store": {
140 schema: DATASTORE_SCHEMA,
141 },
142 "remove-vanished": {
143 schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
144 optional: true,
145 },
146 },
147 },
148 access: {
149 // Note: used parameters are no uri parameters, so we need to test inside function body
150 description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
151 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
152 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
153 "###,
154 permission: &Permission::Anybody,
155 },
156 )]
157 /// Sync store from other repository
158 async fn pull (
159 store: String,
160 remote: String,
161 remote_store: String,
162 remove_vanished: Option<bool>,
163 _info: &ApiMethod,
164 rpcenv: &mut dyn RpcEnvironment,
165 ) -> Result<String, Error> {
166
167 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
168 let delete = remove_vanished.unwrap_or(true);
169
170 check_pull_privs(&userid, &store, &remote, &remote_store, delete)?;
171
172 let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
173
174 // fixme: set to_stdout to false?
175 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), userid.clone(), true, move |worker| async move {
176
177 worker.log(format!("sync datastore '{}' start", store));
178
179 let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, userid);
180 let future = select!{
181 success = pull_future.fuse() => success,
182 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
183 };
184
185 let _ = future?;
186
187 worker.log(format!("sync datastore '{}' end", store));
188
189 Ok(())
190 })?;
191
192 Ok(upid_str)
193 }
194
195 pub const ROUTER: Router = Router::new()
196 .post(&API_METHOD_PULL);