4 use crate::tools
::wrapped_reader_stream
::*;
7 use crate::api_schema
::*;
8 use crate::api_schema
::router
::*;
10 use chrono
::{Local, TimeZone}
;
12 use serde_json
::Value
;
15 //use std::path::PathBuf;
19 use hyper
::http
::request
::Parts
;
21 pub struct UploadPxar
{
23 index
: DynamicChunkWriter
,
27 impl Future
for UploadPxar
{
29 type Error
= failure
::Error
;
31 fn poll(&mut self) -> Poll
<(), failure
::Error
> {
33 match try_ready
!(self.stream
.poll()) {
35 self.count
+= chunk
.len();
36 if let Err(err
) = self.index
.write_all(&chunk
) {
37 bail
!("writing chunk failed - {}", err
);
42 return Ok(Async
::Ready(()))
53 _info
: &ApiAsyncMethod
,
54 rpcenv
: Box
<RpcEnvironment
>,
55 ) -> Result
<BoxFut
, Error
> {
57 let store
= tools
::required_string_param(¶m
, "store")?
;
58 let mut archive_name
= String
::from(tools
::required_string_param(¶m
, "archive-name")?
);
60 if !archive_name
.ends_with(".pxar") {
61 bail
!("got wront file extension (expected '.pxar')");
64 archive_name
.push_str(".didx");
66 let backup_type
= tools
::required_string_param(¶m
, "backup-type")?
;
67 let backup_id
= tools
::required_string_param(¶m
, "backup-id")?
;
68 let backup_time
= tools
::required_integer_param(¶m
, "backup-time")?
;
70 let worker_id
= format
!("{}_{}_{}_{}_{}", store
, backup_type
, backup_id
, backup_time
, archive_name
);
72 println
!("Upload {}", worker_id
);
74 let content_type
= parts
.headers
.get(http
::header
::CONTENT_TYPE
)
75 .ok_or(format_err
!("missing content-type header"))?
;
77 if content_type
!= "application/x-proxmox-backup-pxar" {
78 bail
!("got wrong content-type for pxar archive upload");
81 let chunk_size
= param
["chunk-size"].as_u64().unwrap_or(4096*1024) as usize;
82 verify_chunk_size(chunk_size
)?
;
84 let datastore
= DataStore
::lookup_datastore(store
)?
;
85 let backup_dir
= BackupDir
::new(backup_type
, backup_id
, backup_time
);
87 let (mut path
, _new
) = datastore
.create_backup_dir(&backup_dir
)?
;
89 path
.push(archive_name
);
91 let index
= datastore
.create_dynamic_writer(path
)?
;
92 let index
= DynamicChunkWriter
::new(index
, chunk_size
as usize);
94 let upload
= UploadPxar { stream: req_body, index, count: 0}
;
96 let worker
= server
::WorkerTask
::new("upload", Some(worker_id
), &rpcenv
.get_user().unwrap(), false)?
;
97 let worker1
= worker
.clone();
98 let abort_future
= worker
.abort_future();
101 .select(abort_future
.map_err(|_
| {}
)
103 worker1
.log("aborting task...");
104 bail
!("task aborted");
107 .then(move |result
| {
109 Ok((result
,_
)) => worker
.log_result(Ok(result
)),
110 Err((err
, _
)) => worker
.log_result(Err(err
)),
116 let response
= http
::Response
::builder()
118 .body(hyper
::Body
::empty())
127 pub fn api_method_upload_pxar() -> ApiAsyncMethod
{
130 ObjectSchema
::new("Upload .pxar backup file.")
131 .required("store", StringSchema
::new("Datastore name."))
132 .required("archive-name", StringSchema
::new("Backup archive name."))
133 .required("backup-type", StringSchema
::new("Backup type.")
134 .format(Arc
::new(ApiStringFormat
::Enum(&["ct", "host"]))))
135 .required("backup-id", StringSchema
::new("Backup ID."))
136 .required("backup-time", IntegerSchema
::new("Backup time (Unix epoch.)")
137 .minimum(1547797308))
140 IntegerSchema
::new("Chunk size in bytes. Must be a power of 2.")
152 _info
: &ApiAsyncMethod
,
153 _rpcenv
: Box
<RpcEnvironment
>,
154 ) -> Result
<BoxFut
, Error
> {
156 let store
= tools
::required_string_param(¶m
, "store")?
;
157 let mut archive_name
= tools
::required_string_param(¶m
, "archive-name")?
.to_owned();
159 if !archive_name
.ends_with(".pxar") {
160 bail
!("wrong archive extension");
162 archive_name
.push_str(".didx");
165 let backup_type
= tools
::required_string_param(¶m
, "backup-type")?
;
166 let backup_id
= tools
::required_string_param(¶m
, "backup-id")?
;
167 let backup_time
= tools
::required_integer_param(¶m
, "backup-time")?
;
169 println
!("Download {} from {} ({}/{}/{}/{})", archive_name
, store
,
170 backup_type
, backup_id
, Local
.timestamp(backup_time
, 0), archive_name
);
172 let datastore
= DataStore
::lookup_datastore(store
)?
;
174 let backup_dir
= BackupDir
::new(backup_type
, backup_id
, backup_time
);
176 let mut path
= backup_dir
.relative_path();
177 path
.push(archive_name
);
179 let index
= datastore
.open_dynamic_reader(path
)?
;
180 let reader
= BufferedDynamicReader
::new(index
);
181 let stream
= WrappedReaderStream
::new(reader
);
183 // fixme: set size, content type?
184 let response
= http
::Response
::builder()
186 .body(Body
::wrap_stream(stream
))?
;
188 Ok(Box
::new(future
::ok(response
)))
191 pub fn api_method_download_pxar() -> ApiAsyncMethod
{
194 ObjectSchema
::new("Download .pxar backup file.")
195 .required("store", StringSchema
::new("Datastore name."))
196 .required("archive-name", StringSchema
::new("Backup archive name."))
197 .required("backup-type", StringSchema
::new("Backup type.")
198 .format(Arc
::new(ApiStringFormat
::Enum(&["ct", "host"]))))
199 .required("backup-id", StringSchema
::new("Backup ID."))
200 .required("backup-time", IntegerSchema
::new("Backup time (Unix epoch.)")
201 .minimum(1547797308))