]> git.proxmox.com Git - pve-lxc-syscalld.git/blame - src/socket.rs
Whole bunch of async code and preparation to fork.
[pve-lxc-syscalld.git] / src / socket.rs
CommitLineData
9cffeac4 1use std::convert::TryFrom;
9cffeac4
WB
2use std::os::raw::c_void;
3use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
52f50bd4
WB
4use std::task::Context;
5use std::task::Poll;
9cffeac4
WB
6use std::{io, mem, ptr};
7
8use failure::{bail, Error};
9cffeac4 9use futures::future::poll_fn;
52f50bd4 10use futures::ready;
9cffeac4
WB
11use nix::sys::socket::{AddressFamily, SockAddr, SockFlag, SockType};
12
e420f6f9 13use crate::tools::{vec, Fd, IoVec, IoVecMut};
9cffeac4
WB
14
15pub struct SeqPacketSocket(Fd);
16
17impl FromRawFd for SeqPacketSocket {
18 unsafe fn from_raw_fd(fd: RawFd) -> Self {
19 Self(Fd(fd))
20 }
21}
22
23impl SeqPacketSocket {
24 fn fd(&self) -> RawFd {
25 (self.0).0
26 }
27
571dbe03 28 pub fn recv_fds_vectored(
d4f9eb8d 29 &self,
571dbe03
WB
30 iov: &mut [IoVecMut],
31 num_fds: usize,
32 ) -> io::Result<(usize, Vec<Fd>)> {
9cffeac4
WB
33 let fdlist_size = u32::try_from(mem::size_of::<RawFd>() * num_fds)
34 .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("size error: {}", e)))?;
35
36 let mut cmsgbuf = unsafe { vec::uninitialized(libc::CMSG_SPACE(fdlist_size) as usize) };
37 unsafe {
38 ptr::write_bytes(cmsgbuf.as_mut_ptr(), 0xff, cmsgbuf.len());
39 }
40
9cffeac4 41 let mut msg: libc::msghdr = unsafe { mem::zeroed() };
571dbe03 42 msg.msg_iov = iov.as_mut_ptr() as *mut _ as *mut libc::iovec;
9cffeac4
WB
43 msg.msg_iovlen = iov.len();
44 msg.msg_controllen = cmsgbuf.len();
45 msg.msg_control = cmsgbuf.as_mut_ptr() as *mut c_void;
46 let _ = &cmsgbuf; // from now on we only use raw pointer stuff
47
52f50bd4 48 let received = unsafe { libc::recvmsg(self.fd(), &mut msg, libc::MSG_CMSG_CLOEXEC) };
9cffeac4
WB
49 if received < 0 {
50 return Err(io::Error::last_os_error());
51 }
52
53 let mut out_fds = Vec::with_capacity(num_fds);
54 let mut cmsg_ptr = unsafe { libc::CMSG_FIRSTHDR(&msg) };
55 while !cmsg_ptr.is_null() {
56 let cmsg: &libc::cmsghdr = unsafe { &*cmsg_ptr };
57 if cmsg.cmsg_type == libc::SCM_RIGHTS
58 && cmsg.cmsg_len == unsafe { libc::CMSG_LEN(fdlist_size) as usize }
59 && cmsg.cmsg_level == libc::SOL_SOCKET
60 {
61 let fds = unsafe {
62 std::slice::from_raw_parts(libc::CMSG_DATA(cmsg_ptr) as *const RawFd, num_fds)
63 };
64 for fd in fds {
65 println!("Received fd: {}", *fd);
66 out_fds.push(Fd(*fd));
67 }
68 break;
69 }
70 cmsg_ptr = unsafe { libc::CMSG_NXTHDR(&msg, cmsg_ptr) };
71 }
72
73 Ok((received as usize, out_fds))
74 }
75
76 /// Send a message via `sendmsg(2)`.
77 ///
78 /// Note that short writes are silently treated as success, since this is a `SOCK_SEQPACKET`,
79 /// so neither continuing nor repeating a partial messages makes all that much sense.
d4f9eb8d 80 pub fn sendmsg_vectored(&self, iov: &[IoVec]) -> io::Result<()> {
9cffeac4 81 let mut msg: libc::msghdr = unsafe { mem::zeroed() };
571dbe03 82 msg.msg_iov = iov.as_ptr() as *const libc::iovec as *mut libc::iovec;
9cffeac4
WB
83 msg.msg_iovlen = iov.len();
84
85 let sent = unsafe { libc::sendmsg(self.fd(), &mut msg, libc::MSG_NOSIGNAL) };
86 if sent < 0 {
87 return Err(io::Error::last_os_error());
88 }
89
90 // XXX: what to do with short writes? we're a SEQPACKET socket...
91
92 Ok(())
93 }
94
95 fn as_fd(&self) -> &Fd {
96 &self.0
97 }
e420f6f9
WB
98
99 /// Shutdown parts of the socket.
100 #[inline]
101 pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> {
102 nix::sys::socket::shutdown(self.as_raw_fd(), how)
103 }
9cffeac4
WB
104}
105
106impl AsRawFd for SeqPacketSocket {
107 fn as_raw_fd(&self) -> RawFd {
108 self.fd()
109 }
110}
111
112pub struct SeqPacketListener {
113 fd: Fd,
114 registration: tokio::reactor::Registration,
115}
116
117impl Drop for SeqPacketListener {
118 fn drop(&mut self) {
119 if let Err(err) = self.registration.deregister(&self.fd) {
120 eprintln!("failed to deregister I/O resource with reactor: {}", err);
121 }
122 }
123}
124
125impl SeqPacketListener {
126 pub fn bind(address: &SockAddr) -> Result<Self, Error> {
127 let fd = Fd(nix::sys::socket::socket(
128 AddressFamily::Unix,
129 SockType::SeqPacket,
130 SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK,
131 None,
132 )?);
133
134 nix::sys::socket::bind(fd.as_raw_fd(), &address)?;
135 nix::sys::socket::listen(fd.as_raw_fd(), 16)?;
136
137 let registration = tokio::reactor::Registration::new();
138 if !registration.register(&fd)? {
139 bail!("duplicate file descriptor registration?");
140 }
141
52f50bd4 142 Ok(Self { fd, registration })
9cffeac4
WB
143 }
144
52f50bd4 145 pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<AsyncSeqPacketSocket>> {
9cffeac4
WB
146 let fd = loop {
147 match nix::sys::socket::accept4(
148 self.fd.as_raw_fd(),
149 SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK,
150 ) {
151 Ok(fd) => break Fd(fd),
152 Err(err) => match err.as_errno() {
153 Some(nix::errno::Errno::EAGAIN) => {
154 match ready!(self.registration.poll_read_ready(cx)) {
155 Ok(_) => continue,
156 Err(err) => return Poll::Ready(Err(err)),
157 }
158 }
159 Some(other) => {
160 return Poll::Ready(Err(io::Error::from_raw_os_error(other as _)));
161 }
162 None => {
163 return Poll::Ready(Err(io::Error::new(
164 io::ErrorKind::Other,
165 "unexpected non-OS error in nix::sys::socket::accept4()",
166 )));
52f50bd4
WB
167 }
168 },
9cffeac4
WB
169 };
170 };
171
172 Poll::Ready(match AsyncSeqPacketSocket::new(fd) {
173 Ok(c) => Ok(c),
174 Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
175 })
176 }
177
178 pub async fn accept(&mut self) -> io::Result<AsyncSeqPacketSocket> {
179 poll_fn(|cx| self.poll_accept(cx)).await
180 }
181}
182
183// Do I care about having it as a stream?
184//#[must_use = "streams do nothing unless polled"]
185//pub struct SeqPacketIncoming {
186//}
187
188pub struct AsyncSeqPacketSocket {
189 socket: SeqPacketSocket,
190 registration: tokio::reactor::Registration,
191}
192
193impl Drop for AsyncSeqPacketSocket {
194 fn drop(&mut self) {
195 if let Err(err) = self.registration.deregister(self.socket.as_fd()) {
196 eprintln!("failed to deregister I/O resource with reactor: {}", err);
197 }
198 }
199}
200
201impl AsyncSeqPacketSocket {
202 pub fn new(fd: Fd) -> Result<Self, Error> {
203 let registration = tokio::reactor::Registration::new();
204 if !registration.register(&fd)? {
205 bail!("duplicate file descriptor registration?");
206 }
207
208 Ok(Self {
209 socket: unsafe { SeqPacketSocket::from_raw_fd(fd.into_raw_fd()) },
210 registration,
211 })
212 }
213
e420f6f9
WB
214 /// Shutdown parts of the socket.
215 #[inline]
216 pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> {
217 self.socket.shutdown(how)
218 }
219
571dbe03 220 pub fn poll_recv_fds_vectored(
d4f9eb8d 221 &self,
571dbe03 222 iov: &mut [IoVecMut],
9cffeac4
WB
223 num_fds: usize,
224 cx: &mut Context,
225 ) -> Poll<io::Result<(usize, Vec<Fd>)>> {
226 loop {
571dbe03 227 match self.socket.recv_fds_vectored(iov, num_fds) {
9cffeac4
WB
228 Ok(res) => break Poll::Ready(Ok(res)),
229 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
230 match ready!(self.registration.poll_read_ready(cx)) {
231 Ok(_) => continue,
232 Err(err) => break Poll::Ready(Err(err)),
233 }
52f50bd4 234 }
9cffeac4
WB
235 Err(err) => break Poll::Ready(Err(err)),
236 }
237 }
238 }
239
571dbe03 240 pub async fn recv_fds_vectored(
d4f9eb8d 241 &self,
571dbe03 242 iov: &mut [IoVecMut<'_>],
9cffeac4
WB
243 num_fds: usize,
244 ) -> io::Result<(usize, Vec<Fd>)> {
571dbe03 245 poll_fn(move |cx| self.poll_recv_fds_vectored(iov, num_fds, cx)).await
9cffeac4
WB
246 }
247
e420f6f9 248 pub fn poll_sendmsg_vectored(&self, data: &[IoVec], cx: &mut Context) -> Poll<io::Result<()>> {
9cffeac4 249 loop {
571dbe03 250 match self.socket.sendmsg_vectored(data) {
9cffeac4
WB
251 Ok(res) => break Poll::Ready(Ok(res)),
252 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
253 match ready!(self.registration.poll_write_ready(cx)) {
254 Ok(_) => continue,
255 Err(err) => break Poll::Ready(Err(err)),
256 }
52f50bd4 257 }
9cffeac4
WB
258 Err(err) => break Poll::Ready(Err(err)),
259 }
260 }
261 }
262
d4f9eb8d 263 pub async fn sendmsg_vectored(&self, data: &[IoVec<'_>]) -> io::Result<()> {
571dbe03 264 poll_fn(move |cx| self.poll_sendmsg_vectored(data, cx)).await
9cffeac4
WB
265 }
266}