]> git.proxmox.com Git - pve-xtermjs.git/blob - src/main.rs
termproxy: rewrite in rust
[pve-xtermjs.git] / src / main.rs
1 use std::cmp::min;
2 use std::collections::HashMap;
3 use std::ffi::{OsStr, OsString};
4 use std::io::{ErrorKind, Read, Result, Write};
5 use std::net::TcpStream;
6 use std::os::unix::io::{AsRawFd, FromRawFd};
7 use std::os::unix::process::CommandExt;
8 use std::process::Command;
9 use std::time::{Duration, Instant};
10
11 use clap::{App, AppSettings, Arg};
12 use curl::easy::Easy;
13 use mio::net::TcpListener;
14 use mio::unix::{EventedFd, UnixReady};
15 use mio::{Events, Poll, PollOpt, Ready, Token};
16
17 use proxmox::sys::error::io_err_other;
18 use proxmox::sys::linux::pty::{make_controlling_terminal, PTY};
19 use proxmox::tools::byte_buffer::ByteBuffer;
20 use proxmox::{io_bail, io_format_err};
21
22 const MSG_TYPE_DATA: u8 = 0;
23 const MSG_TYPE_RESIZE: u8 = 1;
24 //const MSG_TYPE_PING: u8 = 2;
25
26 fn remove_number(buf: &mut ByteBuffer) -> Option<usize> {
27 loop {
28 if let Some(pos) = &buf.iter().position(|&x| x == b':') {
29 let data = buf.remove_data(*pos);
30 buf.consume(1); // the ':'
31 let len = match std::str::from_utf8(&data) {
32 Ok(lenstring) => match lenstring.parse() {
33 Ok(len) => len,
34 Err(err) => {
35 eprintln!("error parsing number: '{}'", err);
36 break;
37 }
38 },
39 Err(err) => {
40 eprintln!("error parsing number: '{}'", err);
41 break;
42 }
43 };
44 return Some(len);
45 } else if buf.len() > 20 {
46 buf.consume(20);
47 } else {
48 break;
49 }
50 }
51 None
52 }
53
54 fn process_queue(buf: &mut ByteBuffer, pty: &mut PTY) -> Option<usize> {
55 if buf.is_empty() {
56 return None;
57 }
58
59 loop {
60 if buf.len() < 2 {
61 break;
62 }
63
64 let msgtype = buf[0] - b'0';
65
66 if msgtype == MSG_TYPE_DATA {
67 buf.consume(2);
68 if let Some(len) = remove_number(buf) {
69 return Some(len);
70 }
71 } else if msgtype == MSG_TYPE_RESIZE {
72 buf.consume(2);
73 if let Some(cols) = remove_number(buf) {
74 if let Some(rows) = remove_number(buf) {
75 pty.set_size(cols as u16, rows as u16).ok()?;
76 }
77 }
78 // ignore incomplete messages
79 } else {
80 buf.consume(1);
81 // ignore invalid or ping (msgtype 2)
82 }
83 }
84
85 None
86 }
87
88 type TicketResult = Result<(Box<[u8]>, Box<[u8]>)>;
89
90 /// Reads from the stream and returns the first line and the rest
91 fn read_ticket_line(
92 stream: &mut TcpStream,
93 buf: &mut ByteBuffer,
94 timeout: Duration,
95 ) -> TicketResult {
96 let now = Instant::now();
97 while !&buf[..].contains(&b'\n') {
98 if buf.is_full() || now.elapsed() >= timeout {
99 io_bail!("authentication data is incomplete: {:?}", &buf[..]);
100 }
101 stream.set_read_timeout(Some(Duration::new(1, 0)))?;
102 match buf.read_from(stream) {
103 Ok(n) => {
104 if n == 0 {
105 io_bail!("connection closed before authentication");
106 }
107 }
108 Err(err) if err.kind() == ErrorKind::WouldBlock => {}
109 Err(err) => return Err(err),
110 }
111 }
112
113 stream.set_read_timeout(None)?;
114 let newline_idx = &buf[..].iter().position(|&x| x == b'\n').unwrap();
115
116 let line = buf.remove_data(*newline_idx);
117 buf.consume(1); // discard newline
118
119 match line.iter().position(|&b| b == b':') {
120 Some(pos) => {
121 let (username, ticket) = line.split_at(pos);
122 Ok((username.into(), ticket[1..].into()))
123 }
124 None => io_bail!("authentication data is invalid"),
125 }
126 }
127
128 fn authenticate(
129 username: &[u8],
130 ticket: &[u8],
131 path: &str,
132 perm: Option<&str>,
133 authport: u16,
134 port: Option<u16>,
135 ) -> Result<()> {
136 let mut curl = Easy::new();
137 curl.url(&format!(
138 "http://localhost:{}/api2/json/access/ticket",
139 authport
140 ))?;
141
142 let username = curl.url_encode(username);
143 let ticket = curl.url_encode(ticket);
144 let path = curl.url_encode(path.as_bytes());
145
146 let mut post_fields = Vec::with_capacity(5);
147 post_fields.push(format!("username={}", username));
148 post_fields.push(format!("password={}", ticket));
149 post_fields.push(format!("path={}", path));
150
151 if let Some(perm) = perm {
152 let perm = curl.url_encode(perm.as_bytes());
153 post_fields.push(format!("privs={}", perm));
154 }
155
156 if let Some(port) = port {
157 post_fields.push(format!("port={}", port));
158 }
159
160 curl.post_fields_copy(post_fields.join("&").as_bytes())?;
161 curl.post(true)?;
162 curl.perform()?;
163
164 let response_code = curl.response_code()?;
165
166 if response_code != 200 {
167 io_bail!("invalid authentication, code {}", response_code);
168 }
169
170 Ok(())
171 }
172
173 fn listen_and_accept(
174 hostname: &str,
175 port: u64,
176 port_as_fd: bool,
177 timeout: Duration,
178 ) -> Result<(TcpStream, u16)> {
179 let listener = if port_as_fd {
180 unsafe { std::net::TcpListener::from_raw_fd(port as i32) }
181 } else {
182 std::net::TcpListener::bind((hostname, port as u16))?
183 };
184 let port = listener.local_addr()?.port();
185 let listener = TcpListener::from_std(listener)?;
186 let poll = Poll::new()?;
187
188 poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge())?;
189
190 let mut events = Events::with_capacity(1);
191
192 let mut timeout = timeout;
193 loop {
194 let now = Instant::now();
195 poll.poll(&mut events, Some(timeout))?;
196 let elapsed = now.elapsed();
197 if !events.is_empty() {
198 let (stream, client) = listener.accept_std()?;
199 println!("client connection: {:?}", client);
200 return Ok((stream, port));
201 }
202 if timeout >= elapsed {
203 timeout -= elapsed;
204 } else {
205 io_bail!("timed out");
206 }
207 }
208 }
209
210 fn run_pty(cmd: &OsStr, params: clap::OsValues) -> Result<PTY> {
211 let (mut pty, secondary_name) = PTY::new().map_err(io_err_other)?;
212
213 let mut filtered_env: HashMap<OsString, OsString> = std::env::vars_os()
214 .filter(|&(ref k, _)| {
215 k == "PATH"
216 || k == "USER"
217 || k == "HOME"
218 || k == "LANG"
219 || k == "LANGUAGE"
220 || k.to_string_lossy().starts_with("LC_")
221 })
222 .collect();
223 filtered_env.insert("TERM".into(), "xterm-256color".into());
224
225 let mut command = Command::new(cmd);
226
227 command.args(params).env_clear().envs(&filtered_env);
228
229 unsafe {
230 command.pre_exec(move || {
231 make_controlling_terminal(&secondary_name).map_err(io_err_other)?;
232 Ok(())
233 });
234 }
235
236 command.spawn()?;
237
238 pty.set_size(80, 20).map_err(|x| x.as_errno().unwrap())?;
239 Ok(pty)
240 }
241
242 const TCP: Token = Token(0);
243 const PTY: Token = Token(1);
244
245 fn do_main() -> Result<()> {
246 let matches = App::new("termproxy")
247 .setting(AppSettings::TrailingVarArg)
248 .arg(Arg::with_name("port").takes_value(true).required(true))
249 .arg(
250 Arg::with_name("authport")
251 .takes_value(true)
252 .long("authport"),
253 )
254 .arg(Arg::with_name("use-port-as-fd").long("port-as-fd"))
255 .arg(
256 Arg::with_name("path")
257 .takes_value(true)
258 .long("path")
259 .required(true),
260 )
261 .arg(Arg::with_name("perm").takes_value(true).long("perm"))
262 .arg(Arg::with_name("cmd").multiple(true).required(true))
263 .get_matches();
264
265 let port: u64 = matches
266 .value_of("port")
267 .unwrap()
268 .parse()
269 .map_err(io_err_other)?;
270 let path = matches.value_of("path").unwrap();
271 let perm: Option<&str> = matches.value_of("perm");
272 let mut cmdparams = matches.values_of_os("cmd").unwrap();
273 let cmd = cmdparams.next().unwrap();
274 let authport: u16 = matches
275 .value_of("authport")
276 .unwrap_or("85")
277 .parse()
278 .map_err(io_err_other)?;
279 let mut pty_buf = ByteBuffer::new();
280 let mut tcp_buf = ByteBuffer::new();
281
282 let use_port_as_fd = matches.is_present("use-port-as-fd");
283
284 if use_port_as_fd && port > u16::MAX as u64 {
285 return Err(io_format_err!("port too big"));
286 } else if port > i32::MAX as u64 {
287 return Err(io_format_err!("Invalid FD number"));
288 }
289
290 let (mut stream, port) =
291 listen_and_accept("localhost", port, use_port_as_fd, Duration::new(10, 0))
292 .map_err(|err| io_format_err!("failed waiting for client: {}", err))?;
293
294 let (username, ticket) = read_ticket_line(&mut stream, &mut pty_buf, Duration::new(10, 0))
295 .map_err(|err| io_format_err!("failed reading ticket: {}", err))?;
296 let port = if use_port_as_fd { Some(port) } else { None };
297 authenticate(&username, &ticket, path, perm, authport, port)?;
298 stream.write_all(b"OK").expect("error writing response");
299
300 let mut tcp_handle = mio::net::TcpStream::from_stream(stream)?;
301
302 let poll = Poll::new()?;
303 let mut events = Events::with_capacity(128);
304
305 let mut pty = run_pty(cmd, cmdparams)?;
306
307 poll.register(
308 &tcp_handle,
309 TCP,
310 Ready::readable() | Ready::writable() | UnixReady::hup(),
311 PollOpt::edge(),
312 )?;
313 poll.register(
314 &EventedFd(&pty.as_raw_fd()),
315 PTY,
316 Ready::readable() | Ready::writable() | UnixReady::hup(),
317 PollOpt::edge(),
318 )?;
319
320 let mut tcp_writable = true;
321 let mut pty_writable = true;
322 let mut tcp_readable = true;
323 let mut pty_readable = true;
324 let mut remaining = 0;
325 let mut finished = false;
326
327 while !finished {
328 if tcp_readable && !pty_buf.is_full() || pty_readable && !tcp_buf.is_full() {
329 poll.poll(&mut events, Some(Duration::new(0, 0)))?;
330 } else {
331 poll.poll(&mut events, None)?;
332 }
333
334 for event in &events {
335 let readiness = event.readiness();
336 let writable = readiness.is_writable();
337 let readable = readiness.is_readable();
338 if UnixReady::from(readiness).is_hup() {
339 finished = true;
340 }
341 match event.token() {
342 TCP => {
343 if readable {
344 tcp_readable = true;
345 }
346 if writable {
347 tcp_writable = true;
348 }
349 }
350 PTY => {
351 if readable {
352 pty_readable = true;
353 }
354 if writable {
355 pty_writable = true;
356 }
357 }
358 _ => unreachable!(),
359 }
360 }
361
362 while tcp_readable && !pty_buf.is_full() {
363 let bytes = match pty_buf.read_from(&mut tcp_handle) {
364 Ok(bytes) => bytes,
365 Err(err) if err.kind() == ErrorKind::WouldBlock => {
366 tcp_readable = false;
367 break;
368 }
369 Err(err) => {
370 if !finished {
371 return Err(io_format_err!("error reading from tcp: {}", err));
372 }
373 break;
374 }
375 };
376 if bytes == 0 {
377 finished = true;
378 break;
379 }
380 }
381
382 while pty_readable && !tcp_buf.is_full() {
383 let bytes = match tcp_buf.read_from(&mut pty) {
384 Ok(bytes) => bytes,
385 Err(err) if err.kind() == ErrorKind::WouldBlock => {
386 pty_readable = false;
387 break;
388 }
389 Err(err) => {
390 if !finished {
391 return Err(io_format_err!("error reading from pty: {}", err));
392 }
393 break;
394 }
395 };
396 if bytes == 0 {
397 finished = true;
398 break;
399 }
400 }
401
402 while !tcp_buf.is_empty() && tcp_writable {
403 let bytes = match tcp_handle.write(&tcp_buf[..]) {
404 Ok(bytes) => bytes,
405 Err(err) if err.kind() == ErrorKind::WouldBlock => {
406 tcp_writable = false;
407 break;
408 }
409 Err(err) => {
410 if !finished {
411 return Err(io_format_err!("error writing to tcp : {}", err));
412 }
413 break;
414 }
415 };
416 tcp_buf.consume(bytes);
417 }
418
419 while !pty_buf.is_empty() && pty_writable {
420 if remaining == 0 {
421 remaining = match process_queue(&mut pty_buf, &mut pty) {
422 Some(val) => val,
423 None => break,
424 };
425 }
426 let len = min(remaining, pty_buf.len());
427 let bytes = match pty.write(&pty_buf[..len]) {
428 Ok(bytes) => bytes,
429 Err(err) if err.kind() == ErrorKind::WouldBlock => {
430 pty_writable = false;
431 break;
432 }
433 Err(err) => {
434 if !finished {
435 return Err(io_format_err!("error writing to pty : {}", err));
436 }
437 break;
438 }
439 };
440 remaining -= bytes;
441 pty_buf.consume(bytes);
442 }
443 }
444
445 Ok(())
446 }
447
448 fn main() {
449 std::process::exit(match do_main() {
450 Ok(_) => 0,
451 Err(err) => {
452 eprintln!("{}", err);
453 1
454 }
455 });
456 }