]> git.proxmox.com Git - proxmox-backup.git/blame - src/tape/pool_writer/new_chunks_iterator.rs
api: use if-let pattern for error-only handling
[proxmox-backup.git] / src / tape / pool_writer / new_chunks_iterator.rs
CommitLineData
166a48f9
DM
1use std::collections::HashSet;
2use std::sync::{Arc, Mutex};
3
4use anyhow::{format_err, Error};
5
4de1c42c 6use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
b2065dc7 7
cef5c726 8use crate::tape::CatalogSet;
166a48f9
DM
9
10/// Chunk iterator which use a separate thread to read chunks
11///
12/// The iterator skips duplicate chunks and chunks already in the
13/// catalog.
14pub struct NewChunksIterator {
223271e1 15 #[allow(clippy::type_complexity)]
166a48f9
DM
16 rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>,
17}
18
19impl NewChunksIterator {
166a48f9
DM
20 /// Creates the iterator, spawning a new thread
21 ///
d20137e5 22 /// Make sure to join() the returned thread handle.
166a48f9
DM
23 pub fn spawn(
24 datastore: Arc<DataStore>,
25 snapshot_reader: Arc<Mutex<SnapshotReader>>,
26 catalog_set: Arc<Mutex<CatalogSet>>,
27 ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
166a48f9
DM
28 let (tx, rx) = std::sync::mpsc::sync_channel(3);
29
30 let reader_thread = std::thread::spawn(move || {
166a48f9
DM
31 let snapshot_reader = snapshot_reader.lock().unwrap();
32
4de1c42c 33 let mut chunk_index: HashSet<[u8; 32]> = HashSet::new();
166a48f9 34
dcd9c17f 35 let datastore_name = snapshot_reader.datastore_name().to_string();
166a48f9 36
6ef1b649 37 let result: Result<(), Error> = proxmox_lang::try_block!({
dcd9c17f
DC
38 let mut chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
39 catalog_set
40 .lock()
41 .unwrap()
42 .contains_chunk(&datastore_name, digest)
43 })?;
166a48f9
DM
44
45 loop {
46 let digest = match chunk_iter.next() {
47 None => {
2d5d264f 48 let _ = tx.send(Ok(None)); // ignore send error
166a48f9
DM
49 break;
50 }
51 Some(digest) => digest?,
52 };
53
54 if chunk_index.contains(&digest) {
55 continue;
56 }
57
166a48f9 58 let blob = datastore.load_chunk(&digest)?;
25877d05 59 //println!("LOAD CHUNK {}", hex::encode(&digest));
6aff2de5
MS
60 if let Err(err) = tx.send(Ok(Some((digest, blob)))) {
61 eprintln!("could not send chunk to reader thread: {err}");
62 break;
2d5d264f 63 }
166a48f9
DM
64
65 chunk_index.insert(digest);
66 }
67
68 Ok(())
69 });
70 if let Err(err) = result {
2d5d264f
DC
71 if let Err(err) = tx.send(Err(err)) {
72 eprintln!("error sending result to reader thread: {}", err);
73 }
166a48f9
DM
74 }
75 });
76
77 Ok((reader_thread, Self { rx }))
78 }
79}
80
81// We do not use Receiver::into_iter(). The manual implementation
82// returns a simpler type.
83impl Iterator for NewChunksIterator {
84 type Item = Result<([u8; 32], DataBlob), Error>;
85
86 fn next(&mut self) -> Option<Self::Item> {
87 match self.rx.recv() {
88 Ok(Ok(None)) => None,
89 Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))),
90 Ok(Err(err)) => Some(Err(err)),
91 Err(_) => Some(Err(format_err!("reader thread failed"))),
92 }
93 }
94}