]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/pull.rs
e631920fb63a6073c2cf47dedce058d66b98f656
[proxmox-backup.git] / src / api2 / pull.rs
1 //! Sync datastore from remote server
2 use std::sync::{Arc};
3
4 use anyhow::{format_err, Error};
5 use futures::{select, future::FutureExt};
6
7 use proxmox::api::api;
8 use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
9
10 use pbs_client::{HttpClient, BackupRepository};
11 use pbs_api_types::{
12 Remote, Authid, SyncJobConfig,
13 DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
14 PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
15 };
16
17 use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
18 use crate::backup::DataStore;
19 use pbs_config::CachedUserInfo;
20
21 pub fn check_pull_privs(
22 auth_id: &Authid,
23 store: &str,
24 remote: &str,
25 remote_store: &str,
26 delete: bool,
27 ) -> Result<(), Error> {
28
29 let user_info = CachedUserInfo::new()?;
30
31 user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
32 user_info.check_privs(auth_id, &["remote", remote, remote_store], PRIV_REMOTE_READ, false)?;
33
34 if delete {
35 user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
36 }
37
38 Ok(())
39 }
40
41 pub async fn get_pull_parameters(
42 store: &str,
43 remote: &str,
44 remote_store: &str,
45 ) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
46
47 let tgt_store = DataStore::lookup_datastore(store)?;
48
49 let (remote_config, _digest) = pbs_config::remote::config()?;
50 let remote: Remote = remote_config.lookup("remote", remote)?;
51
52 let src_repo = BackupRepository::new(
53 Some(remote.config.auth_id.clone()),
54 Some(remote.config.host.clone()),
55 remote.config.port,
56 remote_store.to_string(),
57 );
58
59 let client = crate::api2::config::remote::remote_client(remote).await?;
60
61 Ok((client, src_repo, tgt_store))
62 }
63
64 pub fn do_sync_job(
65 mut job: Job,
66 sync_job: SyncJobConfig,
67 auth_id: &Authid,
68 schedule: Option<String>,
69 to_stdout: bool,
70 ) -> Result<String, Error> {
71
72 let job_id = format!("{}:{}:{}:{}",
73 sync_job.remote,
74 sync_job.remote_store,
75 sync_job.store,
76 job.jobname());
77 let worker_type = job.jobtype().to_string();
78
79 let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
80
81 let upid_str = WorkerTask::spawn(
82 &worker_type,
83 Some(job_id.clone()),
84 auth_id.to_string(),
85 to_stdout,
86 move |worker| async move {
87
88 job.start(&worker.upid().to_string())?;
89
90 let worker2 = worker.clone();
91 let sync_job2 = sync_job.clone();
92
93 let worker_future = async move {
94
95 let delete = sync_job.remove_vanished.unwrap_or(true);
96 let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone());
97 let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
98
99 worker.log(format!("Starting datastore sync job '{}'", job_id));
100 if let Some(event_str) = schedule {
101 worker.log(format!("task triggered by schedule '{}'", event_str));
102 }
103 worker.log(format!("Sync datastore '{}' from '{}/{}'",
104 sync_job.store, sync_job.remote, sync_job.remote_store));
105
106 pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
107
108 worker.log(format!("sync job '{}' end", &job_id));
109
110 Ok(())
111 };
112
113 let mut abort_future = worker2.abort_future().map(|_| Err(format_err!("sync aborted")));
114
115 let result = select!{
116 worker = worker_future.fuse() => worker,
117 abort = abort_future => abort,
118 };
119
120 let status = worker2.create_state(&result);
121
122 match job.finish(status) {
123 Ok(_) => {},
124 Err(err) => {
125 eprintln!("could not finish job state: {}", err);
126 }
127 }
128
129 if let Some(email) = email {
130 if let Err(err) = crate::server::send_sync_status(&email, notify, &sync_job2, &result) {
131 eprintln!("send sync notification failed: {}", err);
132 }
133 }
134
135 result
136 })?;
137
138 Ok(upid_str)
139 }
140
141 #[api(
142 input: {
143 properties: {
144 store: {
145 schema: DATASTORE_SCHEMA,
146 },
147 remote: {
148 schema: REMOTE_ID_SCHEMA,
149 },
150 "remote-store": {
151 schema: DATASTORE_SCHEMA,
152 },
153 "remove-vanished": {
154 schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
155 optional: true,
156 },
157 },
158 },
159 access: {
160 // Note: used parameters are no uri parameters, so we need to test inside function body
161 description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
162 and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
163 The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
164 "###,
165 permission: &Permission::Anybody,
166 },
167 )]
168 /// Sync store from other repository
169 async fn pull (
170 store: String,
171 remote: String,
172 remote_store: String,
173 remove_vanished: Option<bool>,
174 _info: &ApiMethod,
175 rpcenv: &mut dyn RpcEnvironment,
176 ) -> Result<String, Error> {
177
178 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
179 let delete = remove_vanished.unwrap_or(true);
180
181 check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
182
183 let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
184
185 // fixme: set to_stdout to false?
186 let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
187
188 worker.log(format!("sync datastore '{}' start", store));
189
190 let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id);
191 let future = select!{
192 success = pull_future.fuse() => success,
193 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
194 };
195
196 let _ = future?;
197
198 worker.log(format!("sync datastore '{}' end", store));
199
200 Ok(())
201 })?;
202
203 Ok(upid_str)
204 }
205
206 pub const ROUTER: Router = Router::new()
207 .post(&API_METHOD_PULL);