]> git.proxmox.com Git - proxmox-backup.git/blob - src/tape/pool_writer/new_chunks_iterator.rs
tree-wide: fix needless borrows
[proxmox-backup.git] / src / tape / pool_writer / new_chunks_iterator.rs
1 use std::collections::HashSet;
2 use std::sync::{Arc, Mutex};
3
4 use anyhow::{format_err, Error};
5
6 use pbs_datastore::{DataStore, DataBlob, SnapshotReader};
7
8 use crate::tape::CatalogSet;
9
10 /// Chunk iterator which use a separate thread to read chunks
11 ///
12 /// The iterator skips duplicate chunks and chunks already in the
13 /// catalog.
14 pub struct NewChunksIterator {
15 rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>,
16 }
17
18 impl NewChunksIterator {
19
20 /// Creates the iterator, spawning a new thread
21 ///
22 /// Make sure to join() the returnd thread handle.
23 pub fn spawn(
24 datastore: Arc<DataStore>,
25 snapshot_reader: Arc<Mutex<SnapshotReader>>,
26 catalog_set: Arc<Mutex<CatalogSet>>,
27 ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
28
29 let (tx, rx) = std::sync::mpsc::sync_channel(3);
30
31 let reader_thread = std::thread::spawn(move || {
32
33 let snapshot_reader = snapshot_reader.lock().unwrap();
34
35 let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
36
37 let datastore_name = snapshot_reader.datastore_name();
38
39 let result: Result<(), Error> = proxmox_lang::try_block!({
40
41 let mut chunk_iter = snapshot_reader.chunk_iterator()?;
42
43 loop {
44 let digest = match chunk_iter.next() {
45 None => {
46 let _ = tx.send(Ok(None)); // ignore send error
47 break;
48 }
49 Some(digest) => digest?,
50 };
51
52 if chunk_index.contains(&digest) {
53 continue;
54 }
55
56 if catalog_set.lock().unwrap().contains_chunk(datastore_name, &digest) {
57 continue;
58 };
59
60 let blob = datastore.load_chunk(&digest)?;
61 //println!("LOAD CHUNK {}", hex::encode(&digest));
62 match tx.send(Ok(Some((digest, blob)))) {
63 Ok(()) => {},
64 Err(err) => {
65 eprintln!("could not send chunk to reader thread: {}", err);
66 break;
67 }
68 }
69
70 chunk_index.insert(digest);
71 }
72
73 Ok(())
74 });
75 if let Err(err) = result {
76 if let Err(err) = tx.send(Err(err)) {
77 eprintln!("error sending result to reader thread: {}", err);
78 }
79 }
80 });
81
82 Ok((reader_thread, Self { rx }))
83 }
84 }
85
86 // We do not use Receiver::into_iter(). The manual implementation
87 // returns a simpler type.
88 impl Iterator for NewChunksIterator {
89 type Item = Result<([u8; 32], DataBlob), Error>;
90
91 fn next(&mut self) -> Option<Self::Item> {
92 match self.rx.recv() {
93 Ok(Ok(None)) => None,
94 Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))),
95 Ok(Err(err)) => Some(Err(err)),
96 Err(_) => Some(Err(format_err!("reader thread failed"))),
97 }
98 }
99 }