1 use std
::convert
::{TryFrom, TryInto}
;
3 use std
::marker
::PhantomData
;
4 use std
::os
::unix
::io
::{AsRawFd, FromRawFd, IntoRawFd, RawFd}
;
6 use std
::task
::{Context, Poll}
;
8 use tokio
::io
::{AsyncRead, AsyncWrite, ReadBuf}
;
10 use crate::error
::io_err_other
;
11 use crate::io
::polled_fd
::PolledFd
;
12 use crate::io
::rw_traits
;
15 pub use rw_traits
::{Read, Write}
;
17 /// Helper struct for generating pipes.
19 /// `Pipe` is a tokio-io supported type, associated with a reactor. After a `fork()` we cannot do
20 /// anything with it, including turning it into a raw fd as tokio will attempt to disassociate it
21 /// from the reactor, which will just break.
23 /// So we start out with this type which can be "upgraded" or "downgraded" into a `Pipe<T>` or
25 pub struct PipeFd
<RW
>(Fd
, PhantomData
<RW
>);
28 pub fn new(fd
: Fd
) -> Self {
32 pub fn into_fd(self) -> Fd
{
37 pub fn pipe_fds() -> io
::Result
<(PipeFd
<rw_traits
::Read
>, PipeFd
<rw_traits
::Write
>)> {
38 let mut pfd
: [RawFd
; 2] = [0, 0];
40 c_try
!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) }
);
42 let (fd_in
, fd_out
) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) }
;
44 Ok((PipeFd
::new(fd_in
), PipeFd
::new(fd_out
)))
47 /// Tokio supported pipe file descriptor. `tokio::fs::File` requires tokio's complete file system
48 /// feature gate, so we just use this `PolledFd` wrapper.
51 _phantom
: PhantomData
<RW
>,
54 impl<RW
> TryFrom
<PipeFd
<RW
>> for Pipe
<RW
> {
55 type Error
= io
::Error
;
57 fn try_from(fd
: PipeFd
<RW
>) -> io
::Result
<Self> {
59 fd
: PolledFd
::new(fd
.into_fd())?
,
60 _phantom
: PhantomData
,
65 impl<RW
> AsRawFd
for Pipe
<RW
> {
67 fn as_raw_fd(&self) -> RawFd
{
72 impl<RW
> IntoRawFd
for Pipe
<RW
> {
74 fn into_raw_fd(self) -> RawFd
{
79 pub fn pipe() -> io
::Result
<(Pipe
<rw_traits
::Read
>, Pipe
<rw_traits
::Write
>)> {
80 let (fd_in
, fd_out
) = pipe_fds()?
;
82 Ok((fd_in
.try_into()?
, fd_out
.try_into()?
))
85 impl<RW
: rw_traits
::HasRead
> AsyncRead
for Pipe
<RW
> {
90 ) -> Poll
<io
::Result
<()>> {
91 self.fd
.wrap_read(cx
, || {
92 let fd
= self.as_raw_fd();
93 let buf
= buf
.initialize_unfilled();
94 let size
= libc
::size_t
::try_from(buf
.len()).map_err(io_err_other
)?
;
95 c_result
!(unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, size) }
)
101 impl<RW
: rw_traits
::HasWrite
> AsyncWrite
for Pipe
<RW
> {
103 self: Pin
<&mut Self>,
104 cx
: &mut Context
<'_
>,
106 ) -> Poll
<io
::Result
<usize>> {
107 self.fd
.wrap_write(cx
, || {
108 let fd
= self.as_raw_fd();
109 let size
= libc
::size_t
::try_from(buf
.len()).map_err(io_err_other
)?
;
110 c_result
!(unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, size) }
)
111 .map(|res
| res
as usize)
115 fn poll_flush(self: Pin
<&mut Self>, _cx
: &mut Context
<'_
>) -> Poll
<io
::Result
<()>> {
119 fn poll_shutdown(self: Pin
<&mut Self>, _cx
: &mut Context
<'_
>) -> Poll
<io
::Result
<()>> {