- archive_name: &str,
- stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
- prefix: &str,
- fixed_size: Option<u64>,
- crypt_config: Option<Arc<CryptConfig>>,
- ) -> impl Future<Item=(), Error=Error> {
-
- let known_chunks = Arc::new(Mutex::new(HashSet::new()));
-
- let h2 = self.h2.clone();
- let h2_2 = self.h2.clone();
- let h2_3 = self.h2.clone();
- let h2_4 = self.h2.clone();
-
- let mut param = json!({ "archive-name": archive_name });
- if let Some(size) = fixed_size {
- param["size"] = size.into();
- }
-
- let index_path = format!("{}_index", prefix);
- let close_path = format!("{}_close", prefix);
-
- let prefix = prefix.to_owned();
-
- Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
- .and_then(move |_| {
- h2_2.post(&index_path, Some(param))
- })
- .and_then(move |res| {
- let wid = res.as_u64().unwrap();
- Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
- .and_then(move |(chunk_count, size, _speed)| {
- let param = json!({
- "wid": wid ,
- "chunk-count": chunk_count,
- "size": size,
- });
- h2_4.post(&close_path, Some(param))
- })
- .map(|_| ())
- })
- }
-
- fn response_queue() -> (
- mpsc::Sender<h2::client::ResponseFuture>,
- sync::oneshot::Receiver<Result<(), Error>>
- ) {
- let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
- let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
-
- hyper::rt::spawn(
- verify_queue_rx
- .map_err(Error::from)
- .for_each(|response: h2::client::ResponseFuture| {
- response
- .map_err(Error::from)
- .and_then(H2Client::h2api_response)
- .and_then(|result| {
- println!("RESPONSE: {:?}", result);
- Ok(())
- })
- .map_err(|err| format_err!("pipelined request failed: {}", err))
- })
- .then(|result|
- verify_result_tx.send(result)
- )
- .map_err(|_| { /* ignore closed channel */ })
- );
-
- (verify_queue_tx, verify_result_rx)
- }
-
- fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
- mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
- sync::oneshot::Receiver<Result<(), Error>>
- ) {
- let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
- let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
-
- let h2_2 = h2.clone();
-
- hyper::rt::spawn(
- verify_queue_rx
- .map_err(Error::from)
- .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
- match (response, merged_chunk_info) {
- (Some(response), MergedChunkInfo::Known(list)) => {
- future::Either::A(
- response
- .map_err(Error::from)
- .and_then(H2Client::h2api_response)
- .and_then(move |_result| {
- Ok(MergedChunkInfo::Known(list))
- })
- )
- }
- (None, MergedChunkInfo::Known(list)) => {
- future::Either::B(future::ok(MergedChunkInfo::Known(list)))
- }
- _ => unreachable!(),
- }
- })
- .merge_known_chunks()
- .and_then(move |merged_chunk_info| {
- match merged_chunk_info {
- MergedChunkInfo::Known(chunk_list) => {
- let mut digest_list = vec![];
- let mut offset_list = vec![];
- for (offset, digest) in chunk_list {
- //println!("append chunk {} (offset {})", proxmox::tools::digest_to_hex(&digest), offset);
- digest_list.push(proxmox::tools::digest_to_hex(&digest));
- offset_list.push(offset);
- }
- println!("append chunks list len ({})", digest_list.len());
- let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
- let mut request = H2Client::request_builder("localhost", "PUT", &path, None).unwrap();
- request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
- let param_data = bytes::Bytes::from(param.to_string().as_bytes());
- let upload_data = Some(param_data);
- h2_2.send_request(request, upload_data)
- .and_then(move |response| {
- response
- .map_err(Error::from)
- .and_then(H2Client::h2api_response)
- .and_then(|_| Ok(()))
- })
- .map_err(|err| format_err!("pipelined request failed: {}", err))
- }
- _ => unreachable!(),
- }
- })
- .for_each(|_| Ok(()))
- .then(|result|
- verify_result_tx.send(result)
- )
- .map_err(|_| { /* ignore closed channel */ })
- );
-
- (verify_queue_tx, verify_result_rx)
- }
-
- fn download_chunk_list(
- h2: H2Client,