]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/client/pull.rs: allow up to 20 concurrent download streams
authorDietmar Maurer <dietmar@proxmox.com>
Tue, 22 Sep 2020 07:52:14 +0000 (09:52 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Tue, 22 Sep 2020 09:39:31 +0000 (11:39 +0200)
src/client/pull.rs

index cac6f15b1b82dbcd333706058e96f1adbc83da11..f671c0031241213892755baa5ccfbe063f0b63ce 100644 (file)
@@ -23,26 +23,40 @@ use crate::{
 
 async fn pull_index_chunks<I: IndexFile>(
     _worker: &WorkerTask,
-    chunk_reader: &mut RemoteChunkReader,
+    chunk_reader: RemoteChunkReader,
     target: Arc<DataStore>,
     index: I,
 ) -> Result<(), Error> {
 
+    use futures::stream::{self, StreamExt, TryStreamExt};
 
-    for pos in 0..index.index_count() {
-        let info = index.chunk_info(pos).unwrap();
-        let chunk_exists = target.cond_touch_chunk(&info.digest, false)?;
-        if chunk_exists {
-            //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
-            continue;
-        }
-        //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
-        let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+    let stream = stream::iter((0..index.index_count()).map(|pos| index.chunk_info(pos).unwrap()));
 
-        chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
+    stream
+        .map(|info| {
 
-        target.insert_chunk(&chunk, &info.digest)?;
-    }
+            let target = Arc::clone(&target);
+            let chunk_reader = chunk_reader.clone();
+
+            Ok::<_, Error>(async move {
+                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
+                if chunk_exists {
+                    //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+                    return Ok::<_, Error>(());
+                }
+                //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+
+                crate::tools::runtime::block_in_place(|| {
+                    chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
+                    target.insert_chunk(&chunk, &info.digest)?;
+                    Ok(())
+                })
+            })
+        })
+        .try_buffer_unordered(20)
+        .try_for_each(|_res| futures::future::ok(()))
+        .await?;
 
     Ok(())
 }
@@ -115,7 +129,7 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile)
@@ -123,7 +137,7 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
+            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index).await?;
         }
         ArchiveType::Blob => {
             let (csum, size) = compute_file_csum(&mut tmpfile)?;