-use failure::*;
-use std::io::{Write, Seek};
-use std::thread;
-use std::sync::{Arc, Mutex};
+use std::collections::HashSet;
+use std::io::{Seek, Write};
use std::os::unix::io::FromRawFd;
use std::path::{Path, PathBuf};
-use std::collections::HashSet;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+use std::thread;
-use futures::Poll;
+use failure::*;
use futures::stream::Stream;
use nix::fcntl::OFlag;
}
impl PxarBackupStream {
+ pin_utils::unsafe_pinned!(stream: Option<WrappedReaderStream<std::fs::File>>);
pub fn new<W: Write + Seek + Send + 'static>(
mut dir: Dir,
impl Stream for PxarBackupStream {
- type Item = Vec<u8>;
- type Error = Error;
+ type Item = Result<Vec<u8>, Error>;
- fn poll(&mut self) -> Poll<Option<Vec<u8>>, Error> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
{ // limit lock scope
let error = self.error.lock().unwrap();
if let Some(ref msg) = *error {
- return Err(format_err!("{}", msg));
+ return Poll::Ready(Some(Err(format_err!("{}", msg))));
}
}
- self.stream.as_mut().unwrap().poll().map_err(Error::from)
+ let res = self.as_mut()
+ .stream()
+ .as_pin_mut()
+ .unwrap()
+ .poll_next(cx);
+ Poll::Ready(futures::ready!(res)
+ .map(|v| v.map_err(Error::from))
+ )
}
}