1 //! Generic AsyncRead/AsyncWrite utilities.
4 use std
::mem
::MaybeUninit
;
5 use std
::os
::unix
::io
::{AsRawFd, RawFd}
;
7 use std
::task
::{Context, Poll}
;
9 use futures
::stream
::{Stream, TryStream}
;
10 use tokio
::io
::{AsyncRead, AsyncWrite}
;
11 use tokio
::net
::TcpListener
;
12 use hyper
::client
::connect
::Connection
;
14 pub enum EitherStream
<L
, R
> {
19 impl<L
: AsyncRead
, R
: AsyncRead
> AsyncRead
for EitherStream
<L
, R
> {
24 ) -> Poll
<Result
<usize, io
::Error
>> {
25 match unsafe { self.get_unchecked_mut() }
{
26 EitherStream
::Left(ref mut s
) => {
27 unsafe { Pin::new_unchecked(s) }
.poll_read(cx
, buf
)
29 EitherStream
::Right(ref mut s
) => {
30 unsafe { Pin::new_unchecked(s) }
.poll_read(cx
, buf
)
35 unsafe fn prepare_uninitialized_buffer(&self, buf
: &mut [MaybeUninit
<u8>]) -> bool
{
37 EitherStream
::Left(ref s
) => s
.prepare_uninitialized_buffer(buf
),
38 EitherStream
::Right(ref s
) => s
.prepare_uninitialized_buffer(buf
),
46 ) -> Poll
<Result
<usize, io
::Error
>>
50 match unsafe { self.get_unchecked_mut() }
{
51 EitherStream
::Left(ref mut s
) => {
52 unsafe { Pin::new_unchecked(s) }
.poll_read_buf(cx
, buf
)
54 EitherStream
::Right(ref mut s
) => {
55 unsafe { Pin::new_unchecked(s) }
.poll_read_buf(cx
, buf
)
61 impl<L
: AsyncWrite
, R
: AsyncWrite
> AsyncWrite
for EitherStream
<L
, R
> {
66 ) -> Poll
<Result
<usize, io
::Error
>> {
67 match unsafe { self.get_unchecked_mut() }
{
68 EitherStream
::Left(ref mut s
) => {
69 unsafe { Pin::new_unchecked(s) }
.poll_write(cx
, buf
)
71 EitherStream
::Right(ref mut s
) => {
72 unsafe { Pin::new_unchecked(s) }
.poll_write(cx
, buf
)
77 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Result
<(), io
::Error
>> {
78 match unsafe { self.get_unchecked_mut() }
{
79 EitherStream
::Left(ref mut s
) => {
80 unsafe { Pin::new_unchecked(s) }
.poll_flush(cx
)
82 EitherStream
::Right(ref mut s
) => {
83 unsafe { Pin::new_unchecked(s) }
.poll_flush(cx
)
88 fn poll_shutdown(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Result
<(), io
::Error
>> {
89 match unsafe { self.get_unchecked_mut() }
{
90 EitherStream
::Left(ref mut s
) => {
91 unsafe { Pin::new_unchecked(s) }
.poll_shutdown(cx
)
93 EitherStream
::Right(ref mut s
) => {
94 unsafe { Pin::new_unchecked(s) }
.poll_shutdown(cx
)
100 self: Pin
<&mut Self>,
103 ) -> Poll
<Result
<usize, io
::Error
>>
107 match unsafe { self.get_unchecked_mut() }
{
108 EitherStream
::Left(ref mut s
) => {
109 unsafe { Pin::new_unchecked(s) }
.poll_write_buf(cx
, buf
)
111 EitherStream
::Right(ref mut s
) => {
112 unsafe { Pin::new_unchecked(s) }
.poll_write_buf(cx
, buf
)
118 // we need this for crate::client::http_client:
119 impl Connection
for EitherStream
<
120 tokio
::net
::TcpStream
,
121 tokio_openssl
::SslStream
<tokio
::net
::TcpStream
>,
123 fn connected(&self) -> hyper
::client
::connect
::Connected
{
125 EitherStream
::Left(s
) => s
.connected(),
126 EitherStream
::Right(s
) => s
.get_ref().connected(),
131 /// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard
132 /// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener`
133 pub struct StaticIncoming(TcpListener
);
135 impl From
<TcpListener
> for StaticIncoming
{
136 fn from(inner
: TcpListener
) -> Self {
141 impl AsRawFd
for StaticIncoming
{
142 fn as_raw_fd(&self) -> RawFd
{
147 impl hyper
::server
::accept
::Accept
for StaticIncoming
{
148 type Conn
= tokio
::net
::TcpStream
;
149 type Error
= std
::io
::Error
;
152 self: Pin
<&mut Self>,
154 ) -> Poll
<Option
<Result
<Self::Conn
, Self::Error
>>> {
155 match self.get_mut().0.poll_accept(cx
) {
156 Poll
::Pending
=> Poll
::Pending
,
157 Poll
::Ready(Ok((conn
, _addr
))) => Poll
::Ready(Some(Ok(conn
))),
158 Poll
::Ready(Err(err
)) => Poll
::Ready(Some(Err(err
))),
163 /// We also implement TryStream for this, as tokio doesn't do this anymore either and we want to be
164 /// able to map connections to then add eg. ssl to them. This support code makes the changes
165 /// required for hyper 0.13 a bit less annoying to read.
166 impl Stream
for StaticIncoming
{
167 type Item
= std
::io
::Result
<(tokio
::net
::TcpStream
, std
::net
::SocketAddr
)>;
169 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
170 match self.get_mut().0.poll_accept(cx
) {
171 Poll
::Pending
=> Poll
::Pending
,
172 Poll
::Ready(result
) => Poll
::Ready(Some(result
)),
177 /// Implement hyper's `Accept` for any `TryStream` of sockets:
178 pub struct HyperAccept
<T
>(pub T
);
181 impl<T
, I
> hyper
::server
::accept
::Accept
for HyperAccept
<T
>
183 T
: TryStream
<Ok
= I
>,
184 I
: AsyncRead
+ AsyncWrite
,
187 type Error
= T
::Error
;
190 self: Pin
<&mut Self>,
192 ) -> Poll
<Option
<Result
<Self::Conn
, Self::Error
>>> {
193 let this
= unsafe { self.map_unchecked_mut(|this| &mut this.0) }
;
194 this
.try_poll_next(cx
)