1 use std
::sync
::{Arc, Mutex}
;
2 use std
::thread
::JoinHandle
;
4 use anyhow
::{bail, format_err, Error}
;
5 use crossbeam_channel
::{bounded, Sender}
;
7 /// A handle to send data to the worker thread (implements clone)
8 pub struct SendHandle
<I
> {
10 abort
: Arc
<Mutex
<Option
<String
>>>,
13 /// Returns the first error happened, if any
14 pub fn check_abort(abort
: &Mutex
<Option
<String
>>) -> Result
<(), Error
> {
15 let guard
= abort
.lock().unwrap();
16 if let Some(err_msg
) = &*guard
{
17 return Err(format_err
!("{}", err_msg
));
22 impl<I
: Send
> SendHandle
<I
> {
23 /// Send data to the worker threads
24 pub fn send(&self, input
: I
) -> Result
<(), Error
> {
25 check_abort(&self.abort
)?
;
26 match self.input
.send(input
) {
28 Err(_
) => bail
!("send failed - channel closed"),
33 /// A thread pool which run the supplied closure
35 /// The send command sends data to the worker threads. If one handler
36 /// returns an error, we mark the channel as failed and it is no
37 /// longer possible to send data.
39 /// When done, the 'complete()' method needs to be called to check for
40 /// outstanding errors.
41 pub struct ParallelHandler
<I
> {
42 handles
: Vec
<JoinHandle
<()>>,
44 input
: Option
<SendHandle
<I
>>,
47 impl<I
> Clone
for SendHandle
<I
> {
48 fn clone(&self) -> Self {
50 input
: self.input
.clone(),
51 abort
: Arc
::clone(&self.abort
),
56 impl<I
: Send
+ '
static> ParallelHandler
<I
> {
57 /// Create a new thread pool, each thread processing incoming data
58 /// with 'handler_fn'.
59 pub fn new
<F
>(name
: &str, threads
: usize, handler_fn
: F
) -> Self
60 where F
: Fn(I
) -> Result
<(), Error
> + Send
+ Clone
+ '
static,
62 let mut handles
= Vec
::new();
63 let (input_tx
, input_rx
) = bounded
::<I
>(threads
);
65 let abort
= Arc
::new(Mutex
::new(None
));
68 let input_rx
= input_rx
.clone();
69 let abort
= Arc
::clone(&abort
);
70 let handler_fn
= handler_fn
.clone();
73 std
::thread
::Builder
::new()
74 .name(format
!("{} ({})", name
, i
))
76 let data
= match input_rx
.recv() {
80 match (handler_fn
)(data
) {
83 let mut guard
= abort
.lock().unwrap();
85 *guard
= Some(err
.to_string());
95 name
: name
.to_string(),
96 input
: Some(SendHandle
{
103 /// Returns a cloneable channel to send data to the worker threads
104 pub fn channel(&self) -> SendHandle
<I
> {
105 self.input
.as_ref().unwrap().clone()
108 /// Send data to the worker threads
109 pub fn send(&self, input
: I
) -> Result
<(), Error
> {
110 self.input
.as_ref().unwrap().send(input
)?
;
114 /// Wait for worker threads to complete and check for errors
115 pub fn complete(mut self) -> Result
<(), Error
> {
116 let input
= self.input
.take().unwrap();
117 let abort
= Arc
::clone(&input
.abort
);
118 check_abort(&abort
)?
;
121 let msg_list
= self.join_threads();
123 // an error might be encountered while waiting for the join
124 check_abort(&abort
)?
;
126 if msg_list
.is_empty() {
129 Err(format_err
!("{}", msg_list
.join("\n")))
132 fn join_threads(&mut self) -> Vec
<String
> {
134 let mut msg_list
= Vec
::new();
138 let handle
= match self.handles
.pop() {
139 Some(handle
) => handle
,
142 if let Err(panic
) = handle
.join() {
143 match panic
.downcast
::<&str>() {
144 Ok(panic_msg
) => msg_list
.push(
145 format
!("thread {} ({}) paniced: {}", self.name
, i
, panic_msg
)
147 Err(_
) => msg_list
.push(
148 format
!("thread {} ({}) paniced", self.name
, i
)
158 // Note: We make sure that all threads will be joined
159 impl<I
> Drop
for ParallelHandler
<I
> {
161 drop(self.input
.take());
162 while let Some(handle
) = self.handles
.pop() {
163 let _
= handle
.join();