let status = resp.status();
if status != http::StatusCode::SWITCHING_PROTOCOLS {
- bail!("got status {:?} instead of protocol switch", status);
+ future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); }))
+ } else {
+ future::Either::B(resp.into_body().on_upgrade().map_err(Error::from))
}
-
- Ok(resp.into_body().on_upgrade().map_err(Error::from))
})
- .flatten()
.and_then(|upgraded| {
h2::client::handshake(upgraded).map_err(Error::from)
})
//#[derive(Clone)]
pub struct BackupClient {
- h2: h2::client::SendRequest<bytes::Bytes>,
+ h2: H2Client,
}
impl BackupClient {
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
- Self { h2 }
+ Self { h2: H2Client::new(h2) }
}
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
- let req = Self::request_builder("localhost", "GET", path, param).unwrap();
- Self::request(self.h2.clone(), req)
+ self.h2.get(path, param)
}
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
- let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
- Self::request(self.h2.clone(), req)
+ self.h2.put(path, param)
}
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
- Self::h2post(self.h2.clone(), path, param)
- }
-
- fn h2post(
- h2: h2::client::SendRequest<bytes::Bytes>,
- path: &str,
- param: Option<Value>
- ) -> impl Future<Item=Value, Error=Error> {
- let req = Self::request_builder("localhost", "POST", path, param).unwrap();
- Self::request(h2, req)
- }
-
- pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
- let request = Self::request_builder("localhost", "POST", path, param).unwrap();
-
- self.h2.clone()
- .ready()
- .map_err(Error::from)
- .and_then(move |mut send_request| {
- let (response, stream) = send_request.send_request(request, false).unwrap();
- PipeToSendStream::new(bytes::Bytes::from(data), stream)
- .and_then(|_| {
- response
- .map_err(Error::from)
- .and_then(Self::h2api_response)
- })
- })
+ self.h2.post(path, param)
}
fn response_queue() -> (
.for_each(|response: h2::client::ResponseFuture| {
response
.map_err(Error::from)
- .and_then(Self::h2api_response)
+ .and_then(H2Client::h2api_response)
.and_then(|result| {
println!("RESPONSE: {:?}", result);
Ok(())
}
fn download_chunk_list(
- h2: h2::client::SendRequest<bytes::Bytes>,
+ h2: H2Client,
path: &str,
archive_name: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=(), Error=Error> {
let param = json!({ "archive-name": archive_name });
- let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap();
+ let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
- Self::send_request(h2.clone(), request, None)
+ h2.send_request(request, None)
.and_then(move |response| {
response
.map_err(Error::from)
}
pub fn finish(&self) -> impl Future<Item=(), Error=Error> {
- Self::h2post(self.h2.clone(), "finish", None)
- .map(|_| ())
+ self.h2.clone().post("finish", None).map(|_| ())
}
pub fn upload_dynamic_stream(
Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
.and_then(move |_| {
- Self::h2post(h2_2, "dynamic_index", Some(param))
+ h2_2.post("dynamic_index", Some(param))
})
.and_then(move |res| {
- println!("GOT1 {:?}", res);
let wid = res.as_u64().unwrap();
- //let dir_path = PathBuf::from("../casync-pve");
- //let dir_path = PathBuf::from(".");
-
- //upload_pxar(h2, known_chunks, &dir_path, wid).unwrap()
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
.and_then(move |_size| {
- Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid })))
+ h2_4.post("dynamic_close", Some(json!({ "wid": wid })))
})
.map(|_| ())
})
}
fn upload_stream(
- h2: h2::client::SendRequest<bytes::Bytes>,
+ h2: H2Client,
wid: u64,
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
let start_time = std::time::Instant::now();
-
stream
.for_each(move |data| {
let h2 = h2.clone();
if chunk_is_known {
println!("append existing chunk ({} bytes)", data.len());
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
- request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
+ request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
upload_data = None;
} else {
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len());
known_chunks.insert(digest);
let param = json!({ "wid": wid, "size" : data.len() });
- request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
+ request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
upload_data = Some(bytes::Bytes::from(data));
}
- Self::send_request(h2, request, upload_data)
+ h2.send_request(request, upload_data)
.and_then(move |response| {
upload_queue.send(response)
.map(|_| ()).map_err(Error::from)
let upload_queue = upload_queue.clone();
println!("send test data ({} bytes)", data.len());
- let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
- Self::send_request(h2, request, Some(bytes::Bytes::from(data)))
+ let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap();
+ h2.send_request(request, Some(bytes::Bytes::from(data)))
.and_then(move |response| {
upload_queue.send(response)
.map(|_| ()).map_err(Error::from)
Ok(speed)
})
}
+}
+
+#[derive(Clone)]
+pub struct H2Client {
+ h2: h2::client::SendRequest<bytes::Bytes>,
+}
+
+impl H2Client {
+
+ pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
+ Self { h2 }
+ }
+
+ pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+ let req = Self::request_builder("localhost", "GET", path, param).unwrap();
+ self.request(req)
+ }
+
+ pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+ let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
+ self.request(req)
+ }
+
+ pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
+ let req = Self::request_builder("localhost", "POST", path, param).unwrap();
+ self.request(req)
+ }
+
+ pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
+ let request = Self::request_builder("localhost", "POST", path, param).unwrap();
+
+
+ self.h2.clone()
+ .ready()
+ .map_err(Error::from)
+ .and_then(move |mut send_request| {
+ let (response, stream) = send_request.send_request(request, false).unwrap();
+ PipeToSendStream::new(bytes::Bytes::from(data), stream)
+ .and_then(|_| {
+ response
+ .map_err(Error::from)
+ .and_then(Self::h2api_response)
+ })
+ })
+ }
fn request(
- h2: h2::client::SendRequest<bytes::Bytes>,
+ &self,
request: Request<()>,
) -> impl Future<Item=Value, Error=Error> {
- Self::send_request(h2, request, None)
+ self.send_request(request, None)
.and_then(move |response| {
response
.map_err(Error::from)
}
fn send_request(
- h2: h2::client::SendRequest<bytes::Bytes>,
+ &self,
request: Request<()>,
data: Option<bytes::Bytes>,
) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
- h2
+ self.h2.clone()
.ready()
.map_err(Error::from)
.and_then(move |mut send_request| {