]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/catar.rs
src/api2/admin/datastore/catar.rs: allow to configure chunk-size
[proxmox-backup.git] / src / api2 / admin / datastore / catar.rs
CommitLineData
1629d2ad
DM
1use failure::*;
2
0fe5d605 3use crate::tools;
0b05fd58 4use crate::tools::wrapped_reader_stream::*;
e5064ba6 5use crate::backup::*;
2085142e 6//use crate::server::rest::*;
ef2f2efb 7use crate::api_schema::*;
dc9a007b 8use crate::api_schema::router::*;
1629d2ad 9
7ca80246
DM
10use chrono::{Utc, TimeZone};
11
2085142e
DM
12use serde_json::Value;
13use std::io::Write;
1629d2ad 14use futures::*;
cf16af2a 15use std::path::PathBuf;
ff3d3100 16use std::sync::Arc;
1629d2ad 17
83bdac1e
DM
18use hyper::Body;
19use hyper::http::request::Parts;
20
1629d2ad 21pub struct UploadCaTar {
83bdac1e 22 stream: Body,
93d5d779 23 index: DynamicIndexWriter,
1629d2ad
DM
24 count: usize,
25}
26
27impl Future for UploadCaTar {
28 type Item = ();
29 type Error = failure::Error;
30
31 fn poll(&mut self) -> Poll<(), failure::Error> {
32 loop {
1629d2ad
DM
33 match try_ready!(self.stream.poll()) {
34 Some(chunk) => {
35 self.count += chunk.len();
1c7a88ae 36 if let Err(err) = self.index.write_all(&chunk) {
1629d2ad
DM
37 bail!("writing chunk failed - {}", err);
38 }
1629d2ad
DM
39 }
40 None => {
2085142e 41 self.index.close()?;
1629d2ad
DM
42 return Ok(Async::Ready(()))
43 }
44 }
45 }
46 }
47}
48
e82dad97
DM
49fn upload_catar(
50 parts: Parts,
51 req_body: Body,
52 param: Value,
53 _info: &ApiAsyncMethod,
54 _rpcenv: &mut RpcEnvironment,
55) -> Result<BoxFut, Error> {
1629d2ad 56
5a778d92 57 let store = tools::required_string_param(&param, "store")?;
541a3022
DM
58 let mut archive_name = String::from(tools::required_string_param(&param, "archive_name")?);
59
60 if !archive_name.ends_with(".catar") {
61 bail!("got wront file extension (expected '.catar')");
62 }
63
64 archive_name.push_str(".didx");
1629d2ad 65
ff3d3100
DM
66 let backup_type = tools::required_string_param(&param, "type")?;
67 let backup_id = tools::required_string_param(&param, "id")?;
68 let backup_time = tools::required_integer_param(&param, "time")?;
69
f02e6fc4 70 println!("Upload {}/{}/{}/{}/{}", store, backup_type, backup_id, backup_time, archive_name);
1629d2ad 71
83bdac1e
DM
72 let content_type = parts.headers.get(http::header::CONTENT_TYPE)
73 .ok_or(format_err!("missing content-type header"))?;
74
75 if content_type != "application/x-proxmox-backup-catar" {
76 bail!("got wrong content-type for catar archive upload");
77 }
78
247cdbce
DM
79 let chunk_size = param["chunk-size"].as_u64().unwrap_or(4096*1024);
80 verify_chunk_size(chunk_size)?;
0ee0ad5b
DM
81
82 let datastore = DataStore::lookup_datastore(store)?;
1629d2ad 83
ff3d3100
DM
84 let mut path = datastore.create_backup_dir(backup_type, backup_id, backup_time)?;
85
541a3022 86 path.push(archive_name);
ff3d3100 87
247cdbce 88 let index = datastore.create_dynamic_writer(path, chunk_size as usize)?;
1629d2ad
DM
89
90 let upload = UploadCaTar { stream: req_body, index, count: 0};
91
2085142e 92 let resp = upload.and_then(|_| {
1629d2ad
DM
93
94 let response = http::Response::builder()
95 .status(200)
96 .body(hyper::Body::empty())
97 .unwrap();
98
99 Ok(response)
100 });
101
0ee0ad5b 102 Ok(Box::new(resp))
1629d2ad
DM
103}
104
50cfb695
DM
105pub fn api_method_upload_catar() -> ApiAsyncMethod {
106 ApiAsyncMethod::new(
1629d2ad
DM
107 upload_catar,
108 ObjectSchema::new("Upload .catar backup file.")
5a778d92 109 .required("store", StringSchema::new("Datastore name."))
cf16af2a 110 .required("archive_name", StringSchema::new("Backup archive name."))
ff3d3100
DM
111 .required("type", StringSchema::new("Backup type.")
112 .format(Arc::new(ApiStringFormat::Enum(vec!["ct".into(), "host".into()]))))
113 .required("id", StringSchema::new("Backup ID."))
114 .required("time", IntegerSchema::new("Backup time (Unix epoch.)")
115 .minimum(1547797308))
247cdbce
DM
116 .optional(
117 "chunk-size",
118 IntegerSchema::new("Chunk size in bytes. Must be a power of 2.")
119 .minimum(64*1024)
120 .maximum(4096*1024)
121 .default(4096*1024)
122 )
1629d2ad
DM
123 )
124}
50cfb695 125
e82dad97
DM
126fn download_catar(
127 _parts: Parts,
128 _req_body: Body,
129 param: Value,
130 _info: &ApiAsyncMethod,
131 _rpcenv: &mut RpcEnvironment,
132) -> Result<BoxFut, Error> {
50cfb695 133
6a4c0916
DM
134 let store = tools::required_string_param(&param, "store")?;
135 let archive_name = tools::required_string_param(&param, "archive_name")?;
136
137 let backup_type = tools::required_string_param(&param, "type")?;
138 let backup_id = tools::required_string_param(&param, "id")?;
139 let backup_time = tools::required_integer_param(&param, "time")?;
7ca80246 140 let backup_time = Utc.timestamp(backup_time, 0);
6a4c0916 141
93d5d779 142 println!("Download {}.catar from {} ({}/{}/{}/{}.didx)", archive_name, store,
6a4c0916
DM
143 backup_type, backup_id, backup_time, archive_name);
144
145 let datastore = DataStore::lookup_datastore(store)?;
146
147 let mut path = datastore.get_backup_dir(backup_type, backup_id, backup_time);
148
149 let mut full_archive_name = PathBuf::from(archive_name);
93d5d779 150 full_archive_name.set_extension("didx");
6a4c0916
DM
151
152 path.push(full_archive_name);
153
93d5d779
DM
154 let index = datastore.open_dynamic_reader(path)?;
155 let reader = BufferedDynamicReader::new(index);
0b05fd58 156 let stream = WrappedReaderStream::new(reader);
6a4c0916 157
0b05fd58
DM
158 // fixme: set size, content type?
159 let response = http::Response::builder()
160 .status(200)
161 .body(Body::wrap_stream(stream))?;
162
163 Ok(Box::new(future::ok(response)))
50cfb695
DM
164}
165
166pub fn api_method_download_catar() -> ApiAsyncMethod {
167 ApiAsyncMethod::new(
168 download_catar,
169 ObjectSchema::new("Download .catar backup file.")
170 .required("store", StringSchema::new("Datastore name."))
171 .required("archive_name", StringSchema::new("Backup archive name."))
172 .required("type", StringSchema::new("Backup type.")
173 .format(Arc::new(ApiStringFormat::Enum(vec!["ct".into(), "host".into()]))))
174 .required("id", StringSchema::new("Backup ID."))
175 .required("time", IntegerSchema::new("Backup time (Unix epoch.)")
176 .minimum(1547797308))
177
178 )
179}