async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask,
- chunk_reader: &mut RemoteChunkReader,
+ chunk_reader: RemoteChunkReader,
target: Arc<DataStore>,
index: I,
) -> Result<(), Error> {
+ use futures::stream::{self, StreamExt, TryStreamExt};
- for pos in 0..index.index_count() {
- let info = index.chunk_info(pos).unwrap();
- let chunk_exists = target.cond_touch_chunk(&info.digest, false)?;
- if chunk_exists {
- //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
- continue;
- }
- //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
- let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+ let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap()));
- chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
+ stream
+ .map(|info| {
- target.insert_chunk(&chunk, &info.digest)?;
- }
+ let target = Arc::clone(&target);
+ let chunk_reader = chunk_reader.clone();
+
+ Ok::<_, Error>(async move {
+ let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
+ if chunk_exists {
+ //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+ return Ok::<_, Error>(());
+ }
+ //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+ let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+
+ crate::tools::runtime::block_in_place(|| {
+ chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
+ target.insert_chunk(&chunk, &info.digest)?;
+ Ok(())
+ })
+ })
+ })
+ .try_buffer_unordered(20)
+ .try_for_each(|_res| futures::future::ok(()))
+ .await?;
Ok(())
}
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+ pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile)
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+ pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
}
ArchiveType::Blob => {
let (csum, size) = compute_file_csum(&mut tmpfile)?;