]> git.proxmox.com Git - rustc.git/blob - src/libstd/sys/unix/pipe.rs
3c9cdc65975f6f9f5e1b7027774b2a7dd0fa564f
[rustc.git] / src / libstd / sys / unix / pipe.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12
13 use ffi::CString;
14 use libc;
15 use mem;
16 use sync::{Arc, Mutex};
17 use sync::atomic::{AtomicBool, Ordering};
18 use old_io::{self, IoResult, IoError};
19
20 use sys::{self, timer, retry, c, set_nonblocking, wouldblock};
21 use sys::fs::{fd_t, FileDesc};
22 use sys_common::net::*;
23 use sys_common::net::SocketStatus::*;
24 use sys_common::{eof, mkerr_libc};
25
26 fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
27 match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
28 -1 => Err(super::last_error()),
29 fd => Ok(fd)
30 }
31 }
32
33 fn addr_to_sockaddr_un(addr: &CString,
34 storage: &mut libc::sockaddr_storage)
35 -> IoResult<libc::socklen_t> {
36 // the sun_path length is limited to SUN_LEN (with null)
37 assert!(mem::size_of::<libc::sockaddr_storage>() >=
38 mem::size_of::<libc::sockaddr_un>());
39 let s = unsafe { &mut *(storage as *mut _ as *mut libc::sockaddr_un) };
40
41 let len = addr.as_bytes().len();
42 if len > s.sun_path.len() - 1 {
43 return Err(IoError {
44 kind: old_io::InvalidInput,
45 desc: "invalid argument: path must be smaller than SUN_LEN",
46 detail: None,
47 })
48 }
49 s.sun_family = libc::AF_UNIX as libc::sa_family_t;
50 for (slot, value) in s.sun_path.iter_mut().zip(addr.as_bytes().iter()) {
51 *slot = *value as libc::c_char;
52 }
53
54 // count the null terminator
55 let len = mem::size_of::<libc::sa_family_t>() + len + 1;
56 return Ok(len as libc::socklen_t);
57 }
58
59 struct Inner {
60 fd: fd_t,
61
62 // Unused on Linux, where this lock is not necessary.
63 #[allow(dead_code)]
64 lock: Mutex<()>,
65 }
66
67 impl Inner {
68 fn new(fd: fd_t) -> Inner {
69 Inner { fd: fd, lock: Mutex::new(()) }
70 }
71 }
72
73 impl Drop for Inner {
74 fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
75 }
76
77 fn connect(addr: &CString, ty: libc::c_int,
78 timeout: Option<u64>) -> IoResult<Inner> {
79 let mut storage = unsafe { mem::zeroed() };
80 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
81 let inner = Inner::new(try!(unix_socket(ty)));
82 let addrp = &storage as *const _ as *const libc::sockaddr;
83
84 match timeout {
85 None => {
86 match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
87 -1 => Err(super::last_error()),
88 _ => Ok(inner)
89 }
90 }
91 Some(timeout_ms) => {
92 try!(connect_timeout(inner.fd, addrp, len, timeout_ms));
93 Ok(inner)
94 }
95 }
96 }
97
98 fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
99 let mut storage = unsafe { mem::zeroed() };
100 let len = try!(addr_to_sockaddr_un(addr, &mut storage));
101 let inner = Inner::new(try!(unix_socket(ty)));
102 let addrp = &storage as *const _ as *const libc::sockaddr;
103 match unsafe {
104 libc::bind(inner.fd, addrp, len)
105 } {
106 -1 => Err(super::last_error()),
107 _ => Ok(inner)
108 }
109 }
110
111 ////////////////////////////////////////////////////////////////////////////////
112 // Unix Streams
113 ////////////////////////////////////////////////////////////////////////////////
114
115 pub struct UnixStream {
116 inner: Arc<Inner>,
117 read_deadline: u64,
118 write_deadline: u64,
119 }
120
121 impl UnixStream {
122 pub fn connect(addr: &CString,
123 timeout: Option<u64>) -> IoResult<UnixStream> {
124 connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
125 UnixStream::new(Arc::new(inner))
126 })
127 }
128
129 fn new(inner: Arc<Inner>) -> UnixStream {
130 UnixStream {
131 inner: inner,
132 read_deadline: 0,
133 write_deadline: 0,
134 }
135 }
136
137 pub fn fd(&self) -> fd_t { self.inner.fd }
138
139 #[cfg(target_os = "linux")]
140 fn lock_nonblocking(&self) {}
141
142 #[cfg(not(target_os = "linux"))]
143 fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
144 let ret = Guard {
145 fd: self.fd(),
146 guard: unsafe { self.inner.lock.lock().unwrap() },
147 };
148 assert!(set_nonblocking(self.fd(), true).is_ok());
149 ret
150 }
151
152 pub fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
153 let fd = self.fd();
154 let dolock = || self.lock_nonblocking();
155 let doread = |nb| unsafe {
156 let flags = if nb {c::MSG_DONTWAIT} else {0};
157 libc::recv(fd,
158 buf.as_mut_ptr() as *mut libc::c_void,
159 buf.len() as libc::size_t,
160 flags) as libc::c_int
161 };
162 read(fd, self.read_deadline, dolock, doread)
163 }
164
165 pub fn write(&mut self, buf: &[u8]) -> IoResult<()> {
166 let fd = self.fd();
167 let dolock = || self.lock_nonblocking();
168 let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
169 let flags = if nb {c::MSG_DONTWAIT} else {0};
170 libc::send(fd,
171 buf as *const _,
172 len as libc::size_t,
173 flags) as i64
174 };
175 match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
176 Ok(_) => Ok(()),
177 Err(e) => Err(e)
178 }
179 }
180
181 pub fn close_write(&mut self) -> IoResult<()> {
182 mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
183 }
184
185 pub fn close_read(&mut self) -> IoResult<()> {
186 mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
187 }
188
189 pub fn set_timeout(&mut self, timeout: Option<u64>) {
190 let deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
191 self.read_deadline = deadline;
192 self.write_deadline = deadline;
193 }
194
195 pub fn set_read_timeout(&mut self, timeout: Option<u64>) {
196 self.read_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
197 }
198
199 pub fn set_write_timeout(&mut self, timeout: Option<u64>) {
200 self.write_deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
201 }
202 }
203
204 impl Clone for UnixStream {
205 fn clone(&self) -> UnixStream {
206 UnixStream::new(self.inner.clone())
207 }
208 }
209
210 ////////////////////////////////////////////////////////////////////////////////
211 // Unix Listener
212 ////////////////////////////////////////////////////////////////////////////////
213
214 pub struct UnixListener {
215 inner: Inner,
216 path: CString,
217 }
218
219 // we currently own the CString, so these impls should be safe
220 unsafe impl Send for UnixListener {}
221 unsafe impl Sync for UnixListener {}
222
223 impl UnixListener {
224 pub fn bind(addr: &CString) -> IoResult<UnixListener> {
225 bind(addr, libc::SOCK_STREAM).map(|fd| {
226 UnixListener { inner: fd, path: addr.clone() }
227 })
228 }
229
230 pub fn fd(&self) -> fd_t { self.inner.fd }
231
232 pub fn listen(self) -> IoResult<UnixAcceptor> {
233 match unsafe { libc::listen(self.fd(), 128) } {
234 -1 => Err(super::last_error()),
235
236 _ => {
237 let (reader, writer) = try!(unsafe { sys::os::pipe() });
238 try!(set_nonblocking(reader.fd(), true));
239 try!(set_nonblocking(writer.fd(), true));
240 try!(set_nonblocking(self.fd(), true));
241 Ok(UnixAcceptor {
242 inner: Arc::new(AcceptorInner {
243 listener: self,
244 reader: reader,
245 writer: writer,
246 closed: AtomicBool::new(false),
247 }),
248 deadline: 0,
249 })
250 }
251 }
252 }
253 }
254
255 pub struct UnixAcceptor {
256 inner: Arc<AcceptorInner>,
257 deadline: u64,
258 }
259
260 struct AcceptorInner {
261 listener: UnixListener,
262 reader: FileDesc,
263 writer: FileDesc,
264 closed: AtomicBool,
265 }
266
267 impl UnixAcceptor {
268 pub fn fd(&self) -> fd_t { self.inner.listener.fd() }
269
270 pub fn accept(&mut self) -> IoResult<UnixStream> {
271 let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
272
273 while !self.inner.closed.load(Ordering::SeqCst) {
274 unsafe {
275 let mut storage: libc::sockaddr_storage = mem::zeroed();
276 let storagep = &mut storage as *mut libc::sockaddr_storage;
277 let size = mem::size_of::<libc::sockaddr_storage>();
278 let mut size = size as libc::socklen_t;
279 match retry(|| {
280 libc::accept(self.fd(),
281 storagep as *mut libc::sockaddr,
282 &mut size as *mut libc::socklen_t) as libc::c_int
283 }) {
284 -1 if wouldblock() => {}
285 -1 => return Err(super::last_error()),
286 fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
287 }
288 }
289 try!(await(&[self.fd(), self.inner.reader.fd()],
290 deadline, Readable));
291 }
292
293 Err(eof())
294 }
295
296 pub fn set_timeout(&mut self, timeout: Option<u64>) {
297 self.deadline = timeout.map(|a| timer::now() + a).unwrap_or(0);
298 }
299
300 pub fn close_accept(&mut self) -> IoResult<()> {
301 self.inner.closed.store(true, Ordering::SeqCst);
302 let fd = FileDesc::new(self.inner.writer.fd(), false);
303 match fd.write(&[0]) {
304 Ok(..) => Ok(()),
305 Err(..) if wouldblock() => Ok(()),
306 Err(e) => Err(e),
307 }
308 }
309 }
310
311 impl Clone for UnixAcceptor {
312 fn clone(&self) -> UnixAcceptor {
313 UnixAcceptor { inner: self.inner.clone(), deadline: 0 }
314 }
315 }
316
317 impl Drop for UnixListener {
318 fn drop(&mut self) {
319 // Unlink the path to the socket to ensure that it doesn't linger. We're
320 // careful to unlink the path before we close the file descriptor to
321 // prevent races where we unlink someone else's path.
322 unsafe {
323 let _ = libc::unlink(self.path.as_ptr());
324 }
325 }
326 }