1 use std
::collections
::HashSet
;
2 use std
::sync
::{Arc, Mutex}
;
4 use anyhow
::{format_err, Error}
;
6 use pbs_datastore
::{DataStore, DataBlob, SnapshotReader}
;
8 use crate::tape
::CatalogSet
;
10 /// Chunk iterator which uses a separate thread to read chunks
///
12 /// The iterator skips duplicate chunks and chunks already in the
/// catalog set.
// NOTE(review): the original sentence was cut off after "in the"; "catalog
// set" is inferred from the `contains_chunk` lookup in the reader thread.
// The skip itself is not visible in this view -- confirm against the full file.
14 pub struct NewChunksIterator
{
/// Receiving end of the channel fed by the reader thread. Each message is
/// `Ok(Some((digest, blob)))`, `Ok(None)`, or `Err(error)`, as fixed by the
/// type below; the `[u8; 32]` is the chunk digest.
15 rx
: std
::sync
::mpsc
::Receiver
<Result
<Option
<([u8; 32], DataBlob
)>, Error
>>,
18 impl NewChunksIterator
{
20 /// Creates the iterator, spawning a new thread
///
22 /// Make sure to join() the returned thread handle.
///
/// Takes shared handles to the datastore, the snapshot reader and the
/// catalog set. On success returns the reader thread's `JoinHandle`
/// together with the iterator wrapping the receiving end of the channel.
// NOTE(review): the `fn` header line is not visible in this view; only the
// parameter list and the return type below can be confirmed from here.
24 datastore
: Arc
<DataStore
>,
25 snapshot_reader
: Arc
<Mutex
<SnapshotReader
>>,
26 catalog_set
: Arc
<Mutex
<CatalogSet
>>,
27 ) -> Result
<(std
::thread
::JoinHandle
<()>, Self), Error
> {
// Bounded channel: `sync_channel(3)` buffers at most 3 messages, so the
// reader thread blocks in `send` once it is that far ahead of the consumer.
29 let (tx
, rx
) = std
::sync
::mpsc
::sync_channel(3);
// `move` closure: `tx` and the three `Arc` handles are moved into the thread.
31 let reader_thread
= std
::thread
::spawn(move || {
// Lock the snapshot reader; `unwrap()` panics if the mutex is poisoned.
// The guard is bound to a local, so it is not released right away.
33 let snapshot_reader
= snapshot_reader
.lock().unwrap();
// Digests already handed to the consumer; filled by the `insert` call
// after a successful send further down.
35 let mut chunk_index
: HashSet
<[u8;32]> = HashSet
::new();
// Name passed to the catalog lookup (`contains_chunk`) below.
37 let datastore_name
= snapshot_reader
.datastore_name();
// The block evaluates to `Result<(), Error>`; the `?` operators inside
// presumably short-circuit into `result` rather than out of the closure
// (`proxmox_lang::try_block!` semantics -- confirm).
39 let result
: Result
<(), Error
> = proxmox_lang
::try_block
!({
41 let mut chunk_iter
= snapshot_reader
.chunk_iterator()?
;
// Pull the next digest from the snapshot's chunk iterator.
44 let digest
= match chunk_iter
.next() {
// NOTE(review): the arm header is not visible here. This send of
// `Ok(None)` presumably sits in the `None` arm and marks the end of the
// stream for the consumer -- confirm.
46 let _
= tx
.send(Ok(None
)); // ignore send error
// An item was produced; `?` propagates the error if the item is an `Err`.
49 Some(digest
) => digest?
,
// Digest was already seen during this run.
// NOTE(review): the branch body is not visible in this view.
52 if chunk_index
.contains(&digest
) {
// Ask the catalog set whether it already contains this chunk for this
// datastore. The lock guard is a temporary in the condition expression,
// not bound to a variable.
// NOTE(review): the branch body is not visible in this view.
56 if catalog_set
.lock().unwrap().contains_chunk(datastore_name
, &digest
) {
// Load the chunk data; a load failure is propagated by `?`.
60 let blob
= datastore
.load_chunk(&digest
)?
;
61 //println!("LOAD CHUNK {}", hex::encode(&digest));
// Hand the (digest, blob) pair to the consumer.
62 match tx
.send(Ok(Some((digest
, blob
)))) {
// NOTE(review): presumably the error arm of the send above (arm header not
// visible); the failure is only reported on stderr.
65 eprintln
!("could not send chunk to reader thread: {}", err
);
// Remember the digest so a later duplicate is caught by the `contains`
// check above.
70 chunk_index
.insert(digest
);
// Forward a failure from the block above to the consumer as an `Err`
// message; if that send fails as well, report it on stderr.
75 if let Err(err
) = result
{
76 if let Err(err
) = tx
.send(Err(err
)) {
77 eprintln
!("error sending result to reader thread: {}", err
);
// Return the thread handle (to be joined by the caller) and the iterator
// that owns the receiving end of the channel.
82 Ok((reader_thread
, Self { rx }
))
86 // We do not use Receiver::into_iter(). The manual implementation
87 // returns a simpler type.
88 impl Iterator
for NewChunksIterator
{
// Each item is either a (digest, blob) pair or an error.
89 type Item
= Result
<([u8; 32], DataBlob
), Error
>;
/// Waits for the next message on the channel (`Receiver::recv`) and maps
/// it to an iterator item.
91 fn next(&mut self) -> Option
<Self::Item
> {
92 match self.rx
.recv() {
// NOTE(review): no arm for `Ok(Ok(None))` is visible in this view, although
// the sending side emits `Ok(None)`; presumably it is handled by an arm not
// shown here (end of iteration) -- confirm.
// A chunk arrived: pass the (digest, blob) pair through unchanged.
94 Ok(Ok(Some((digest
, blob
)))) => Some(Ok((digest
, blob
))),
// The sender forwarded an error: surface it as an item.
95 Ok(Err(err
)) => Some(Err(err
)),
// `recv()` itself failed (per the std docs: the sending half is gone and
// nothing is pending); reported as a generic error item.
96 Err(_
) => Some(Err(format_err
!("reader thread failed"))),