]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/tools/parallel_handler.rs: remove static lifetime bound from handler_fn
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 26 Sep 2020 07:22:50 +0000 (09:22 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 26 Sep 2020 07:26:06 +0000 (09:26 +0200)
src/tools/parallel_handler.rs

index cfa32199aa68ffb738238cf59299e60298a76e23..f0274afeeffc601c2c77c423de19d5cd28a4ce4d 100644 (file)
@@ -36,10 +36,11 @@ impl <I: Send + Sync +'static> SendHandle<I> {
 ///
 /// When done, the 'complete()' method needs to be called to check for
 /// outstanding errors.
-pub struct ParallelHandler<I> {
+pub struct ParallelHandler<'a, I> {
     handles: Vec<JoinHandle<()>>,
     name: String,
     input: Option<SendHandle<I>>,
+    _marker: std::marker::PhantomData<&'a ()>,
 }
 
 impl <I> Clone for SendHandle<I> {
@@ -48,7 +49,7 @@ impl <I> Clone for SendHandle<I> {
     }
 }
 
-impl <I: Send + Sync + 'static> ParallelHandler<I> {
+impl <'a, I: Send + Sync + 'static> ParallelHandler<'a, I> {
 
     /// Create a new thread pool, each thread processing incoming data
     /// with 'handler_fn'.
@@ -57,7 +58,7 @@ impl <I: Send + Sync + 'static> ParallelHandler<I> {
         threads: usize,
         handler_fn: F,
     ) -> Self
-        where F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+        where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
     {
         let mut handles = Vec::new();
         let (input_tx, input_rx) = bounded::<I>(threads);
@@ -67,7 +68,14 @@ impl <I: Send + Sync + 'static> ParallelHandler<I> {
         for i in 0..threads {
             let input_rx = input_rx.clone();
             let abort = abort.clone();
-            let handler_fn = handler_fn.clone();
+
+            // Erase the 'a lifetime bound. This is safe because we
+            // join all thread in the drop handler.
+            let handler_fn: Box<dyn Fn(I) -> Result<(), Error> + Send + 'a> =
+                Box::new(handler_fn.clone());
+            let handler_fn: Box<dyn Fn(I) -> Result<(), Error> + Send + 'static> =
+                unsafe { std::mem::transmute(handler_fn) };
+
             handles.push(
                 std::thread::Builder::new()
                     .name(format!("{} ({})", name, i))
@@ -98,6 +106,7 @@ impl <I: Send + Sync + 'static> ParallelHandler<I> {
                 input: input_tx,
                 abort,
             }),
+            _marker: std::marker::PhantomData,
         }
     }
 
@@ -152,7 +161,7 @@ impl <I: Send + Sync + 'static> ParallelHandler<I> {
 }
 
 // Note: We make sure that all threads will be joined
-impl <I> Drop for ParallelHandler<I> {
+impl <'a, I> Drop for ParallelHandler<'a, I> {
     fn drop(&mut self) {
         drop(self.input.take());
         loop {