]>
Commit | Line | Data |
---|---|---|
9cffeac4 | 1 | use std::convert::TryFrom; |
9cffeac4 WB |
2 | use std::os::raw::c_void; |
3 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | |
52f50bd4 WB |
4 | use std::task::Context; |
5 | use std::task::Poll; | |
9cffeac4 WB |
6 | use std::{io, mem, ptr}; |
7 | ||
8 | use failure::{bail, Error}; | |
9cffeac4 | 9 | use futures::future::poll_fn; |
52f50bd4 | 10 | use futures::ready; |
9cffeac4 WB |
11 | use nix::sys::socket::{AddressFamily, SockAddr, SockFlag, SockType}; |
12 | ||
e420f6f9 | 13 | use crate::tools::{vec, Fd, IoVec, IoVecMut}; |
9cffeac4 WB |
14 | |
/// Thin wrapper around the file descriptor of a `SOCK_SEQPACKET` socket.
///
/// Offers synchronous `recvmsg(2)`/`sendmsg(2)` helpers operating directly on the wrapped
/// descriptor.
pub struct SeqPacketSocket(Fd);
16 | ||
17 | impl FromRawFd for SeqPacketSocket { | |
18 | unsafe fn from_raw_fd(fd: RawFd) -> Self { | |
19 | Self(Fd(fd)) | |
20 | } | |
21 | } | |
22 | ||
23 | impl SeqPacketSocket { | |
24 | fn fd(&self) -> RawFd { | |
25 | (self.0).0 | |
26 | } | |
27 | ||
571dbe03 | 28 | pub fn recv_fds_vectored( |
d4f9eb8d | 29 | &self, |
571dbe03 WB |
30 | iov: &mut [IoVecMut], |
31 | num_fds: usize, | |
32 | ) -> io::Result<(usize, Vec<Fd>)> { | |
9cffeac4 WB |
33 | let fdlist_size = u32::try_from(mem::size_of::<RawFd>() * num_fds) |
34 | .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("size error: {}", e)))?; | |
35 | ||
36 | let mut cmsgbuf = unsafe { vec::uninitialized(libc::CMSG_SPACE(fdlist_size) as usize) }; | |
37 | unsafe { | |
38 | ptr::write_bytes(cmsgbuf.as_mut_ptr(), 0xff, cmsgbuf.len()); | |
39 | } | |
40 | ||
9cffeac4 | 41 | let mut msg: libc::msghdr = unsafe { mem::zeroed() }; |
571dbe03 | 42 | msg.msg_iov = iov.as_mut_ptr() as *mut _ as *mut libc::iovec; |
9cffeac4 WB |
43 | msg.msg_iovlen = iov.len(); |
44 | msg.msg_controllen = cmsgbuf.len(); | |
45 | msg.msg_control = cmsgbuf.as_mut_ptr() as *mut c_void; | |
46 | let _ = &cmsgbuf; // from now on we only use raw pointer stuff | |
47 | ||
52f50bd4 | 48 | let received = unsafe { libc::recvmsg(self.fd(), &mut msg, libc::MSG_CMSG_CLOEXEC) }; |
9cffeac4 WB |
49 | if received < 0 { |
50 | return Err(io::Error::last_os_error()); | |
51 | } | |
52 | ||
53 | let mut out_fds = Vec::with_capacity(num_fds); | |
54 | let mut cmsg_ptr = unsafe { libc::CMSG_FIRSTHDR(&msg) }; | |
55 | while !cmsg_ptr.is_null() { | |
56 | let cmsg: &libc::cmsghdr = unsafe { &*cmsg_ptr }; | |
57 | if cmsg.cmsg_type == libc::SCM_RIGHTS | |
58 | && cmsg.cmsg_len == unsafe { libc::CMSG_LEN(fdlist_size) as usize } | |
59 | && cmsg.cmsg_level == libc::SOL_SOCKET | |
60 | { | |
61 | let fds = unsafe { | |
62 | std::slice::from_raw_parts(libc::CMSG_DATA(cmsg_ptr) as *const RawFd, num_fds) | |
63 | }; | |
64 | for fd in fds { | |
65 | println!("Received fd: {}", *fd); | |
66 | out_fds.push(Fd(*fd)); | |
67 | } | |
68 | break; | |
69 | } | |
70 | cmsg_ptr = unsafe { libc::CMSG_NXTHDR(&msg, cmsg_ptr) }; | |
71 | } | |
72 | ||
73 | Ok((received as usize, out_fds)) | |
74 | } | |
75 | ||
76 | /// Send a message via `sendmsg(2)`. | |
77 | /// | |
78 | /// Note that short writes are silently treated as success, since this is a `SOCK_SEQPACKET`, | |
79 | /// so neither continuing nor repeating a partial messages makes all that much sense. | |
d4f9eb8d | 80 | pub fn sendmsg_vectored(&self, iov: &[IoVec]) -> io::Result<()> { |
9cffeac4 | 81 | let mut msg: libc::msghdr = unsafe { mem::zeroed() }; |
571dbe03 | 82 | msg.msg_iov = iov.as_ptr() as *const libc::iovec as *mut libc::iovec; |
9cffeac4 WB |
83 | msg.msg_iovlen = iov.len(); |
84 | ||
85 | let sent = unsafe { libc::sendmsg(self.fd(), &mut msg, libc::MSG_NOSIGNAL) }; | |
86 | if sent < 0 { | |
87 | return Err(io::Error::last_os_error()); | |
88 | } | |
89 | ||
90 | // XXX: what to do with short writes? we're a SEQPACKET socket... | |
91 | ||
92 | Ok(()) | |
93 | } | |
94 | ||
95 | fn as_fd(&self) -> &Fd { | |
96 | &self.0 | |
97 | } | |
e420f6f9 WB |
98 | |
99 | /// Shutdown parts of the socket. | |
100 | #[inline] | |
101 | pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> { | |
102 | nix::sys::socket::shutdown(self.as_raw_fd(), how) | |
103 | } | |
9cffeac4 WB |
104 | } |
105 | ||
106 | impl AsRawFd for SeqPacketSocket { | |
107 | fn as_raw_fd(&self) -> RawFd { | |
108 | self.fd() | |
109 | } | |
110 | } | |
111 | ||
/// Listening `SOCK_SEQPACKET` unix socket which is registered with the tokio reactor.
///
/// The registration is removed again when the listener is dropped.
pub struct SeqPacketListener {
    /// The listening socket.
    fd: Fd,
    /// Reactor registration used to wait for `fd` to become readable.
    registration: tokio::reactor::Registration,
}
116 | ||
117 | impl Drop for SeqPacketListener { | |
118 | fn drop(&mut self) { | |
119 | if let Err(err) = self.registration.deregister(&self.fd) { | |
120 | eprintln!("failed to deregister I/O resource with reactor: {}", err); | |
121 | } | |
122 | } | |
123 | } | |
124 | ||
125 | impl SeqPacketListener { | |
126 | pub fn bind(address: &SockAddr) -> Result<Self, Error> { | |
127 | let fd = Fd(nix::sys::socket::socket( | |
128 | AddressFamily::Unix, | |
129 | SockType::SeqPacket, | |
130 | SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK, | |
131 | None, | |
132 | )?); | |
133 | ||
134 | nix::sys::socket::bind(fd.as_raw_fd(), &address)?; | |
135 | nix::sys::socket::listen(fd.as_raw_fd(), 16)?; | |
136 | ||
137 | let registration = tokio::reactor::Registration::new(); | |
138 | if !registration.register(&fd)? { | |
139 | bail!("duplicate file descriptor registration?"); | |
140 | } | |
141 | ||
52f50bd4 | 142 | Ok(Self { fd, registration }) |
9cffeac4 WB |
143 | } |
144 | ||
52f50bd4 | 145 | pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<AsyncSeqPacketSocket>> { |
9cffeac4 WB |
146 | let fd = loop { |
147 | match nix::sys::socket::accept4( | |
148 | self.fd.as_raw_fd(), | |
149 | SockFlag::SOCK_CLOEXEC | SockFlag::SOCK_NONBLOCK, | |
150 | ) { | |
151 | Ok(fd) => break Fd(fd), | |
152 | Err(err) => match err.as_errno() { | |
153 | Some(nix::errno::Errno::EAGAIN) => { | |
154 | match ready!(self.registration.poll_read_ready(cx)) { | |
155 | Ok(_) => continue, | |
156 | Err(err) => return Poll::Ready(Err(err)), | |
157 | } | |
158 | } | |
159 | Some(other) => { | |
160 | return Poll::Ready(Err(io::Error::from_raw_os_error(other as _))); | |
161 | } | |
162 | None => { | |
163 | return Poll::Ready(Err(io::Error::new( | |
164 | io::ErrorKind::Other, | |
165 | "unexpected non-OS error in nix::sys::socket::accept4()", | |
166 | ))); | |
52f50bd4 WB |
167 | } |
168 | }, | |
9cffeac4 WB |
169 | }; |
170 | }; | |
171 | ||
172 | Poll::Ready(match AsyncSeqPacketSocket::new(fd) { | |
173 | Ok(c) => Ok(c), | |
174 | Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())), | |
175 | }) | |
176 | } | |
177 | ||
178 | pub async fn accept(&mut self) -> io::Result<AsyncSeqPacketSocket> { | |
179 | poll_fn(|cx| self.poll_accept(cx)).await | |
180 | } | |
181 | } | |
182 | ||
183 | // Do I care about having it as a stream? | |
184 | //#[must_use = "streams do nothing unless polled"] | |
185 | //pub struct SeqPacketIncoming { | |
186 | //} | |
187 | ||
/// A `SeqPacketSocket` registered with the tokio reactor, providing poll-based and `async`
/// variants of the send and receive operations.
///
/// The registration is removed again when the socket is dropped.
pub struct AsyncSeqPacketSocket {
    /// The underlying socket performing the actual `recvmsg`/`sendmsg` calls.
    socket: SeqPacketSocket,
    /// Reactor registration used to wait for read/write readiness of `socket`.
    registration: tokio::reactor::Registration,
}
192 | ||
193 | impl Drop for AsyncSeqPacketSocket { | |
194 | fn drop(&mut self) { | |
195 | if let Err(err) = self.registration.deregister(self.socket.as_fd()) { | |
196 | eprintln!("failed to deregister I/O resource with reactor: {}", err); | |
197 | } | |
198 | } | |
199 | } | |
200 | ||
201 | impl AsyncSeqPacketSocket { | |
202 | pub fn new(fd: Fd) -> Result<Self, Error> { | |
203 | let registration = tokio::reactor::Registration::new(); | |
204 | if !registration.register(&fd)? { | |
205 | bail!("duplicate file descriptor registration?"); | |
206 | } | |
207 | ||
208 | Ok(Self { | |
209 | socket: unsafe { SeqPacketSocket::from_raw_fd(fd.into_raw_fd()) }, | |
210 | registration, | |
211 | }) | |
212 | } | |
213 | ||
e420f6f9 WB |
214 | /// Shutdown parts of the socket. |
215 | #[inline] | |
216 | pub fn shutdown(&self, how: nix::sys::socket::Shutdown) -> nix::Result<()> { | |
217 | self.socket.shutdown(how) | |
218 | } | |
219 | ||
571dbe03 | 220 | pub fn poll_recv_fds_vectored( |
d4f9eb8d | 221 | &self, |
571dbe03 | 222 | iov: &mut [IoVecMut], |
9cffeac4 WB |
223 | num_fds: usize, |
224 | cx: &mut Context, | |
225 | ) -> Poll<io::Result<(usize, Vec<Fd>)>> { | |
226 | loop { | |
571dbe03 | 227 | match self.socket.recv_fds_vectored(iov, num_fds) { |
9cffeac4 WB |
228 | Ok(res) => break Poll::Ready(Ok(res)), |
229 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { | |
230 | match ready!(self.registration.poll_read_ready(cx)) { | |
231 | Ok(_) => continue, | |
232 | Err(err) => break Poll::Ready(Err(err)), | |
233 | } | |
52f50bd4 | 234 | } |
9cffeac4 WB |
235 | Err(err) => break Poll::Ready(Err(err)), |
236 | } | |
237 | } | |
238 | } | |
239 | ||
571dbe03 | 240 | pub async fn recv_fds_vectored( |
d4f9eb8d | 241 | &self, |
571dbe03 | 242 | iov: &mut [IoVecMut<'_>], |
9cffeac4 WB |
243 | num_fds: usize, |
244 | ) -> io::Result<(usize, Vec<Fd>)> { | |
571dbe03 | 245 | poll_fn(move |cx| self.poll_recv_fds_vectored(iov, num_fds, cx)).await |
9cffeac4 WB |
246 | } |
247 | ||
e420f6f9 | 248 | pub fn poll_sendmsg_vectored(&self, data: &[IoVec], cx: &mut Context) -> Poll<io::Result<()>> { |
9cffeac4 | 249 | loop { |
571dbe03 | 250 | match self.socket.sendmsg_vectored(data) { |
9cffeac4 WB |
251 | Ok(res) => break Poll::Ready(Ok(res)), |
252 | Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { | |
253 | match ready!(self.registration.poll_write_ready(cx)) { | |
254 | Ok(_) => continue, | |
255 | Err(err) => break Poll::Ready(Err(err)), | |
256 | } | |
52f50bd4 | 257 | } |
9cffeac4 WB |
258 | Err(err) => break Poll::Ready(Err(err)), |
259 | } | |
260 | } | |
261 | } | |
262 | ||
d4f9eb8d | 263 | pub async fn sendmsg_vectored(&self, data: &[IoVec<'_>]) -> io::Result<()> { |
571dbe03 | 264 | poll_fn(move |cx| self.poll_sendmsg_vectored(data, cx)).await |
9cffeac4 WB |
265 | } |
266 | } |