1 use std
::sync
::{Arc, Mutex}
;
2 use std
::thread
::JoinHandle
;
4 use anyhow
::{bail, format_err, Error}
;
5 use crossbeam_channel
::{bounded, Sender}
;
7 /// A handle to send data to the worker threads (implements `Clone`).
8 pub struct SendHandle
<I
> {
// NOTE(review): an `input` field is used by the `send` and `clone`
// implementations of this type, but its declaration is not visible in
// this view of the file.
// Shared slot for an error message; `check_abort` turns a stored
// `Some(msg)` into an `Err`.
10 abort
: Arc
<Mutex
<Option
<String
>>>,
13 /// Returns the first error that happened, if any.
///
/// Locks `abort` and, if it holds a message, returns that message as an
/// `Error`.
///
/// # Panics
///
/// Panics if the mutex is poisoned, because the lock result is unwrapped.
14 pub fn check_abort(abort
: &Mutex
<Option
<String
>>) -> Result
<(), Error
> {
15 let guard
= abort
.lock().unwrap();
// A stored message means an error was recorded: hand it back to the caller.
16 if let Some(err_msg
) = &*guard
{
17 return Err(format_err
!("{}", err_msg
));
22 impl<I
: Send
> SendHandle
<I
> {
23 /// Send data to the worker threads.
///
/// Fails if an error message has already been recorded in `abort`, or if
/// the underlying channel rejects the value.
24 pub fn send(&self, input
: I
) -> Result
<(), Error
> {
// Refuse to queue more work once an error has been recorded.
25 check_abort(&self.abort
)?
;
26 match self.input
.send(input
) {
// The concrete send error is discarded; callers only see this fixed message.
28 Err(_
) => bail
!("send failed - channel closed"),
33 /// A thread pool which runs the supplied closure
35 /// The send command sends data to the worker threads. If one handler
36 /// returns an error, we mark the channel as failed and it is no
37 /// longer possible to send data.
39 /// When done, the 'complete()' method needs to be called to check for
40 /// outstanding errors.
41 pub struct ParallelHandler
<'a
, I
> {
// Join handles of the worker threads; popped and joined one by one.
42 handles
: Vec
<JoinHandle
<()>>,
// Wrapped in `Option` so the handle can be moved out with `take()`
// (done in `complete()` and in the `Drop` implementation).
44 input
: Option
<SendHandle
<I
>>,
// Zero-sized marker that makes the struct carry the `'a` lifetime.
45 _marker
: std
::marker
::PhantomData
<&'
a ()>,
// Implemented by hand, with no bound on `I`, so the handle can be cloned
// whatever the payload type is.
48 impl<I
> Clone
for SendHandle
<I
> {
// Produces a second handle that shares the same `abort` slot.
49 fn clone(&self) -> Self {
// Clone of the channel endpoint held in `input`.
51 input
: self.input
.clone(),
// `Arc::clone` only creates another reference to the shared `abort`
// slot; the message itself is not copied.
52 abort
: Arc
::clone(&self.abort
),
57 impl<'a
, I
: Send
+ '
static> ParallelHandler
<'a
, I
> {
58 /// Create a new thread pool, each thread processing incoming data
59 /// with 'handler_fn'.
///
/// * `name` - stored in the handler and used, together with an index, to
///   name each worker thread.
/// * `threads` - used here as the capacity of the bounded input channel;
///   presumably also the number of workers spawned (the loop header is not
///   visible in this view - confirm).
/// * `handler_fn` - called with each received item; it is cloned and boxed
///   before being given to a worker.
60 pub fn new
<F
>(name
: &str, threads
: usize, handler_fn
: F
) -> Self
61 where F
: Fn(I
) -> Result
<(), Error
> + Send
+ Clone
+ 'a
,
63 let mut handles
= Vec
::new();
// Bounded channel whose capacity equals `threads`.
64 let (input_tx
, input_rx
) = bounded
::<I
>(threads
);
// Shared error slot, initially empty.
66 let abort
= Arc
::new(Mutex
::new(None
));
// Shadowing clones of the receiver and the error slot (presumably one set
// per worker thread).
69 let input_rx
= input_rx
.clone();
70 let abort
= Arc
::clone(&abort
);
72 // Erase the 'a lifetime bound. This is safe because we
73 // join all threads in the drop handler.
74 let handler_fn
: Box
<dyn Fn(I
) -> Result
<(), Error
> + Send
+ 'a
> =
75 Box
::new(handler_fn
.clone());
// The transmute changes only the lifetime bound of the boxed closure
// ('a -> 'static); the rest of the type is the same on both sides.
76 let handler_fn
: Box
<dyn Fn(I
) -> Result
<(), Error
> + Send
+ '
static> =
77 unsafe { std::mem::transmute(handler_fn) }
;
// Worker threads are named "<name> (<i>)".
80 std
::thread
::Builder
::new()
81 .name(format
!("{} ({})", name
, i
))
// Receive one item from the channel (the arms of this match are not
// visible in this view).
83 let data
= match input_rx
.recv() {
// Run the handler on the received item.
87 match (handler_fn
)(data
) {
// Record the handler's error as a string in the shared slot, where
// `check_abort` will find it.
90 let mut guard
= abort
.lock().unwrap();
92 *guard
= Some(err
.to_string());
// Fields of the returned handler.
102 name
: name
.to_string(),
103 input
: Some(SendHandle
{
107 _marker
: std
::marker
::PhantomData
,
111 /// Returns a cloneable channel to send data to the worker threads
///
/// # Panics
///
/// Panics if `self.input` is `None` (the `Option` is unwrapped).
112 pub fn channel(&self) -> SendHandle
<I
> {
113 self.input
.as_ref().unwrap().clone()
116 /// Send data to the worker threads
///
/// Forwards to `SendHandle::send` on the stored handle and propagates its
/// error.
///
/// # Panics
///
/// Panics if `self.input` is `None` (the `Option` is unwrapped).
117 pub fn send(&self, input
: I
) -> Result
<(), Error
> {
118 self.input
.as_ref().unwrap().send(input
)?
;
122 /// Wait for worker threads to complete and check for errors
///
/// Returns the recorded handler error if there is one; otherwise, if any
/// thread join produced a message, returns those messages joined with
/// newlines as a single error.
123 pub fn complete(mut self) -> Result
<(), Error
> {
// Move the send handle out of `self`, leaving `None` behind.
124 let input
= self.input
.take().unwrap();
// Keep our own reference to the error slot so it can be checked again
// after the threads have been joined.
125 let abort
= Arc
::clone(&input
.abort
);
// Fail early if a handler has already reported an error.
126 check_abort(&abort
)?
;
129 let msg_list
= self.join_threads();
131 // an error might be encountered while waiting for the join
132 check_abort(&abort
)?
;
// An empty list means no join produced a message (the success branch is
// not visible in this view).
134 if msg_list
.is_empty() {
// Otherwise report all collected messages as one error, one per line.
137 Err(format_err
!("{}", msg_list
.join("\n")))
// Joins the stored thread handles, taking them from the back of
// `self.handles`, and returns one message for every join that reported a
// panic.
140 fn join_threads(&mut self) -> Vec
<String
> {
142 let mut msg_list
= Vec
::new();
// Take the next handle (the `None` arm is not visible in this view).
146 let handle
= match self.handles
.pop() {
147 Some(handle
) => handle
,
// `join()` returns `Err` with the panic payload if the thread panicked.
150 if let Err(panic
) = handle
.join() {
// Only payloads of type `&str` are included in the message text.
// NOTE(review): payloads of any other type (for example an owned
// `String`) take the generic branch below and their text is lost.
151 match panic
.downcast
::<&str>() {
152 Ok(panic_msg
) => msg_list
.push(
153 format
!("thread {} ({}) paniced: {}", self.name
, i
, panic_msg
)
155 Err(_
) => msg_list
.push(
156 format
!("thread {} ({}) paniced", self.name
, i
)
166 // Note: We make sure that all threads will be joined
167 impl<'a
, I
> Drop
for ParallelHandler
<'a
, I
> {
// Drop our send handle before joining (presumably so that idle workers see
// the channel close and exit - the worker loop's exit path is not visible
// in this view).
169 drop(self.input
.take());
// Join every remaining thread.
170 while let Some(handle
) = self.handles
.pop() {
// The join result is deliberately ignored.
171 let _
= handle
.join();