]> git.proxmox.com Git - pve-xtermjs.git/blob - termproxy/src/main.rs
5eb14ee3585c3e1bf0d517f51dcddc869abb5785
[pve-xtermjs.git] / termproxy / src / main.rs
1 use std::cmp::min;
2 use std::collections::HashMap;
3 use std::ffi::OsString;
4 use std::io::{ErrorKind, Write};
5 use std::os::unix::io::{AsRawFd, FromRawFd};
6 use std::os::unix::process::CommandExt;
7 use std::process::Command;
8 use std::time::{Duration, Instant};
9
10 use anyhow::{bail, format_err, Result};
11 use clap::Arg;
12 use mio::net::{TcpListener, TcpStream};
13 use mio::unix::SourceFd;
14 use mio::{Events, Interest, Poll, Token};
15
16 use proxmox_io::ByteBuffer;
17 use proxmox_lang::error::io_err_other;
18 use proxmox_sys::linux::pty::{make_controlling_terminal, PTY};
19
20 const MSG_TYPE_DATA: u8 = 0;
21 const MSG_TYPE_RESIZE: u8 = 1;
22 //const MSG_TYPE_PING: u8 = 2;
23
24 fn remove_number(buf: &mut ByteBuffer) -> Option<usize> {
25 loop {
26 if let Some(pos) = &buf.iter().position(|&x| x == b':') {
27 let data = buf.remove_data(*pos);
28 buf.consume(1); // the ':'
29 let len = match std::str::from_utf8(&data) {
30 Ok(lenstring) => match lenstring.parse() {
31 Ok(len) => len,
32 Err(err) => {
33 eprintln!("error parsing number: '{}'", err);
34 break;
35 }
36 },
37 Err(err) => {
38 eprintln!("error parsing number: '{}'", err);
39 break;
40 }
41 };
42 return Some(len);
43 } else if buf.len() > 20 {
44 buf.consume(20);
45 } else {
46 break;
47 }
48 }
49 None
50 }
51
52 fn process_queue(buf: &mut ByteBuffer, pty: &mut PTY) -> Option<usize> {
53 if buf.is_empty() {
54 return None;
55 }
56
57 loop {
58 if buf.len() < 2 {
59 break;
60 }
61
62 let msgtype = buf[0] - b'0';
63
64 if msgtype == MSG_TYPE_DATA {
65 buf.consume(2);
66 if let Some(len) = remove_number(buf) {
67 return Some(len);
68 }
69 } else if msgtype == MSG_TYPE_RESIZE {
70 buf.consume(2);
71 if let Some(cols) = remove_number(buf) {
72 if let Some(rows) = remove_number(buf) {
73 pty.set_size(cols as u16, rows as u16).ok()?;
74 }
75 }
76 // ignore incomplete messages
77 } else {
78 buf.consume(1);
79 // ignore invalid or ping (msgtype 2)
80 }
81 }
82
83 None
84 }
85
86 type TicketResult = Result<(Box<[u8]>, Box<[u8]>)>;
87
88 /// Reads from the stream and returns the first line and the rest
89 fn read_ticket_line(
90 stream: &mut TcpStream,
91 buf: &mut ByteBuffer,
92 timeout: Duration,
93 ) -> TicketResult {
94 let mut poll = Poll::new()?;
95 poll.registry()
96 .register(stream, Token(0), Interest::READABLE)?;
97 let mut events = Events::with_capacity(1);
98
99 let now = Instant::now();
100 let mut elapsed = Duration::new(0, 0);
101
102 loop {
103 poll.poll(&mut events, Some(timeout - elapsed))?;
104 if !events.is_empty() {
105 match buf.read_from(stream) {
106 Ok(n) => {
107 if n == 0 {
108 bail!("connection closed before authentication");
109 }
110 }
111 Err(err) if err.kind() == ErrorKind::WouldBlock => {}
112 Err(err) => return Err(err.into()),
113 }
114
115 if buf[..].contains(&b'\n') {
116 break;
117 }
118
119 if buf.is_full() {
120 bail!("authentication data is incomplete: {:?}", &buf[..]);
121 }
122 }
123
124 elapsed = now.elapsed();
125 if elapsed > timeout {
126 bail!("timed out");
127 }
128 }
129
130 let newline_idx = &buf[..].iter().position(|&x| x == b'\n').unwrap();
131
132 let line = buf.remove_data(*newline_idx);
133 buf.consume(1); // discard newline
134
135 match line.iter().position(|&b| b == b':') {
136 Some(pos) => {
137 let (username, ticket) = line.split_at(pos);
138 Ok((username.into(), ticket[1..].into()))
139 }
140 None => bail!("authentication data is invalid"),
141 }
142 }
143
144 fn authenticate(
145 username: &[u8],
146 ticket: &[u8],
147 path: &str,
148 perm: Option<&str>,
149 authport: u16,
150 port: Option<u16>,
151 ) -> Result<()> {
152 let mut post_fields: Vec<(&str, &str)> = Vec::with_capacity(5);
153 post_fields.push(("username", std::str::from_utf8(username)?));
154 post_fields.push(("password", std::str::from_utf8(ticket)?));
155 post_fields.push(("path", path));
156 if let Some(perm) = perm {
157 post_fields.push(("privs", perm));
158 }
159 let port_str;
160 if let Some(port) = port {
161 port_str = port.to_string();
162 post_fields.push(("port", &port_str));
163 }
164
165 let url = format!("http://localhost:{}/api2/json/access/ticket", authport);
166
167 match ureq::post(&url).send_form(&post_fields[..]) {
168 Ok(res) if res.status() == 200 => Ok(()),
169 Ok(res) | Err(ureq::Error::Status(_, res)) => {
170 let code = res.status();
171 bail!("invalid authentication - {} {}", code, res.status_text())
172 }
173 Err(err) => bail!("authentication request failed - {}", err),
174 }
175 }
176
177 fn listen_and_accept(
178 hostname: &str,
179 port: u64,
180 port_as_fd: bool,
181 timeout: Duration,
182 ) -> Result<(TcpStream, u16)> {
183 let listener = if port_as_fd {
184 unsafe { std::net::TcpListener::from_raw_fd(port as i32) }
185 } else {
186 std::net::TcpListener::bind((hostname, port as u16))?
187 };
188 let port = listener.local_addr()?.port();
189 let mut listener = TcpListener::from_std(listener);
190 let mut poll = Poll::new()?;
191
192 poll.registry()
193 .register(&mut listener, Token(0), Interest::READABLE)?;
194
195 let mut events = Events::with_capacity(1);
196
197 let now = Instant::now();
198 let mut elapsed = Duration::new(0, 0);
199
200 loop {
201 poll.poll(&mut events, Some(timeout - elapsed))?;
202 if !events.is_empty() {
203 let (stream, client) = listener.accept()?;
204 println!("client connection: {:?}", client);
205 return Ok((stream, port));
206 }
207
208 elapsed = now.elapsed();
209 if elapsed > timeout {
210 bail!("timed out");
211 }
212 }
213 }
214
215 fn run_pty<'a>(mut full_cmd: impl Iterator<Item = &'a OsString>) -> Result<PTY> {
216 let cmd_exe = full_cmd.next().unwrap();
217 let params = full_cmd; // rest
218
219 let (mut pty, secondary_name) = PTY::new().map_err(io_err_other)?;
220
221 let mut filtered_env: HashMap<OsString, OsString> = std::env::vars_os()
222 .filter(|&(ref k, _)| {
223 k == "PATH"
224 || k == "USER"
225 || k == "HOME"
226 || k == "LANG"
227 || k == "LANGUAGE"
228 || k.to_string_lossy().starts_with("LC_")
229 })
230 .collect();
231 filtered_env.insert("TERM".into(), "xterm-256color".into());
232
233 let mut command = Command::new(cmd_exe);
234
235 command.args(params).env_clear().envs(&filtered_env);
236
237 unsafe {
238 command.pre_exec(move || {
239 make_controlling_terminal(&secondary_name).map_err(io_err_other)?;
240 Ok(())
241 });
242 }
243
244 command.spawn()?;
245
246 pty.set_size(80, 20)?;
247 Ok(pty)
248 }
249
250 const TCP: Token = Token(0);
251 const PTY: Token = Token(1);
252
253 fn do_main() -> Result<()> {
254 let matches = clap::builder::Command::new("termproxy")
255 .trailing_var_arg(true)
256 .arg(
257 Arg::new("port")
258 .num_args(1)
259 .required(true)
260 .value_parser(clap::value_parser!(u64)),
261 )
262 .arg(Arg::new("authport").num_args(1).long("authport"))
263 .arg(Arg::new("use-port-as-fd").long("port-as-fd"))
264 .arg(Arg::new("path").num_args(1).long("path").required(true))
265 .arg(Arg::new("perm").num_args(1).long("perm"))
266 .arg(
267 Arg::new("cmd")
268 .value_parser(clap::value_parser!(OsString))
269 .num_args(1..)
270 .required(true),
271 )
272 .get_matches();
273
274 let port: u64 = *matches.get_one("port").unwrap();
275 let path = matches.get_one::<String>("path").unwrap();
276 let perm = matches.get_one::<String>("perm").map(|x| x.as_str());
277 let full_cmd: clap::parser::ValuesRef<OsString> = matches.get_many("cmd").unwrap();
278 let authport: u16 = *matches.get_one("authport").unwrap_or(&85);
279 let use_port_as_fd = matches.contains_id("use-port-as-fd");
280
281 if use_port_as_fd && port > u16::MAX as u64 {
282 return Err(format_err!("port too big"));
283 } else if port > i32::MAX as u64 {
284 return Err(format_err!("Invalid FD number"));
285 }
286
287 let (mut tcp_handle, port) =
288 listen_and_accept("localhost", port, use_port_as_fd, Duration::new(10, 0))
289 .map_err(|err| format_err!("failed waiting for client: {}", err))?;
290
291 let mut pty_buf = ByteBuffer::new();
292 let mut tcp_buf = ByteBuffer::new();
293
294 let (username, ticket) = read_ticket_line(&mut tcp_handle, &mut pty_buf, Duration::new(10, 0))
295 .map_err(|err| format_err!("failed reading ticket: {}", err))?;
296 let port = if use_port_as_fd { Some(port) } else { None };
297 authenticate(&username, &ticket, &path, perm.as_deref(), authport, port)?;
298 tcp_handle.write_all(b"OK").expect("error writing response");
299
300 let mut poll = Poll::new()?;
301 let mut events = Events::with_capacity(128);
302
303 let mut pty = run_pty(full_cmd)?;
304
305 poll.registry().register(
306 &mut tcp_handle,
307 TCP,
308 Interest::READABLE | Interest::WRITABLE,
309 )?;
310 poll.registry().register(
311 &mut SourceFd(&pty.as_raw_fd()),
312 PTY,
313 Interest::READABLE | Interest::WRITABLE,
314 )?;
315
316 let mut tcp_writable = true;
317 let mut pty_writable = true;
318 let mut tcp_readable = true;
319 let mut pty_readable = true;
320 let mut remaining = 0;
321 let mut finished = false;
322
323 while !finished {
324 if tcp_readable && !pty_buf.is_full() || pty_readable && !tcp_buf.is_full() {
325 poll.poll(&mut events, Some(Duration::new(0, 0)))?;
326 } else {
327 poll.poll(&mut events, None)?;
328 }
329
330 for event in &events {
331 let writable = event.is_writable();
332 let readable = event.is_readable();
333 if event.is_read_closed() {
334 finished = true;
335 }
336 match event.token() {
337 TCP => {
338 if readable {
339 tcp_readable = true;
340 }
341 if writable {
342 tcp_writable = true;
343 }
344 }
345 PTY => {
346 if readable {
347 pty_readable = true;
348 }
349 if writable {
350 pty_writable = true;
351 }
352 }
353 _ => unreachable!(),
354 }
355 }
356
357 while tcp_readable && !pty_buf.is_full() {
358 let bytes = match pty_buf.read_from(&mut tcp_handle) {
359 Ok(bytes) => bytes,
360 Err(err) if err.kind() == ErrorKind::WouldBlock => {
361 tcp_readable = false;
362 break;
363 }
364 Err(err) => {
365 if !finished {
366 return Err(format_err!("error reading from tcp: {}", err));
367 }
368 break;
369 }
370 };
371 if bytes == 0 {
372 finished = true;
373 break;
374 }
375 }
376
377 while pty_readable && !tcp_buf.is_full() {
378 let bytes = match tcp_buf.read_from(&mut pty) {
379 Ok(bytes) => bytes,
380 Err(err) if err.kind() == ErrorKind::WouldBlock => {
381 pty_readable = false;
382 break;
383 }
384 Err(err) => {
385 if !finished {
386 return Err(format_err!("error reading from pty: {}", err));
387 }
388 break;
389 }
390 };
391 if bytes == 0 {
392 finished = true;
393 break;
394 }
395 }
396
397 while !tcp_buf.is_empty() && tcp_writable {
398 let bytes = match tcp_handle.write(&tcp_buf[..]) {
399 Ok(bytes) => bytes,
400 Err(err) if err.kind() == ErrorKind::WouldBlock => {
401 tcp_writable = false;
402 break;
403 }
404 Err(err) => {
405 if !finished {
406 return Err(format_err!("error writing to tcp : {}", err));
407 }
408 break;
409 }
410 };
411 tcp_buf.consume(bytes);
412 }
413
414 while !pty_buf.is_empty() && pty_writable {
415 if remaining == 0 {
416 remaining = match process_queue(&mut pty_buf, &mut pty) {
417 Some(val) => val,
418 None => break,
419 };
420 }
421 let len = min(remaining, pty_buf.len());
422 let bytes = match pty.write(&pty_buf[..len]) {
423 Ok(bytes) => bytes,
424 Err(err) if err.kind() == ErrorKind::WouldBlock => {
425 pty_writable = false;
426 break;
427 }
428 Err(err) => {
429 if !finished {
430 return Err(format_err!("error writing to pty : {}", err));
431 }
432 break;
433 }
434 };
435 remaining -= bytes;
436 pty_buf.consume(bytes);
437 }
438 }
439
440 Ok(())
441 }
442
443 fn main() {
444 std::process::exit(match do_main() {
445 Ok(_) => 0,
446 Err(err) => {
447 eprintln!("{}", err);
448 1
449 }
450 });
451 }