git.proxmox.com Git - pve-lxc-syscalld.git/blob - src/io/seq_packet.rs
Revert "seq_packet: set SOCK_NONBLOCK on accept"
[pve-lxc-syscalld.git] / src / io / seq_packet.rs
1 use std::io::{self, IoSlice, IoSliceMut};
2 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
3 use std::ptr;
4 use std::task::{Context, Poll};
5
6 use anyhow::Error;
7 use nix::sys::socket::{self, AddressFamily, SockFlag, SockType, SockaddrLike};
8
9 use crate::io::polled_fd::PolledFd;
10 use crate::poll_fn::poll_fn;
11 use crate::tools::AssertSendSync;
12 use crate::tools::Fd;
13
14 fn seq_packet_socket(flags: SockFlag) -> nix::Result<Fd> {
15 let fd = socket::socket(
16 AddressFamily::Unix,
17 SockType::SeqPacket,
18 flags | SockFlag::SOCK_CLOEXEC,
19 None,
20 )?;
21 Ok(unsafe { Fd::from_raw_fd(fd) })
22 }
23
24 pub struct SeqPacketListener {
25 fd: PolledFd,
26 }
27
28 impl AsRawFd for SeqPacketListener {
29 #[inline]
30 fn as_raw_fd(&self) -> RawFd {
31 self.fd.as_raw_fd()
32 }
33 }
34
35 impl SeqPacketListener {
36 pub fn bind(address: &dyn SockaddrLike) -> Result<Self, Error> {
37 let fd = seq_packet_socket(SockFlag::empty())?;
38 socket::bind(fd.as_raw_fd(), address)?;
39 socket::listen(fd.as_raw_fd(), 16)?;
40
41 let fd = PolledFd::new(fd)?;
42
43 Ok(Self { fd })
44 }
45
46 pub fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<SeqPacketSocket>> {
47 let fd = self.as_raw_fd();
48 let res = self.fd.wrap_read(cx, || {
49 c_result!(unsafe {
50 libc::accept4(fd, ptr::null_mut(), ptr::null_mut(), libc::SOCK_CLOEXEC)
51 })
52 .map(|fd| unsafe { Fd::from_raw_fd(fd as RawFd) })
53 });
54 match res {
55 Poll::Pending => Poll::Pending,
56 Poll::Ready(Ok(fd)) => Poll::Ready(SeqPacketSocket::new(fd)),
57 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
58 }
59 }
60
61 pub async fn accept(&mut self) -> io::Result<SeqPacketSocket> {
62 poll_fn(move |cx| self.poll_accept(cx)).await
63 }
64 }
65
66 pub struct SeqPacketSocket {
67 fd: PolledFd,
68 }
69
70 impl AsRawFd for SeqPacketSocket {
71 #[inline]
72 fn as_raw_fd(&self) -> RawFd {
73 self.fd.as_raw_fd()
74 }
75 }
76
77 impl SeqPacketSocket {
78 pub fn new(fd: Fd) -> io::Result<Self> {
79 Ok(Self {
80 fd: PolledFd::new(fd)?,
81 })
82 }
83
84 pub fn poll_sendmsg(
85 &self,
86 cx: &mut Context,
87 msg: &AssertSendSync<libc::msghdr>,
88 ) -> Poll<io::Result<usize>> {
89 let fd = self.fd.as_raw_fd();
90
91 self.fd.wrap_write(cx, || {
92 c_result!(unsafe { libc::sendmsg(fd, &msg.0 as *const libc::msghdr, 0) })
93 .map(|rc| rc as usize)
94 })
95 }
96
97 pub async fn sendmsg_vectored(&self, iov: &[IoSlice<'_>]) -> io::Result<usize> {
98 let msg = AssertSendSync(libc::msghdr {
99 msg_name: ptr::null_mut(),
100 msg_namelen: 0,
101 msg_iov: iov.as_ptr() as _,
102 msg_iovlen: iov.len(),
103 msg_control: ptr::null_mut(),
104 msg_controllen: 0,
105 msg_flags: 0,
106 });
107
108 poll_fn(move |cx| self.poll_sendmsg(cx, &msg)).await
109 }
110
111 pub fn poll_recvmsg(
112 &self,
113 cx: &mut Context,
114 msg: &mut AssertSendSync<libc::msghdr>,
115 ) -> Poll<io::Result<usize>> {
116 let fd = self.fd.as_raw_fd();
117
118 self.fd.wrap_read(cx, || {
119 c_result!(unsafe { libc::recvmsg(fd, &mut msg.0 as *mut libc::msghdr, 0) })
120 .map(|rc| rc as usize)
121 })
122 }
123
124 // clippy is wrong about this one
125 #[allow(clippy::needless_lifetimes)]
126 pub async fn recvmsg_vectored(
127 &self,
128 iov: &mut [IoSliceMut<'_>],
129 cmsg_buf: &mut [u8],
130 ) -> io::Result<(usize, usize)> {
131 let mut msg = AssertSendSync(libc::msghdr {
132 msg_name: ptr::null_mut(),
133 msg_namelen: 0,
134 msg_iov: iov.as_ptr() as _,
135 msg_iovlen: iov.len(),
136 msg_control: cmsg_buf.as_mut_ptr() as *mut std::ffi::c_void,
137 msg_controllen: cmsg_buf.len(),
138 msg_flags: libc::MSG_CMSG_CLOEXEC,
139 });
140
141 let data_size = poll_fn(|cx| self.poll_recvmsg(cx, &mut msg)).await?;
142 Ok((data_size, msg.0.msg_controllen as usize))
143 }
144
145 #[inline]
146 pub fn shutdown(&self, how: socket::Shutdown) -> nix::Result<()> {
147 socket::shutdown(self.as_raw_fd(), how)
148 }
149 }