]> git.proxmox.com Git - pve-lxc-syscalld.git/blame - src/io/pipe.rs
update to tokio 1.0
[pve-lxc-syscalld.git] / src / io / pipe.rs
CommitLineData
edf4f4b9 1use std::convert::{TryFrom, TryInto};
f4c53643
WB
2use std::io;
3use std::marker::PhantomData;
8dd26985 4use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
5bd0c562 5use std::pin::Pin;
f4c53643
WB
6use std::task::{Context, Poll};
7
7a1ab2b2 8use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
5bd0c562 9
f4c53643 10use crate::error::io_err_other;
5bd0c562 11use crate::io::polled_fd::PolledFd;
8dd26985 12use crate::io::rw_traits;
f4c53643
WB
13use crate::tools::Fd;
14
8dd26985
WB
15pub use rw_traits::{Read, Write};
16
edf4f4b9
WB
17/// Helper struct for generating pipes.
18///
19/// `Pipe` is a tokio-io supported type, associated with a reactor. After a `fork()` we cannot do
20/// anything with it, including turning it into a raw fd as tokio will attempt to disassociate it
21/// from the reactor, which will just break.
22///
23/// So we start out with this type which can be "upgraded" or "downgraded" into a `Pipe<T>` or
24/// `Fd`.
25pub struct PipeFd<RW>(Fd, PhantomData<RW>);
26
27impl<RW> PipeFd<RW> {
28 pub fn new(fd: Fd) -> Self {
29 Self(fd, PhantomData)
30 }
31
32 pub fn into_fd(self) -> Fd {
33 self.0
34 }
35}
36
37pub fn pipe_fds() -> io::Result<(PipeFd<rw_traits::Read>, PipeFd<rw_traits::Write>)> {
38 let mut pfd: [RawFd; 2] = [0, 0];
39
40 c_try!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) });
41
42 let (fd_in, fd_out) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) };
43
44 Ok((PipeFd::new(fd_in), PipeFd::new(fd_out)))
45}
46
47/// Tokio supported pipe file descriptor. `tokio::fs::File` requires tokio's complete file system
48/// feature gate, so we just use this `PolledFd` wrapper.
f4c53643
WB
49pub struct Pipe<RW> {
50 fd: PolledFd,
51 _phantom: PhantomData<RW>,
52}
53
edf4f4b9
WB
54impl<RW> TryFrom<PipeFd<RW>> for Pipe<RW> {
55 type Error = io::Error;
56
57 fn try_from(fd: PipeFd<RW>) -> io::Result<Self> {
58 Ok(Self {
59 fd: PolledFd::new(fd.into_fd())?,
60 _phantom: PhantomData,
61 })
62 }
63}
64
f4c53643
WB
65impl<RW> AsRawFd for Pipe<RW> {
66 #[inline]
67 fn as_raw_fd(&self) -> RawFd {
68 self.fd.as_raw_fd()
69 }
70}
71
8dd26985
WB
72impl<RW> IntoRawFd for Pipe<RW> {
73 #[inline]
74 fn into_raw_fd(self) -> RawFd {
75 self.fd.into_raw_fd()
76 }
77}
78
f4c53643 79pub fn pipe() -> io::Result<(Pipe<rw_traits::Read>, Pipe<rw_traits::Write>)> {
edf4f4b9 80 let (fd_in, fd_out) = pipe_fds()?;
f4c53643 81
edf4f4b9 82 Ok((fd_in.try_into()?, fd_out.try_into()?))
f4c53643
WB
83}
84
5bd0c562
WB
85impl<RW: rw_traits::HasRead> AsyncRead for Pipe<RW> {
86 fn poll_read(
87 self: Pin<&mut Self>,
88 cx: &mut Context<'_>,
7a1ab2b2
FG
89 buf: &mut ReadBuf,
90 ) -> Poll<io::Result<()>> {
f4c53643 91 self.fd.wrap_read(cx, || {
5bd0c562 92 let fd = self.as_raw_fd();
7a1ab2b2 93 let buf = buf.initialize_unfilled();
5bd0c562
WB
94 let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
95 c_result!(unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, size) })
7a1ab2b2 96 .map(|_| ())
f4c53643
WB
97 })
98 }
f4c53643
WB
99}
100
5bd0c562
WB
101impl<RW: rw_traits::HasWrite> AsyncWrite for Pipe<RW> {
102 fn poll_write(
103 self: Pin<&mut Self>,
104 cx: &mut Context<'_>,
105 buf: &[u8],
106 ) -> Poll<io::Result<usize>> {
f4c53643 107 self.fd.wrap_write(cx, || {
5bd0c562
WB
108 let fd = self.as_raw_fd();
109 let size = libc::size_t::try_from(buf.len()).map_err(io_err_other)?;
110 c_result!(unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, size) })
f4c53643
WB
111 .map(|res| res as usize)
112 })
113 }
114
5bd0c562
WB
115 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
116 Poll::Ready(Ok(()))
117 }
118
119 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
120 Poll::Ready(Ok(()))
f4c53643
WB
121 }
122}