[dependencies]
base64 = "0.10"
-bytes = "0.4"
+bytes = "0.5"
chrono = "0.4" # Date and time library for Rust
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
failure = "0.1"
-futures-preview = "0.3.0-alpha"
-h2 = { version = "0.2.0-alpha.1", features = ["stream"] }
-http = "0.1"
-hyper = { version = "0.13.0-alpha.1" }
+futures = "0.3"
+h2 = { version = "0.2", features = ["stream"] }
+http = "0.2"
+hyper = "0.13"
lazy_static = "1.3"
libc = "0.2"
log = "0.4"
serde_json = "1.0"
siphasher = "0.3"
syslog = "4.0"
-tokio = { version = "0.2.0-alpha.4" }
-tokio-executor = { version = "0.2.0-alpha.4" }
-tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
-tokio-openssl = "0.4.0-alpha.2"
-tower-service = "0.3.0-alpha.1"
+tokio = { version = "0.2.0", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
+tokio-util = { version = "0.2.0", features = [ "codec" ] }
+tokio-openssl = "0.4.0"
+tower-service = "0.3.0"
url = "1.7"
valgrind_request = { version = "1.1", optional = true }
walkdir = "2"
.map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
.await?;
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
- .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+ let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+ .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
// fixme: set other headers ?
env.log(format!("download {:?}", path3));
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
- .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+ let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+ .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
.and_then(move |file| {
env2.debug(format!("download chunk {:?}", path3));
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
- .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+ let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+ .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
None => {
this.scan_pos = 0;
if this.buffer.len() > 0 {
- return Poll::Ready(Some(Ok(this.buffer.take())));
+ return Poll::Ready(Some(Ok(this.buffer.split())));
} else {
return Poll::Ready(None);
}
let this = self.get_mut();
loop {
if this.buffer.len() == this.chunk_size {
- return Poll::Ready(Some(Ok(this.buffer.take())));
+ return Poll::Ready(Some(Ok(this.buffer.split())));
} else if this.buffer.len() > this.chunk_size {
let result = this.buffer.split_to(this.chunk_size);
return Poll::Ready(Some(Ok(result)));
None => {
// last chunk can have any size
if this.buffer.len() > 0 {
- return Poll::Ready(Some(Ok(this.buffer.take())));
+ return Poll::Ready(Some(Ok(this.buffer.split())));
} else {
return Poll::Ready(None);
}
} else {
match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) {
Some(Ok(chunk)) => {
- this.body.release_capacity().release_capacity(chunk.len())?;
+ this.body.flow_control().release_capacity(chunk.len())?;
this.bytes += chunk.len();
// println!("GOT FRAME {}", chunk.len());
},
} else {
match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) {
Some(Ok(chunk)) => {
- this.body.release_capacity().release_capacity(chunk.len())?;
+ this.body.flow_control().release_capacity(chunk.len())?;
this.bytes += chunk.len();
// println!("GOT FRAME {}", chunk.len());
},
let acceptor = Arc::new(acceptor.build());
- let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+ let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
println!("listening on {:?}", listener.local_addr());
- let mut incoming = listener.incoming();
- while let Some(socket) = incoming.try_next().await? {
+ loop {
+ let (socket, _addr) = listener.accept().await?;
tokio::spawn(handle_connection(socket, Arc::clone(&acceptor))
.map(|res| {
if let Err(err) = res {
}
}));
}
-
- Ok(())
}
async fn handle_connection(
#[tokio::main]
async fn main() -> Result<(), Error> {
- let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+ let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
println!("listening on {:?}", listener.local_addr());
- let mut incoming = listener.incoming();
- while let Some(socket) = incoming.try_next().await? {
+ loop {
+ let (socket, _addr) = listener.accept().await?;
tokio::spawn(handle_connection(socket)
.map(|res| {
if let Err(err) = res {
}
}));
}
-
- Ok(())
}
async fn handle_connection<T: AsyncRead + AsyncWrite + Unpin>(socket: T) -> Result<(), Error> {
let server = daemon::create_daemon(
([127,0,0,1], 82).into(),
move |listener, ready| {
+ let incoming = proxmox_backup::tools::async_io::StaticIncoming::from(listener);
Ok(ready
- .and_then(|_| hyper::Server::builder(listener.incoming())
+ .and_then(|_| hyper::Server::builder(incoming)
.serve(rest_server)
.with_graceful_shutdown(server::shutdown_future())
.map_err(Error::from)
// spawn chunker inside a separate task so that it can run parallel
tokio::spawn(async move {
- let _ = tx.send_all(&mut chunk_stream).await;
+ while let Some(v) = chunk_stream.next().await {
+ let _ = tx.send(v).await;
+ }
});
let stats = client
let file = tokio::fs::File::open(path).await?;
- let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+ let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_err(Error::from);
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
}
fn async_main<F: Future>(fut: F) -> <F as Future>::Output {
- let rt = tokio::runtime::Runtime::new().unwrap();
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
let ret = rt.block_on(fut);
- rt.shutdown_now();
+ // This does not exist anymore. We need to actually stop our runaways instead...
+ // rt.shutdown_now();
ret
}
let server = daemon::create_daemon(
([0,0,0,0,0,0,0,0], 8007).into(),
|listener, ready| {
- let connections = listener
- .incoming()
+ let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
.map_err(Error::from)
- .try_filter_map(move |sock| {
+ .try_filter_map(move |(sock, _addr)| {
let acceptor = Arc::clone(&acceptor);
async move {
sock.set_nodelay(true).unwrap();
)
}
});
+ let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
let file = tokio::fs::File::open("random-test.dat").await?;
- let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+ let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_ok(|bytes| bytes.to_vec())
.map_err(Error::from);
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
let (verify_result_tx, verify_result_rx) = oneshot::channel();
- hyper::rt::spawn(
+ // FIXME: check if this works as expected as replacement for the combinator below?
+ // tokio::spawn(async move {
+ // let result: Result<(), Error> = (async move {
+ // while let Some(response) = verify_queue_rx.recv().await {
+ // match H2Client::h2api_response(response.await?).await {
+ // Ok(result) => println!("RESPONSE: {:?}", result),
+ // Err(err) => bail!("pipelined request failed: {}", err),
+ // }
+ // }
+ // Ok(())
+ // }).await;
+ // let _ignore_closed_channel = verify_result_tx.send(result);
+ // });
+ // old code for reference?
+ tokio::spawn(
verify_queue_rx
.map(Ok::<_, Error>)
.try_for_each(|response: h2::client::ResponseFuture| {
let h2_2 = h2.clone();
- hyper::rt::spawn(
+ // FIXME: async-block-ify this code!
+ tokio::spawn(
verify_queue_rx
.map(Ok::<_, Error>)
.and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
println!("append chunks list len ({})", digest_list.len());
let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
- let param_data = bytes::Bytes::from(param.to_string().as_bytes());
+ let param_data = bytes::Bytes::from(param.to_string().into_bytes());
let upload_data = Some(param_data);
h2_2.send_request(request, upload_data)
.and_then(move |response| {
}
let mut body = resp.into_body();
- let mut release_capacity = body.release_capacity().clone();
+ let mut flow_control = body.flow_control().clone();
let mut stream = DigestListDecoder::new(body.map_err(Error::from));
while let Some(chunk) = stream.try_next().await? {
- let _ = release_capacity.release_capacity(chunk.len());
+ let _ = flow_control.release_capacity(chunk.len());
println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
known_chunks.lock().unwrap().insert(chunk);
}
println!("upload new chunk {} ({} bytes, offset {})", digest_str,
chunk_info.chunk_len, offset);
- let chunk_data = chunk_info.chunk.raw_data();
+ let chunk_data = chunk_info.chunk.into_inner();
let param = json!({
"wid": wid,
"digest": digest_str,
upload_queue
.send((new_info, Some(response)))
.await
- .map_err(Error::from)
+ .map_err(|err| format_err!("failed to send to upload queue: {}", err))
})
)
} else {
upload_queue
.send((merged_chunk_info, None))
.await
- .map_err(Error::from)
+ .map_err(|err| format_err!("failed to send to upload queue: {}", err))
})
}
})
use std::io::Write;
+use std::task::{Context, Poll};
use chrono::Utc;
use failure::*;
let connection = connection.map(|_| ());
// Spawn a new task to drive the connection state
- hyper::rt::spawn(connection);
+ tokio::spawn(connection);
// Wait until the `SendRequest` handle has available capacity.
let c = h2.ready().await?;
async fn api_response(response: Response<Body>) -> Result<Value, Error> {
let status = response.status();
- let data = response
- .into_body()
- .try_concat()
- .await?;
+ let data = hyper::body::to_bytes(response.into_body()).await?;
let text = String::from_utf8(data.to_vec()).unwrap();
if status.is_success() {
}
let mut body = resp.into_body();
- let mut release_capacity = body.release_capacity().clone();
-
- while let Some(chunk) = body.try_next().await? {
- let _ = release_capacity.release_capacity(chunk.len());
+ while let Some(chunk) = body.data().await {
+ let chunk = chunk?;
+ body.flow_control().release_capacity(chunk.len())?;
output.write_all(&chunk)?;
}
let (_head, mut body) = response.into_parts();
- // The `release_capacity` handle allows the caller to manage
- // flow control.
- //
- // Whenever data is received, the caller is responsible for
- // releasing capacity back to the server once it has freed
- // the data from memory.
- let mut release_capacity = body.release_capacity().clone();
-
let mut data = Vec::new();
- while let Some(chunk) = body.try_next().await? {
+ while let Some(chunk) = body.data().await {
+ let chunk = chunk?;
+ // Whenever data is received, the caller is responsible for
+ // releasing capacity back to the server once it has freed
+ // the data from memory.
// Let the server send more data.
- let _ = release_capacity.release_capacity(chunk.len());
+ body.flow_control().release_capacity(chunk.len())?;
data.extend(chunk);
}
}
}
+#[derive(Clone)]
pub struct HttpsConnector {
http: HttpConnector,
- ssl_connector: SslConnector,
+ ssl_connector: std::sync::Arc<SslConnector>,
}
impl HttpsConnector {
Self {
http,
- ssl_connector,
+ ssl_connector: std::sync::Arc::new(ssl_connector),
}
}
}
tokio_openssl::SslStream<tokio::net::TcpStream>,
>;
-impl hyper::client::connect::Connect for HttpsConnector {
- type Transport = MaybeTlsStream;
+impl hyper::service::Service<Uri> for HttpsConnector {
+ type Response = MaybeTlsStream;
type Error = Error;
- type Future = Box<dyn Future<Output = Result<(
- Self::Transport,
- hyper::client::connect::Connected,
- ), Error>> + Send + Unpin + 'static>;
-
- fn connect(&self, dst: hyper::client::connect::Destination) -> Self::Future {
- let is_https = dst.scheme() == "https";
- let host = dst.host().to_string();
+ type Future = std::pin::Pin<Box<
+ dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
+ >>;
- let config = self.ssl_connector.configure();
- let conn = self.http.connect(dst);
+ fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ // This connector is always ready, but others might not be.
+ Poll::Ready(Ok(()))
+ }
- Box::new(Box::pin(async move {
- let (conn, connected) = conn.await?;
+ fn call(&mut self, dst: Uri) -> Self::Future {
+ let mut this = self.clone();
+ async move {
+ let is_https = dst
+ .scheme()
+ .ok_or_else(|| format_err!("missing URL scheme"))?
+ == "https";
+ let host = dst
+ .host()
+ .ok_or_else(|| format_err!("missing hostname in destination url?"))?
+ .to_string();
+
+ let config = this.ssl_connector.configure();
+ let conn = this.http.call(dst).await?;
if is_https {
let conn = tokio_openssl::connect(config?, &host, conn).await?;
- Ok((MaybeTlsStream::Right(conn), connected))
+ Ok(MaybeTlsStream::Right(conn))
} else {
- Ok((MaybeTlsStream::Left(conn), connected))
+ Ok(MaybeTlsStream::Left(conn))
}
- }))
+ }.boxed()
}
}
use futures::*;
-use tokio::net::unix::UnixListener;
+use tokio::net::UnixListener;
use std::path::PathBuf;
use serde_json::Value;
use nix::sys::socket;
/// Listens on a Unix Socket to handle simple command asynchronously
-pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Output = ()>, Error>
+pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
{
let path: PathBuf = path.into();
- let socket = UnixListener::bind(&path)?;
+ let mut socket = UnixListener::bind(&path)?;
- let f = Arc::new(f);
- let path2 = Arc::new(path);
- let path3 = path2.clone();
+ let func = Arc::new(func);
- let control_future = socket.incoming()
- .map_err(Error::from)
- .and_then(|conn| {
- use futures::future::{err, ok};
+ let control_future = async move {
+ loop {
+ let (conn, _addr) = socket
+ .accept()
+ .await
+ .map_err(|err| {
+ format_err!("failed to accept on control socket {:?}: {}", path, err)
+ })?;
// check permissions (same gid, or root user)
let opt = socket::sockopt::PeerCredentials {};
Ok(cred) => {
let mygid = unsafe { libc::getgid() };
if !(cred.uid() == 0 || cred.gid() == mygid) {
- return err(format_err!("no permissions for {:?}", cred));
+ bail!("no permissions for {:?}", cred);
}
}
- Err(e) => {
- return err(format_err!(
- "no permissions - unable to read peer credential - {}",
- e,
- ));
- }
+ Err(e) => bail!("no permissions - unable to read peer credential - {}", e),
}
- ok(conn)
- })
- .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
- .try_for_each(move |conn| {
- let f = Arc::clone(&f);
- let (rx, mut tx) = conn.split();
- let path = path3.clone();
+ let (rx, mut tx) = tokio::io::split(conn);
let abort_future = super::last_worker_future().map(|_| ());
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+ let func = Arc::clone(&func);
+ let path = path.clone();
tokio::spawn(futures::future::select(
async move {
let mut rx = tokio::io::BufReader::new(rx);
}
let response = match line.parse::<Value>() {
- Ok(param) => match f(param) {
+ Ok(param) => match func(param) {
Ok(res) => format!("OK: {}\n", res),
Err(err) => format!("ERROR: {}\n", err),
}
}.boxed(),
abort_future,
).map(|_| ()));
- futures::future::ok(())
- });
+ }
+ }.boxed();
let abort_future = super::last_worker_future().map_err(|_| {});
let task = futures::future::select(
control_future,
abort_future,
- ).map(|_| ());
+ ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
Ok(task)
}
tokio::net::UnixStream::connect(path)
.map_err(move |err| format_err!("control socket connect failed - {}", err))
- .and_then(move |conn| {
-
- let (rx, mut tx) = conn.split();
+ .and_then(move |mut conn| {
let mut command_string = params.to_string();
command_string.push('\n');
async move {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
- tx.write_all(command_string.as_bytes()).await?;
- tx.shutdown().await?;
- let mut rx = tokio::io::BufReader::new(rx);
+ conn.write_all(command_string.as_bytes()).await?;
+ AsyncWriteExt::shutdown(&mut conn).await?;
+ let mut rx = tokio::io::BufReader::new(conn);
let mut data = String::new();
if rx.read_line(&mut data).await? == 0 {
bail!("no response");
use std::collections::HashMap;
+use std::future::Future;
use std::hash::BuildHasher;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use futures::stream::TryStreamExt;
use hyper::header;
use hyper::http::request::Parts;
-use hyper::rt::Future;
use hyper::{Body, Request, Response, StatusCode};
use serde_json::{json, Value};
use tokio::fs::File;
+use tokio::time::Instant;
use url::form_urlencoded;
use proxmox::api::http_err;
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
if httperr.code == StatusCode::UNAUTHORIZED {
- tokio::timer::delay(delay_unauth_time).await;
+ tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
}
}
(formatter.format_error)(err)
.await
.map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))?;
- let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
- .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+ let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+ .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
// fixme: set other headers ?
Err(err) => {
// always delay unauthorized calls by 3 seconds (from start of request)
let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err));
- tokio::timer::delay(delay_unauth_time).await;
+ tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
return Ok((formatter.format_error)(err));
}
}
return Ok(get_index(Some(username), Some(new_token)));
}
_ => {
- tokio::timer::delay(delay_unauth_time).await;
+ tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
return Ok(get_index(None, None));
}
}
use futures::*;
-use tokio_net::signal::unix::{signal, SignalKind};
+use tokio::signal::unix::{signal, SignalKind};
use crate::tools::{self, BroadcastData};
pub fn server_state_init() -> Result<(), Error> {
- let stream = signal(SignalKind::interrupt())?;
+ let mut stream = signal(SignalKind::interrupt())?;
- let future = stream.for_each(|_| {
- println!("got shutdown request (SIGINT)");
- SERVER_STATE.lock().unwrap().reload_request = false;
- tools::request_shutdown();
- futures::future::ready(())
- });
+ let future = async move {
+ while stream.recv().await.is_some() {
+ println!("got shutdown request (SIGINT)");
+ SERVER_STATE.lock().unwrap().reload_request = false;
+ tools::request_shutdown();
+ }
+ }.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
- let stream = signal(SignalKind::hangup())?;
+ let mut stream = signal(SignalKind::hangup())?;
- let future = stream.for_each(|_| {
- println!("got reload request (SIGHUP)");
- SERVER_STATE.lock().unwrap().reload_request = true;
- tools::request_shutdown();
- futures::future::ready(())
- });
+ let future = async move {
+ while stream.recv().await.is_some() {
+ println!("got reload request (SIGHUP)");
+ SERVER_STATE.lock().unwrap().reload_request = true;
+ tools::request_shutdown();
+ }
+ }.boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
pub mod acl;
pub mod async_io;
-pub mod async_mutex;
pub mod borrow;
pub mod daemon;
pub mod fs;
//! Generic AsyncRead/AsyncWrite utilities.
use std::io;
+use std::mem::MaybeUninit;
+use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
+use futures::stream::{Stream, TryStream};
use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpListener;
+use hyper::client::connect::Connection;
pub enum EitherStream<L, R> {
Left(L),
}
}
- unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
match *self {
EitherStream::Left(ref s) => s.prepare_uninitialized_buffer(buf),
EitherStream::Right(ref s) => s.prepare_uninitialized_buffer(buf),
}
}
}
+
+// we need this for crate::client::http_client:
+impl Connection for EitherStream<
+ tokio::net::TcpStream,
+ tokio_openssl::SslStream<tokio::net::TcpStream>,
+> {
+ fn connected(&self) -> hyper::client::connect::Connected {
+ match self {
+ EitherStream::Left(s) => s.connected(),
+ EitherStream::Right(s) => s.get_ref().connected(),
+ }
+ }
+}
+
+/// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard
+/// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener`
+pub struct StaticIncoming(TcpListener);
+
+impl From<TcpListener> for StaticIncoming {
+ fn from(inner: TcpListener) -> Self {
+ Self(inner)
+ }
+}
+
+impl AsRawFd for StaticIncoming {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0.as_raw_fd()
+ }
+}
+
+impl hyper::server::accept::Accept for StaticIncoming {
+ type Conn = tokio::net::TcpStream;
+ type Error = std::io::Error;
+
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ match self.get_mut().0.poll_accept(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Ok((conn, _addr))) => Poll::Ready(Some(Ok(conn))),
+ Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+ }
+ }
+}
+
+/// We also implement `Stream` here (which yields `TryStream` via the blanket impl), as tokio
+/// doesn't do this anymore either and we want to be able to map connections to then add eg. ssl
+/// to them. This support code makes the changes required for hyper 0.13 a bit less annoying to read.
+impl Stream for StaticIncoming {
+ type Item = std::io::Result<(tokio::net::TcpStream, std::net::SocketAddr)>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ match self.get_mut().0.poll_accept(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(result) => Poll::Ready(Some(result)),
+ }
+ }
+}
+
+/// Implement hyper's `Accept` for any `TryStream` of sockets:
+pub struct HyperAccept<T>(pub T);
+
+
+impl<T, I> hyper::server::accept::Accept for HyperAccept<T>
+where
+ T: TryStream<Ok = I>,
+ I: AsyncRead + AsyncWrite,
+{
+ type Conn = I;
+ type Error = T::Error;
+
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ let this = unsafe { self.map_unchecked_mut(|this| &mut this.0) };
+ this.try_poll_next(cx)
+ }
+}
fd_change_cloexec(fd, true)?;
Ok(Self::from_std(
unsafe { std::net::TcpListener::from_raw_fd(fd) },
- &tokio_net::driver::Handle::default(),
)?)
}
}
use failure::Error;
use futures::future::FutureExt;
-
-use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture};
+use tokio::sync::oneshot;
/// Make a future cancellable.
///
pub struct Cancellable<T: Future + Unpin> {
/// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
/// lock, so that our LockFuture finishes.
- inner: futures::future::Select<T, LockFuture<()>>,
+ inner: futures::future::Select<T, oneshot::Receiver<()>>,
/// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
/// future, it'll drop this guard, causing our inner future to resolve to `None`.
- guard: Arc<Mutex<Option<AsyncLockGuard<()>>>>,
+ sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
/// Reference to a cancellable future. Multiple instances may exist simultaneously.
///
/// This can be cloned to be used in multiple places.
#[derive(Clone)]
-pub struct Canceller(Arc<Mutex<Option<AsyncLockGuard<()>>>>);
+pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
impl Canceller {
/// Cancel the associated future.
///
/// This does nothing if the future already finished successfully.
pub fn cancel(&self) {
- *self.0.lock().unwrap() = None;
+ let _ = self.0.lock().unwrap().take().unwrap().send(());
}
}
///
/// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
- // we don't even need to sture the mutex...
- let (mutex, guard) = AsyncMutex::new_locked(())?;
+        // the receiver side is consumed by the select future below; we only need to keep the
+        // sender around (inside a mutex) so `Canceller` can fire it later
+ let (tx, rx) = oneshot::channel();
let this = Self {
- inner: futures::future::select(inner, mutex.lock()),
- guard: Arc::new(Mutex::new(Some(guard))),
+ inner: futures::future::select(inner, rx),
+ sender: Arc::new(Mutex::new(Some(tx))),
};
+
let canceller = this.canceller();
Ok((this, canceller))
}
/// Create another `Canceller` for this future.
pub fn canceller(&self) -> Canceller {
- Canceller(self.guard.clone())
+ Canceller(Arc::clone(&self.sender))
}
}
F: Future<Output = T> + Send + 'static,
T: std::fmt::Debug + Send + 'static,
{
- let rt = tokio::runtime::Runtime::new().unwrap();
+ let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let (tx, rx) = tokio::sync::oneshot::channel();
use std::pin::Pin;
use std::task::{Context, Poll};
-use tokio_executor::threadpool::blocking;
+use tokio::task::block_in_place;
use futures::stream::Stream;
pub struct WrappedReaderStream<R: Read + Unpin> {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
- match blocking(|| this.reader.read(&mut this.buffer)) {
- Poll::Ready(Ok(Ok(n))) => {
+ match block_in_place(|| this.reader.read(&mut this.buffer)) {
+ Ok(n) => {
if n == 0 {
// EOF
Poll::Ready(None)
Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
}
}
- Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))),
- Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new(
- io::ErrorKind::Other,
- err.to_string(),
- )))),
- Poll::Pending => Poll::Pending,
+ Err(err) => Poll::Ready(Some(Err(err))),
}
}
}