]> git.proxmox.com Git - pve-xtermjs.git/blob - src/main.rs
update to proxmox-lang 1.1 / proxmox-sys 0.3.0
[pve-xtermjs.git] / src / main.rs
1 use std::cmp::min;
2 use std::collections::HashMap;
3 use std::ffi::{OsStr, OsString};
4 use std::io::{ErrorKind, Write};
5 use std::os::unix::io::{AsRawFd, FromRawFd};
6 use std::os::unix::process::CommandExt;
7 use std::process::Command;
8 use std::time::{Duration, Instant};
9
10 use anyhow::{bail, format_err, Result};
11 use clap::{App, AppSettings, Arg};
12 use mio::net::{TcpListener, TcpStream};
13 use mio::unix::SourceFd;
14 use mio::{Events, Interest, Poll, Token};
15
16 use proxmox_io::ByteBuffer;
17 use proxmox_lang::error::io_err_other;
18 use proxmox_sys::{
19 linux::pty::{make_controlling_terminal, PTY},
20 };
21
22 const MSG_TYPE_DATA: u8 = 0;
23 const MSG_TYPE_RESIZE: u8 = 1;
24 //const MSG_TYPE_PING: u8 = 2;
25
26 fn remove_number(buf: &mut ByteBuffer) -> Option<usize> {
27 loop {
28 if let Some(pos) = &buf.iter().position(|&x| x == b':') {
29 let data = buf.remove_data(*pos);
30 buf.consume(1); // the ':'
31 let len = match std::str::from_utf8(&data) {
32 Ok(lenstring) => match lenstring.parse() {
33 Ok(len) => len,
34 Err(err) => {
35 eprintln!("error parsing number: '{}'", err);
36 break;
37 }
38 },
39 Err(err) => {
40 eprintln!("error parsing number: '{}'", err);
41 break;
42 }
43 };
44 return Some(len);
45 } else if buf.len() > 20 {
46 buf.consume(20);
47 } else {
48 break;
49 }
50 }
51 None
52 }
53
54 fn process_queue(buf: &mut ByteBuffer, pty: &mut PTY) -> Option<usize> {
55 if buf.is_empty() {
56 return None;
57 }
58
59 loop {
60 if buf.len() < 2 {
61 break;
62 }
63
64 let msgtype = buf[0] - b'0';
65
66 if msgtype == MSG_TYPE_DATA {
67 buf.consume(2);
68 if let Some(len) = remove_number(buf) {
69 return Some(len);
70 }
71 } else if msgtype == MSG_TYPE_RESIZE {
72 buf.consume(2);
73 if let Some(cols) = remove_number(buf) {
74 if let Some(rows) = remove_number(buf) {
75 pty.set_size(cols as u16, rows as u16).ok()?;
76 }
77 }
78 // ignore incomplete messages
79 } else {
80 buf.consume(1);
81 // ignore invalid or ping (msgtype 2)
82 }
83 }
84
85 None
86 }
87
88 type TicketResult = Result<(Box<[u8]>, Box<[u8]>)>;
89
90 /// Reads from the stream and returns the first line and the rest
91 fn read_ticket_line(
92 stream: &mut TcpStream,
93 buf: &mut ByteBuffer,
94 timeout: Duration,
95 ) -> TicketResult {
96 let mut poll = Poll::new()?;
97 poll.registry()
98 .register(stream, Token(0), Interest::READABLE)?;
99 let mut events = Events::with_capacity(1);
100
101 let now = Instant::now();
102 let mut elapsed = Duration::new(0, 0);
103
104 loop {
105 poll.poll(&mut events, Some(timeout - elapsed))?;
106 if !events.is_empty() {
107 match buf.read_from(stream) {
108 Ok(n) => {
109 if n == 0 {
110 bail!("connection closed before authentication");
111 }
112 }
113 Err(err) if err.kind() == ErrorKind::WouldBlock => {}
114 Err(err) => return Err(err.into()),
115 }
116
117 if buf[..].contains(&b'\n') {
118 break;
119 }
120
121 if buf.is_full() {
122 bail!("authentication data is incomplete: {:?}", &buf[..]);
123 }
124 }
125
126 elapsed = now.elapsed();
127 if elapsed > timeout {
128 bail!("timed out");
129 }
130 }
131
132 let newline_idx = &buf[..].iter().position(|&x| x == b'\n').unwrap();
133
134 let line = buf.remove_data(*newline_idx);
135 buf.consume(1); // discard newline
136
137 match line.iter().position(|&b| b == b':') {
138 Some(pos) => {
139 let (username, ticket) = line.split_at(pos);
140 Ok((username.into(), ticket[1..].into()))
141 }
142 None => bail!("authentication data is invalid"),
143 }
144 }
145
146 fn authenticate(
147 username: &[u8],
148 ticket: &[u8],
149 path: &str,
150 perm: Option<&str>,
151 authport: u16,
152 port: Option<u16>,
153 ) -> Result<()> {
154 let mut post_fields: Vec<(&str, &str)> = Vec::with_capacity(5);
155 post_fields.push(("username", std::str::from_utf8(username)?));
156 post_fields.push(("password", std::str::from_utf8(ticket)?));
157 post_fields.push(("path", path));
158 if let Some(perm) = perm {
159 post_fields.push(("privs", perm));
160 }
161 let port_str;
162 if let Some(port) = port {
163 port_str = port.to_string();
164 post_fields.push(("port", &port_str));
165 }
166
167 let url = format!("http://localhost:{}/api2/json/access/ticket", authport);
168
169 match ureq::post(&url).send_form(&post_fields[..]) {
170 Ok(res) if res.status() == 200 => Ok(()),
171 Ok(res) | Err(ureq::Error::Status(_, res)) => {
172 let code = res.status();
173 bail!("invalid authentication - {} {}", code, res.status_text())
174 }
175 Err(err) => bail!("authentication request failed - {}", err),
176 }
177 }
178
179 fn listen_and_accept(
180 hostname: &str,
181 port: u64,
182 port_as_fd: bool,
183 timeout: Duration,
184 ) -> Result<(TcpStream, u16)> {
185 let listener = if port_as_fd {
186 unsafe { std::net::TcpListener::from_raw_fd(port as i32) }
187 } else {
188 std::net::TcpListener::bind((hostname, port as u16))?
189 };
190 let port = listener.local_addr()?.port();
191 let mut listener = TcpListener::from_std(listener);
192 let mut poll = Poll::new()?;
193
194 poll.registry()
195 .register(&mut listener, Token(0), Interest::READABLE)?;
196
197 let mut events = Events::with_capacity(1);
198
199 let now = Instant::now();
200 let mut elapsed = Duration::new(0, 0);
201
202 loop {
203 poll.poll(&mut events, Some(timeout - elapsed))?;
204 if !events.is_empty() {
205 let (stream, client) = listener.accept()?;
206 println!("client connection: {:?}", client);
207 return Ok((stream, port));
208 }
209
210 elapsed = now.elapsed();
211 if elapsed > timeout {
212 bail!("timed out");
213 }
214 }
215 }
216
217 fn run_pty(cmd: &OsStr, params: clap::OsValues) -> Result<PTY> {
218 let (mut pty, secondary_name) = PTY::new().map_err(io_err_other)?;
219
220 let mut filtered_env: HashMap<OsString, OsString> = std::env::vars_os()
221 .filter(|&(ref k, _)| {
222 k == "PATH"
223 || k == "USER"
224 || k == "HOME"
225 || k == "LANG"
226 || k == "LANGUAGE"
227 || k.to_string_lossy().starts_with("LC_")
228 })
229 .collect();
230 filtered_env.insert("TERM".into(), "xterm-256color".into());
231
232 let mut command = Command::new(cmd);
233
234 command.args(params).env_clear().envs(&filtered_env);
235
236 unsafe {
237 command.pre_exec(move || {
238 make_controlling_terminal(&secondary_name).map_err(io_err_other)?;
239 Ok(())
240 });
241 }
242
243 command.spawn()?;
244
245 pty.set_size(80, 20)?;
246 Ok(pty)
247 }
248
249 const TCP: Token = Token(0);
250 const PTY: Token = Token(1);
251
252 fn do_main() -> Result<()> {
253 let matches = App::new("termproxy")
254 .setting(AppSettings::TrailingVarArg)
255 .arg(Arg::with_name("port").takes_value(true).required(true))
256 .arg(
257 Arg::with_name("authport")
258 .takes_value(true)
259 .long("authport"),
260 )
261 .arg(Arg::with_name("use-port-as-fd").long("port-as-fd"))
262 .arg(
263 Arg::with_name("path")
264 .takes_value(true)
265 .long("path")
266 .required(true),
267 )
268 .arg(Arg::with_name("perm").takes_value(true).long("perm"))
269 .arg(Arg::with_name("cmd").multiple(true).required(true))
270 .get_matches();
271
272 let port: u64 = matches
273 .value_of("port")
274 .unwrap()
275 .parse()
276 .map_err(io_err_other)?;
277 let path = matches.value_of("path").unwrap();
278 let perm: Option<&str> = matches.value_of("perm");
279 let mut cmdparams = matches.values_of_os("cmd").unwrap();
280 let cmd = cmdparams.next().unwrap();
281 let authport: u16 = matches
282 .value_of("authport")
283 .unwrap_or("85")
284 .parse()
285 .map_err(io_err_other)?;
286 let mut pty_buf = ByteBuffer::new();
287 let mut tcp_buf = ByteBuffer::new();
288
289 let use_port_as_fd = matches.is_present("use-port-as-fd");
290
291 if use_port_as_fd && port > u16::MAX as u64 {
292 return Err(format_err!("port too big"));
293 } else if port > i32::MAX as u64 {
294 return Err(format_err!("Invalid FD number"));
295 }
296
297 let (mut tcp_handle, port) =
298 listen_and_accept("localhost", port, use_port_as_fd, Duration::new(10, 0))
299 .map_err(|err| format_err!("failed waiting for client: {}", err))?;
300
301 let (username, ticket) = read_ticket_line(&mut tcp_handle, &mut pty_buf, Duration::new(10, 0))
302 .map_err(|err| format_err!("failed reading ticket: {}", err))?;
303 let port = if use_port_as_fd { Some(port) } else { None };
304 authenticate(&username, &ticket, path, perm, authport, port)?;
305 tcp_handle.write_all(b"OK").expect("error writing response");
306
307 let mut poll = Poll::new()?;
308 let mut events = Events::with_capacity(128);
309
310 let mut pty = run_pty(cmd, cmdparams)?;
311
312 poll.registry().register(
313 &mut tcp_handle,
314 TCP,
315 Interest::READABLE | Interest::WRITABLE,
316 )?;
317 poll.registry().register(
318 &mut SourceFd(&pty.as_raw_fd()),
319 PTY,
320 Interest::READABLE | Interest::WRITABLE,
321 )?;
322
323 let mut tcp_writable = true;
324 let mut pty_writable = true;
325 let mut tcp_readable = true;
326 let mut pty_readable = true;
327 let mut remaining = 0;
328 let mut finished = false;
329
330 while !finished {
331 if tcp_readable && !pty_buf.is_full() || pty_readable && !tcp_buf.is_full() {
332 poll.poll(&mut events, Some(Duration::new(0, 0)))?;
333 } else {
334 poll.poll(&mut events, None)?;
335 }
336
337 for event in &events {
338 let writable = event.is_writable();
339 let readable = event.is_readable();
340 if event.is_read_closed() {
341 finished = true;
342 }
343 match event.token() {
344 TCP => {
345 if readable {
346 tcp_readable = true;
347 }
348 if writable {
349 tcp_writable = true;
350 }
351 }
352 PTY => {
353 if readable {
354 pty_readable = true;
355 }
356 if writable {
357 pty_writable = true;
358 }
359 }
360 _ => unreachable!(),
361 }
362 }
363
364 while tcp_readable && !pty_buf.is_full() {
365 let bytes = match pty_buf.read_from(&mut tcp_handle) {
366 Ok(bytes) => bytes,
367 Err(err) if err.kind() == ErrorKind::WouldBlock => {
368 tcp_readable = false;
369 break;
370 }
371 Err(err) => {
372 if !finished {
373 return Err(format_err!("error reading from tcp: {}", err));
374 }
375 break;
376 }
377 };
378 if bytes == 0 {
379 finished = true;
380 break;
381 }
382 }
383
384 while pty_readable && !tcp_buf.is_full() {
385 let bytes = match tcp_buf.read_from(&mut pty) {
386 Ok(bytes) => bytes,
387 Err(err) if err.kind() == ErrorKind::WouldBlock => {
388 pty_readable = false;
389 break;
390 }
391 Err(err) => {
392 if !finished {
393 return Err(format_err!("error reading from pty: {}", err));
394 }
395 break;
396 }
397 };
398 if bytes == 0 {
399 finished = true;
400 break;
401 }
402 }
403
404 while !tcp_buf.is_empty() && tcp_writable {
405 let bytes = match tcp_handle.write(&tcp_buf[..]) {
406 Ok(bytes) => bytes,
407 Err(err) if err.kind() == ErrorKind::WouldBlock => {
408 tcp_writable = false;
409 break;
410 }
411 Err(err) => {
412 if !finished {
413 return Err(format_err!("error writing to tcp : {}", err));
414 }
415 break;
416 }
417 };
418 tcp_buf.consume(bytes);
419 }
420
421 while !pty_buf.is_empty() && pty_writable {
422 if remaining == 0 {
423 remaining = match process_queue(&mut pty_buf, &mut pty) {
424 Some(val) => val,
425 None => break,
426 };
427 }
428 let len = min(remaining, pty_buf.len());
429 let bytes = match pty.write(&pty_buf[..len]) {
430 Ok(bytes) => bytes,
431 Err(err) if err.kind() == ErrorKind::WouldBlock => {
432 pty_writable = false;
433 break;
434 }
435 Err(err) => {
436 if !finished {
437 return Err(format_err!("error writing to pty : {}", err));
438 }
439 break;
440 }
441 };
442 remaining -= bytes;
443 pty_buf.consume(bytes);
444 }
445 }
446
447 Ok(())
448 }
449
450 fn main() {
451 std::process::exit(match do_main() {
452 Ok(_) => 0,
453 Err(err) => {
454 eprintln!("{}", err);
455 1
456 }
457 });
458 }