]>
Commit | Line | Data |
---|---|---|
166a48f9 DM |
1 | use std::collections::HashSet; |
2 | use std::sync::{Arc, Mutex}; | |
3 | ||
4 | use anyhow::{format_err, Error}; | |
5 | ||
4de1c42c | 6 | use pbs_datastore::{DataBlob, DataStore, SnapshotReader}; |
b2065dc7 | 7 | |
cef5c726 | 8 | use crate::tape::CatalogSet; |
166a48f9 DM |
9 | |
10 | /// Chunk iterator which use a separate thread to read chunks | |
11 | /// | |
12 | /// The iterator skips duplicate chunks and chunks already in the | |
13 | /// catalog. | |
14 | pub struct NewChunksIterator { | |
223271e1 | 15 | #[allow(clippy::type_complexity)] |
166a48f9 DM |
16 | rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>, |
17 | } | |
18 | ||
19 | impl NewChunksIterator { | |
166a48f9 DM |
20 | /// Creates the iterator, spawning a new thread |
21 | /// | |
d20137e5 | 22 | /// Make sure to join() the returned thread handle. |
166a48f9 DM |
23 | pub fn spawn( |
24 | datastore: Arc<DataStore>, | |
25 | snapshot_reader: Arc<Mutex<SnapshotReader>>, | |
26 | catalog_set: Arc<Mutex<CatalogSet>>, | |
27 | ) -> Result<(std::thread::JoinHandle<()>, Self), Error> { | |
166a48f9 DM |
28 | let (tx, rx) = std::sync::mpsc::sync_channel(3); |
29 | ||
30 | let reader_thread = std::thread::spawn(move || { | |
166a48f9 DM |
31 | let snapshot_reader = snapshot_reader.lock().unwrap(); |
32 | ||
4de1c42c | 33 | let mut chunk_index: HashSet<[u8; 32]> = HashSet::new(); |
166a48f9 | 34 | |
dcd9c17f | 35 | let datastore_name = snapshot_reader.datastore_name().to_string(); |
166a48f9 | 36 | |
6ef1b649 | 37 | let result: Result<(), Error> = proxmox_lang::try_block!({ |
dcd9c17f DC |
38 | let mut chunk_iter = snapshot_reader.chunk_iterator(move |digest| { |
39 | catalog_set | |
40 | .lock() | |
41 | .unwrap() | |
42 | .contains_chunk(&datastore_name, digest) | |
43 | })?; | |
166a48f9 DM |
44 | |
45 | loop { | |
46 | let digest = match chunk_iter.next() { | |
47 | None => { | |
2d5d264f | 48 | let _ = tx.send(Ok(None)); // ignore send error |
166a48f9 DM |
49 | break; |
50 | } | |
51 | Some(digest) => digest?, | |
52 | }; | |
53 | ||
54 | if chunk_index.contains(&digest) { | |
55 | continue; | |
56 | } | |
57 | ||
166a48f9 | 58 | let blob = datastore.load_chunk(&digest)?; |
25877d05 | 59 | //println!("LOAD CHUNK {}", hex::encode(&digest)); |
6aff2de5 MS |
60 | if let Err(err) = tx.send(Ok(Some((digest, blob)))) { |
61 | eprintln!("could not send chunk to reader thread: {err}"); | |
62 | break; | |
2d5d264f | 63 | } |
166a48f9 DM |
64 | |
65 | chunk_index.insert(digest); | |
66 | } | |
67 | ||
68 | Ok(()) | |
69 | }); | |
70 | if let Err(err) = result { | |
2d5d264f DC |
71 | if let Err(err) = tx.send(Err(err)) { |
72 | eprintln!("error sending result to reader thread: {}", err); | |
73 | } | |
166a48f9 DM |
74 | } |
75 | }); | |
76 | ||
77 | Ok((reader_thread, Self { rx })) | |
78 | } | |
79 | } | |
80 | ||
81 | // We do not use Receiver::into_iter(). The manual implementation | |
82 | // returns a simpler type. | |
83 | impl Iterator for NewChunksIterator { | |
84 | type Item = Result<([u8; 32], DataBlob), Error>; | |
85 | ||
86 | fn next(&mut self) -> Option<Self::Item> { | |
87 | match self.rx.recv() { | |
88 | Ok(Ok(None)) => None, | |
89 | Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))), | |
90 | Ok(Err(err)) => Some(Err(err)), | |
91 | Err(_) => Some(Err(format_err!("reader thread failed"))), | |
92 | } | |
93 | } | |
94 | } |