]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/admin/datastore/pxar.rs
88af9bdcbf285ce453207ec9b59a96d0a1253f5c
[proxmox-backup.git] / src / api2 / admin / datastore / pxar.rs
1 use failure::*;
2
3 use crate::tools;
4 use crate::tools::wrapped_reader_stream::*;
5 use crate::backup::*;
6 use crate::server;
7 use crate::api_schema::*;
8 use crate::api_schema::router::*;
9
10 use chrono::{Local, TimeZone};
11
12 use serde_json::Value;
13 use std::io::Write;
14 use futures::*;
15 //use std::path::PathBuf;
16 use std::sync::Arc;
17
18 use hyper::Body;
19 use hyper::http::request::Parts;
20
21 pub struct UploadPxar {
22 stream: Body,
23 index: DynamicChunkWriter,
24 count: usize,
25 }
26
27 impl Future for UploadPxar {
28 type Item = ();
29 type Error = failure::Error;
30
31 fn poll(&mut self) -> Poll<(), failure::Error> {
32 loop {
33 match try_ready!(self.stream.poll()) {
34 Some(chunk) => {
35 self.count += chunk.len();
36 if let Err(err) = self.index.write_all(&chunk) {
37 bail!("writing chunk failed - {}", err);
38 }
39 }
40 None => {
41 self.index.close()?;
42 return Ok(Async::Ready(()))
43 }
44 }
45 }
46 }
47 }
48
49 fn upload_pxar(
50 parts: Parts,
51 req_body: Body,
52 param: Value,
53 _info: &ApiAsyncMethod,
54 rpcenv: Box<RpcEnvironment>,
55 ) -> Result<BoxFut, Error> {
56
57 let store = tools::required_string_param(&param, "store")?;
58 let mut archive_name = String::from(tools::required_string_param(&param, "archive-name")?);
59
60 if !archive_name.ends_with(".pxar") {
61 bail!("got wront file extension (expected '.pxar')");
62 }
63
64 archive_name.push_str(".didx");
65
66 let backup_type = tools::required_string_param(&param, "backup-type")?;
67 let backup_id = tools::required_string_param(&param, "backup-id")?;
68 let backup_time = tools::required_integer_param(&param, "backup-time")?;
69
70 let worker_id = format!("{}_{}_{}_{}_{}", store, backup_type, backup_id, backup_time, archive_name);
71
72 println!("Upload {}", worker_id);
73
74 let content_type = parts.headers.get(http::header::CONTENT_TYPE)
75 .ok_or(format_err!("missing content-type header"))?;
76
77 if content_type != "application/x-proxmox-backup-pxar" {
78 bail!("got wrong content-type for pxar archive upload");
79 }
80
81 let chunk_size = param["chunk-size"].as_u64().unwrap_or(4096*1024) as usize;
82 verify_chunk_size(chunk_size)?;
83
84 let datastore = DataStore::lookup_datastore(store)?;
85 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
86
87 let (mut path, _new) = datastore.create_backup_dir(&backup_dir)?;
88
89 path.push(archive_name);
90
91 let index = datastore.create_dynamic_writer(path)?;
92 let index = DynamicChunkWriter::new(index, chunk_size as usize);
93
94 let upload = UploadPxar { stream: req_body, index, count: 0};
95
96 let worker = server::WorkerTask::new("upload", Some(worker_id), &rpcenv.get_user().unwrap(), false)?;
97 let worker1 = worker.clone();
98 let abort_future = worker.abort_future();
99
100 let resp = upload
101 .select(abort_future.map_err(|_| {})
102 .then(move |_| {
103 worker1.log("aborting task...");
104 bail!("task aborted");
105 })
106 )
107 .then(move |result| {
108 match result {
109 Ok((result,_)) => worker.log_result(Ok(result)),
110 Err((err, _)) => worker.log_result(Err(err)),
111 }
112 Ok(())
113 })
114 .and_then(|_| {
115
116 let response = http::Response::builder()
117 .status(200)
118 .body(hyper::Body::empty())
119 .unwrap();
120
121 Ok(response)
122 });
123
124 Ok(Box::new(resp))
125 }
126
127 pub fn api_method_upload_pxar() -> ApiAsyncMethod {
128 ApiAsyncMethod::new(
129 upload_pxar,
130 ObjectSchema::new("Upload .pxar backup file.")
131 .required("store", StringSchema::new("Datastore name."))
132 .required("archive-name", StringSchema::new("Backup archive name."))
133 .required("backup-type", StringSchema::new("Backup type.")
134 .format(Arc::new(ApiStringFormat::Enum(&["ct", "host"]))))
135 .required("backup-id", StringSchema::new("Backup ID."))
136 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
137 .minimum(1547797308))
138 .optional(
139 "chunk-size",
140 IntegerSchema::new("Chunk size in bytes. Must be a power of 2.")
141 .minimum(64*1024)
142 .maximum(4096*1024)
143 .default(4096*1024)
144 )
145 )
146 }
147
148 fn download_pxar(
149 _parts: Parts,
150 _req_body: Body,
151 param: Value,
152 _info: &ApiAsyncMethod,
153 _rpcenv: Box<RpcEnvironment>,
154 ) -> Result<BoxFut, Error> {
155
156 let store = tools::required_string_param(&param, "store")?;
157 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
158
159 if !archive_name.ends_with(".pxar") {
160 bail!("wrong archive extension");
161 } else {
162 archive_name.push_str(".didx");
163 }
164
165 let backup_type = tools::required_string_param(&param, "backup-type")?;
166 let backup_id = tools::required_string_param(&param, "backup-id")?;
167 let backup_time = tools::required_integer_param(&param, "backup-time")?;
168
169 println!("Download {} from {} ({}/{}/{}/{})", archive_name, store,
170 backup_type, backup_id, Local.timestamp(backup_time, 0), archive_name);
171
172 let datastore = DataStore::lookup_datastore(store)?;
173
174 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
175
176 let mut path = backup_dir.relative_path();
177 path.push(archive_name);
178
179 let index = datastore.open_dynamic_reader(path)?;
180 let reader = BufferedDynamicReader::new(index);
181 let stream = WrappedReaderStream::new(reader);
182
183 // fixme: set size, content type?
184 let response = http::Response::builder()
185 .status(200)
186 .body(Body::wrap_stream(stream))?;
187
188 Ok(Box::new(future::ok(response)))
189 }
190
191 pub fn api_method_download_pxar() -> ApiAsyncMethod {
192 ApiAsyncMethod::new(
193 download_pxar,
194 ObjectSchema::new("Download .pxar backup file.")
195 .required("store", StringSchema::new("Datastore name."))
196 .required("archive-name", StringSchema::new("Backup archive name."))
197 .required("backup-type", StringSchema::new("Backup type.")
198 .format(Arc::new(ApiStringFormat::Enum(&["ct", "host"]))))
199 .required("backup-id", StringSchema::new("Backup ID."))
200 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
201 .minimum(1547797308))
202
203 )
204 }