})
}
- /// Login future
+ /// Login
///
/// Login is done on demand, so this is onyl required if you need
/// access to authentication data in 'AuthInfo'.
.build::<_, Body>(https)
}
- pub fn request(&self, mut req: Request<Body>) -> impl Future<Output = Result<Value, Error>> {
-
- let login = self.auth.listen();
+ pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
let client = self.client.clone();
- login.and_then(move |auth| {
-
- let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
- req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
- req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
+ let auth = self.login().await?;
- let request = Self::api_request(client, req);
+ let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+ req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+ req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
- request
- })
+ Self::api_request(client, req).await
}
- pub fn get(
+ pub async fn get(
&self,
path: &str,
data: Option<Value>,
- ) -> impl Future<Output = Result<Value, Error>> {
+ ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
- self.request(req)
+ self.request(req).await
}
- pub fn delete(
+ pub async fn delete(
&mut self,
path: &str,
data: Option<Value>,
- ) -> impl Future<Output = Result<Value, Error>> {
+ ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
- self.request(req)
+ self.request(req).await
}
- pub fn post(
+ pub async fn post(
&mut self,
path: &str,
data: Option<Value>,
- ) -> impl Future<Output = Result<Value, Error>> {
+ ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
- self.request(req)
+ self.request(req).await
}
- pub fn download<W: Write + Send + 'static>(
+ pub async fn download(
&mut self,
path: &str,
- output: W,
- ) -> impl Future<Output = Result<W, Error>> {
+ output: &mut (dyn Write + Send),
+ ) -> Result<(), Error> {
let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
- let login = self.auth.listen();
-
let client = self.client.clone();
- login.and_then(move |auth| {
+ let auth = self.login().await?;
- let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
- req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+ let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+ req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
- client.request(req)
+ let resp = client.request(req).await?;
+ let status = resp.status();
+ if !status.is_success() {
+ HttpClient::api_response(resp)
+ .map(|_| Err(format_err!("unknown error")))
+ .await?
+ } else {
+ resp.into_body()
.map_err(Error::from)
- .and_then(|resp| {
- let status = resp.status();
- if !status.is_success() {
- future::Either::Left(
- HttpClient::api_response(resp)
- .map(|_| Err(format_err!("unknown error")))
- )
- } else {
- future::Either::Right(
- resp.into_body()
- .map_err(Error::from)
- .try_fold(output, move |mut acc, chunk| async move {
- acc.write_all(&chunk)?;
- Ok::<_, Error>(acc)
- })
- )
- }
+ .try_fold(output, move |acc, chunk| async move {
+ acc.write_all(&chunk)?;
+ Ok::<_, Error>(acc)
})
- })
+ .await?;
+ }
+ Ok(())
}
- pub fn upload(
+ pub async fn upload(
&mut self,
content_type: &str,
body: Body,
path: &str,
data: Option<Value>,
- ) -> impl Future<Output = Result<Value, Error>> {
+ ) -> Result<Value, Error> {
let path = path.trim_matches('/');
let mut url = format!("https://{}:8007/{}", &self.server, path);
.header("Content-Type", content_type)
.body(body).unwrap();
- self.request(req)
+ self.request(req).await
}
- pub fn start_backup(
+ pub async fn start_backup(
&self,
datastore: &str,
backup_type: &str,
backup_id: &str,
backup_time: DateTime<Utc>,
debug: bool,
- ) -> impl Future<Output = Result<Arc<BackupClient>, Error>> {
+ ) -> Result<Arc<BackupClient>, Error> {
let param = json!({
"backup-type": backup_type,
let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
- self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
- .map_ok(|(h2, canceller)| BackupClient::new(h2, canceller))
+ let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
+
+ Ok(BackupClient::new(h2, canceller))
}
- pub fn start_backup_reader(
+ pub async fn start_backup_reader(
&self,
datastore: &str,
backup_type: &str,
backup_id: &str,
backup_time: DateTime<Utc>,
debug: bool,
- ) -> impl Future<Output = Result<Arc<BackupReader>, Error>> {
+ ) -> Result<Arc<BackupReader>, Error> {
let param = json!({
"backup-type": backup_type,
});
let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
- self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
- .map_ok(|(h2, canceller)| BackupReader::new(h2, canceller))
+ let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
+
+ Ok(BackupReader::new(h2, canceller))
}
- pub fn start_h2_connection(
+ pub async fn start_h2_connection(
&self,
mut req: Request<Body>,
protocol_name: String,
- ) -> impl Future<Output = Result<(H2Client, Canceller), Error>> {
+ ) -> Result<(H2Client, Canceller), Error> {
- let login = self.auth.listen();
+ let auth = self.login().await?;
let client = self.client.clone();
- login.and_then(move |auth| {
+ let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
+ req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
+ req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
- let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
- req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
- req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
+ let resp = client.request(req).await?;
+ let status = resp.status();
- client.request(req)
- .map_err(Error::from)
- .and_then(|resp| {
-
- let status = resp.status();
- if status != http::StatusCode::SWITCHING_PROTOCOLS {
- future::Either::Left(
- Self::api_response(resp)
- .map(|_| Err(format_err!("unknown error")))
- )
- } else {
- future::Either::Right(
- resp
- .into_body()
- .on_upgrade()
- .map_err(Error::from)
- )
- }
- })
- .and_then(|upgraded| {
- let max_window_size = (1 << 31) - 2;
-
- h2::client::Builder::new()
- .initial_connection_window_size(max_window_size)
- .initial_window_size(max_window_size)
- .max_frame_size(4*1024*1024)
- .handshake(upgraded)
- .map_err(Error::from)
- })
- .and_then(|(h2, connection)| async move {
- let connection = connection
- .map_err(|_| panic!("HTTP/2.0 connection failed"));
-
- let (connection, canceller) = cancellable(connection)?;
- // A cancellable future returns an Option which is None when cancelled and
- // Some when it finished instead, since we don't care about the return type we
- // need to map it away:
- let connection = connection.map(|_| ());
-
- // Spawn a new task to drive the connection state
- hyper::rt::spawn(connection);
-
- // Wait until the `SendRequest` handle has available capacity.
- let c = h2.ready().await?;
- Ok((H2Client::new(c), canceller))
- }.boxed())
- })
+ if status != http::StatusCode::SWITCHING_PROTOCOLS {
+ Self::api_response(resp)
+ .map(|_| Err(format_err!("unknown error")))
+ .await?;
+ unreachable!();
+ }
+
+ let upgraded = resp
+ .into_body()
+ .on_upgrade()
+ .await?;
+
+ let max_window_size = (1 << 31) - 2;
+
+ let (h2, connection) = h2::client::Builder::new()
+ .initial_connection_window_size(max_window_size)
+ .initial_window_size(max_window_size)
+ .max_frame_size(4*1024*1024)
+ .handshake(upgraded)
+ .await?;
+
+ let connection = connection
+ .map_err(|_| panic!("HTTP/2.0 connection failed"));
+
+ let (connection, canceller) = cancellable(connection)?;
+ // A cancellable future returns an Option which is None when cancelled and
+ // Some when it finished instead, since we don't care about the return type we
+ // need to map it away:
+ let connection = connection.map(|_| ());
+
+ // Spawn a new task to drive the connection state
+ hyper::rt::spawn(connection);
+
+ // Wait until the `SendRequest` handle has available capacity.
+ let c = h2.ready().await?;
+ Ok((H2Client::new(c), canceller))
}
async fn credentials(
}
}
- fn api_request(
+ async fn api_request(
client: Client<HttpsConnector>,
req: Request<Body>
- ) -> impl Future<Output = Result<Value, Error>> {
+ ) -> Result<Value, Error> {
client.request(req)
.map_err(Error::from)
.and_then(Self::api_response)
+ .await
}
pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
let csum = guard.as_mut().unwrap();
let chunk_end = offset + chunk_len as u64;
-
+
csum.update(&chunk_end.to_le_bytes());
csum.update(digest);