+++ /dev/null
-package PVE::CLI::termproxy;
-
-use strict;
-use warnings;
-
-use PVE::CLIHandler;
-use PVE::JSONSchema qw(get_standard_option);
-use PVE::PTY;
-use LWP::UserAgent;
-use IO::Select;
-use IO::Socket::IP;
-
-use base qw(PVE::CLIHandler);
-
-use constant MAX_QUEUE_LEN => 16*1024;
-
-sub verify_ticket {
- my ($ticket, $user, $path, $perm) = @_;
-
- # get all loopback addresses even if no IPv4 or IPv6 address is setup on
- # the host, IO::Socket::IP sets AI_ADDRCONFIG (man getaddrinfo) per default
- local @LWP::Protocol::http::EXTRA_SOCK_OPTS = (
- GetAddrInfoFlags => 0,
- );
-
- my $ua = LWP::UserAgent->new();
-
- my $params = {
- username => $user,
- password => $ticket,
- path => $path,
- };
-
- $params->{privs} = $perm if $perm;
-
- my $res = $ua->post ('http://127.0.0.1:85/api2/json/access/ticket', Content => $params);
-
- if (!$res->is_success) {
- my $err = $res->status_line;
- die "Authentication failed: '$err'\n";
- }
-}
-
-sub listen_and_authenticate {
- my ($port, $timeout, $path, $perm) = @_;
-
- my $params = {
- Listen => 1,
- ReuseAddr => 1,
- Proto => &Socket::IPPROTO_TCP,
- GetAddrInfoFlags => 0,
- LocalAddr => 'localhost',
- LocalPort => $port,
- };
-
- my $socket = IO::Socket::IP->new(%$params) or die "failed to open socket: $!\n";
-
- alarm 0;
- local $SIG{ALRM} = sub { die "timed out waiting for client\n" };
- alarm $timeout;
- my $client = $socket->accept; # Wait for a client
- alarm 0;
- close($socket);
-
- my $queue;
- my $n = sysread($client, $queue, 4096);
- if ($n && $queue =~ s/^([^:]+):(.+)\n//) {
- my $user = $1;
- my $ticket = $2;
-
- verify_ticket($ticket, $user, $path, $perm);
-
- die "aknowledge failed\n"
- if !syswrite($client, "OK");
-
- } else {
- die "malformed authentication string\n";
- }
-
- return ($queue, $client);
-}
-
-sub run_pty {
- my ($cmd, $webhandle, $queue) = @_;
-
- foreach my $k (keys %ENV) {
- next if $k eq 'PATH' || $k eq 'USER' || $k eq 'HOME' || $k eq 'LANG' || $k eq 'LANGUAGE';
- next if $k =~ m/^LC_/;
- delete $ENV{$k};
- }
-
- $ENV{TERM} = 'xterm-256color';
-
- my $pty = PVE::PTY->new();
-
- my $pid = fork();
- die "fork: $!\n" if !defined($pid);
- if (!$pid) {
- $pty->make_controlling_terminal();
- exec {$cmd->[0]} @$cmd
- or POSIX::_exit(1);
- }
-
- $pty->set_size(80,20);
-
- read_write_loop($webhandle, $pty->master, $queue, $pty);
-
- $pty->close();
- waitpid($pid,0);
- exit(0);
-}
-
-sub read_write_loop {
- my ($webhandle, $cmdhandle, $queue, $pty) = @_;
-
- my $select = new IO::Select;
-
- $select->add($webhandle);
- $select->add($cmdhandle);
-
- my @handles;
-
- # we may have already messages from the first read
- $queue = process_queue($queue, $cmdhandle, $pty);
-
- my $timeout = 5*60;
-
- while($select->count && scalar(@handles = $select->can_read($timeout))) {
- foreach my $h (@handles) {
- my $buf;
- my $n = $h->sysread($buf, 4096);
-
- if ($h == $webhandle) {
- if ($n && (length($queue) + $n) < MAX_QUEUE_LEN) {
- $queue = process_queue($queue.$buf, $cmdhandle, $pty);
- } else {
- return;
- }
- } elsif ($h == $cmdhandle) {
- if ($n) {
- syswrite($webhandle, $buf);
- } else {
- return;
- }
- }
- }
- }
-}
-
-sub process_queue {
- my ($queue, $handle, $pty) = @_;
-
- my $msg;
- while(length($queue)) {
- ($queue, $msg) = remove_message($queue, $pty);
- last if !defined($msg);
- syswrite($handle, $msg);
- }
- return $queue;
-}
-
-
-# we try to remove a whole message
-# if we succeed, we return the remaining queue and the msg
-# if we fail, the message is undef and the queue is not changed
-sub remove_message {
- my ($queue, $pty) = @_;
-
- my $msg;
- my $type = substr $queue, 0, 1;
-
- if ($type eq '0') {
- # normal message
- my ($length) = $queue =~ m/^0:(\d+):/;
- my $begin = 3 + length($length);
- if (defined($length) && length($queue) >= ($length + $begin)) {
- $msg = substr $queue, $begin, $length;
- if (defined($msg)) {
- # msg contains now $length chars after 0:$length:
- $queue = substr $queue, $begin + $length;
- }
- }
- } elsif ($type eq '1') {
- # resize message
- my ($cols, $rows) = $queue =~ m/^1:(\d+):(\d+):/;
- if (defined($cols) && defined($rows)) {
- $queue = substr $queue, (length($cols) + length ($rows) + 4);
- eval { $pty->set_size($cols, $rows) if defined($pty) };
- warn $@ if $@;
- $msg = "";
- }
- } elsif ($type eq '2') {
- # ping
- $queue = substr $queue, 1;
- $msg = "";
- } else {
- # ignore other input
- $queue = substr $queue, 1;
- $msg = "";
- }
-
- return ($queue, $msg);
-}
-
-__PACKAGE__->register_method ({
- name => 'exec',
- path => 'exec',
- method => 'POST',
- description => "Connects a TCP Socket with a commandline",
- parameters => {
- additionalProperties => 0,
- properties => {
- port => {
- type => 'integer',
- description => "The port to listen on."
- },
- path => {
- type => 'string',
- description => "The Authentication path.",
- },
- perm => {
- type => 'string',
- description => "The Authentication Permission.",
- optional => 1,
- },
- 'extra-args' => get_standard_option('extra-args'),
- },
- },
- returns => { type => 'null'},
- code => sub {
- my ($param) = @_;
-
- my $cmd;
- if (defined($param->{'extra-args'})) {
- $cmd = [@{$param->{'extra-args'}}];
- } else {
- die "No command given\n";
- }
-
- my ($queue, $handle) = listen_and_authenticate($param->{port}, 10,
- $param->{path}, $param->{perm});
-
- run_pty($cmd, $handle, $queue);
-
- return undef;
- }});
-
-our $cmddef = [ __PACKAGE__, 'exec', ['port', 'extra-args' ]];
-
-1;
--- /dev/null
+use std::cmp::min;
+use std::collections::HashMap;
+use std::ffi::{OsStr, OsString};
+use std::io::{ErrorKind, Read, Result, Write};
+use std::net::TcpStream;
+use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::os::unix::process::CommandExt;
+use std::process::Command;
+use std::time::{Duration, Instant};
+
+use clap::{App, AppSettings, Arg};
+use curl::easy::Easy;
+use mio::net::TcpListener;
+use mio::unix::{EventedFd, UnixReady};
+use mio::{Events, Poll, PollOpt, Ready, Token};
+
+use proxmox::sys::error::io_err_other;
+use proxmox::sys::linux::pty::{make_controlling_terminal, PTY};
+use proxmox::tools::byte_buffer::ByteBuffer;
+use proxmox::{io_bail, io_format_err};
+
+const MSG_TYPE_DATA: u8 = 0;
+const MSG_TYPE_RESIZE: u8 = 1;
+//const MSG_TYPE_PING: u8 = 2;
+
+fn remove_number(buf: &mut ByteBuffer) -> Option<usize> {
+ loop {
+ if let Some(pos) = &buf.iter().position(|&x| x == b':') {
+ let data = buf.remove_data(*pos);
+ buf.consume(1); // the ':'
+ let len = match std::str::from_utf8(&data) {
+ Ok(lenstring) => match lenstring.parse() {
+ Ok(len) => len,
+ Err(err) => {
+ eprintln!("error parsing number: '{}'", err);
+ break;
+ }
+ },
+ Err(err) => {
+ eprintln!("error parsing number: '{}'", err);
+ break;
+ }
+ };
+ return Some(len);
+ } else if buf.len() > 20 {
+ buf.consume(20);
+ } else {
+ break;
+ }
+ }
+ None
+}
+
+fn process_queue(buf: &mut ByteBuffer, pty: &mut PTY) -> Option<usize> {
+ if buf.is_empty() {
+ return None;
+ }
+
+ loop {
+ if buf.len() < 2 {
+ break;
+ }
+
+ let msgtype = buf[0] - b'0';
+
+ if msgtype == MSG_TYPE_DATA {
+ buf.consume(2);
+ if let Some(len) = remove_number(buf) {
+ return Some(len);
+ }
+ } else if msgtype == MSG_TYPE_RESIZE {
+ buf.consume(2);
+ if let Some(cols) = remove_number(buf) {
+ if let Some(rows) = remove_number(buf) {
+ pty.set_size(cols as u16, rows as u16).ok()?;
+ }
+ }
+ // ignore incomplete messages
+ } else {
+ buf.consume(1);
+ // ignore invalid or ping (msgtype 2)
+ }
+ }
+
+ None
+}
+
+type TicketResult = Result<(Box<[u8]>, Box<[u8]>)>;
+
+/// Reads from the stream and returns the first line and the rest
+fn read_ticket_line(
+ stream: &mut TcpStream,
+ buf: &mut ByteBuffer,
+ timeout: Duration,
+) -> TicketResult {
+ let now = Instant::now();
+ while !&buf[..].contains(&b'\n') {
+ if buf.is_full() || now.elapsed() >= timeout {
+ io_bail!("authentication data is incomplete: {:?}", &buf[..]);
+ }
+ stream.set_read_timeout(Some(Duration::new(1, 0)))?;
+ match buf.read_from(stream) {
+ Ok(n) => {
+ if n == 0 {
+ io_bail!("connection closed before authentication");
+ }
+ }
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {}
+ Err(err) => return Err(err),
+ }
+ }
+
+ stream.set_read_timeout(None)?;
+ let newline_idx = &buf[..].iter().position(|&x| x == b'\n').unwrap();
+
+ let line = buf.remove_data(*newline_idx);
+ buf.consume(1); // discard newline
+
+ match line.iter().position(|&b| b == b':') {
+ Some(pos) => {
+ let (username, ticket) = line.split_at(pos);
+ Ok((username.into(), ticket[1..].into()))
+ }
+ None => io_bail!("authentication data is invalid"),
+ }
+}
+
+fn authenticate(
+ username: &[u8],
+ ticket: &[u8],
+ path: &str,
+ perm: Option<&str>,
+ authport: u16,
+ port: Option<u16>,
+) -> Result<()> {
+ let mut curl = Easy::new();
+ curl.url(&format!(
+ "http://localhost:{}/api2/json/access/ticket",
+ authport
+ ))?;
+
+ let username = curl.url_encode(username);
+ let ticket = curl.url_encode(ticket);
+ let path = curl.url_encode(path.as_bytes());
+
+ let mut post_fields = Vec::with_capacity(5);
+ post_fields.push(format!("username={}", username));
+ post_fields.push(format!("password={}", ticket));
+ post_fields.push(format!("path={}", path));
+
+ if let Some(perm) = perm {
+ let perm = curl.url_encode(perm.as_bytes());
+ post_fields.push(format!("privs={}", perm));
+ }
+
+ if let Some(port) = port {
+ post_fields.push(format!("port={}", port));
+ }
+
+ curl.post_fields_copy(post_fields.join("&").as_bytes())?;
+ curl.post(true)?;
+ curl.perform()?;
+
+ let response_code = curl.response_code()?;
+
+ if response_code != 200 {
+ io_bail!("invalid authentication, code {}", response_code);
+ }
+
+ Ok(())
+}
+
+fn listen_and_accept(
+ hostname: &str,
+ port: u64,
+ port_as_fd: bool,
+ timeout: Duration,
+) -> Result<(TcpStream, u16)> {
+ let listener = if port_as_fd {
+ unsafe { std::net::TcpListener::from_raw_fd(port as i32) }
+ } else {
+ std::net::TcpListener::bind((hostname, port as u16))?
+ };
+ let port = listener.local_addr()?.port();
+ let listener = TcpListener::from_std(listener)?;
+ let poll = Poll::new()?;
+
+ poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge())?;
+
+ let mut events = Events::with_capacity(1);
+
+ let mut timeout = timeout;
+ loop {
+ let now = Instant::now();
+ poll.poll(&mut events, Some(timeout))?;
+ let elapsed = now.elapsed();
+ if !events.is_empty() {
+ let (stream, client) = listener.accept_std()?;
+ println!("client connection: {:?}", client);
+ return Ok((stream, port));
+ }
+ if timeout >= elapsed {
+ timeout -= elapsed;
+ } else {
+ io_bail!("timed out");
+ }
+ }
+}
+
+fn run_pty(cmd: &OsStr, params: clap::OsValues) -> Result<PTY> {
+ let (mut pty, secondary_name) = PTY::new().map_err(io_err_other)?;
+
+ let mut filtered_env: HashMap<OsString, OsString> = std::env::vars_os()
+ .filter(|&(ref k, _)| {
+ k == "PATH"
+ || k == "USER"
+ || k == "HOME"
+ || k == "LANG"
+ || k == "LANGUAGE"
+ || k.to_string_lossy().starts_with("LC_")
+ })
+ .collect();
+ filtered_env.insert("TERM".into(), "xterm-256color".into());
+
+ let mut command = Command::new(cmd);
+
+ command.args(params).env_clear().envs(&filtered_env);
+
+ unsafe {
+ command.pre_exec(move || {
+ make_controlling_terminal(&secondary_name).map_err(io_err_other)?;
+ Ok(())
+ });
+ }
+
+ command.spawn()?;
+
+ pty.set_size(80, 20).map_err(|x| x.as_errno().unwrap())?;
+ Ok(pty)
+}
+
+const TCP: Token = Token(0);
+const PTY: Token = Token(1);
+
+fn do_main() -> Result<()> {
+ let matches = App::new("termproxy")
+ .setting(AppSettings::TrailingVarArg)
+ .arg(Arg::with_name("port").takes_value(true).required(true))
+ .arg(
+ Arg::with_name("authport")
+ .takes_value(true)
+ .long("authport"),
+ )
+ .arg(Arg::with_name("use-port-as-fd").long("port-as-fd"))
+ .arg(
+ Arg::with_name("path")
+ .takes_value(true)
+ .long("path")
+ .required(true),
+ )
+ .arg(Arg::with_name("perm").takes_value(true).long("perm"))
+ .arg(Arg::with_name("cmd").multiple(true).required(true))
+ .get_matches();
+
+ let port: u64 = matches
+ .value_of("port")
+ .unwrap()
+ .parse()
+ .map_err(io_err_other)?;
+ let path = matches.value_of("path").unwrap();
+ let perm: Option<&str> = matches.value_of("perm");
+ let mut cmdparams = matches.values_of_os("cmd").unwrap();
+ let cmd = cmdparams.next().unwrap();
+ let authport: u16 = matches
+ .value_of("authport")
+ .unwrap_or("85")
+ .parse()
+ .map_err(io_err_other)?;
+ let mut pty_buf = ByteBuffer::new();
+ let mut tcp_buf = ByteBuffer::new();
+
+ let use_port_as_fd = matches.is_present("use-port-as-fd");
+
+ if use_port_as_fd && port > u16::MAX as u64 {
+ return Err(io_format_err!("port too big"));
+ } else if port > i32::MAX as u64 {
+ return Err(io_format_err!("Invalid FD number"));
+ }
+
+ let (mut stream, port) =
+ listen_and_accept("localhost", port, use_port_as_fd, Duration::new(10, 0))
+ .map_err(|err| io_format_err!("failed waiting for client: {}", err))?;
+
+ let (username, ticket) = read_ticket_line(&mut stream, &mut pty_buf, Duration::new(10, 0))
+ .map_err(|err| io_format_err!("failed reading ticket: {}", err))?;
+ let port = if use_port_as_fd { Some(port) } else { None };
+ authenticate(&username, &ticket, path, perm, authport, port)?;
+ stream.write_all(b"OK").expect("error writing response");
+
+ let mut tcp_handle = mio::net::TcpStream::from_stream(stream)?;
+
+ let poll = Poll::new()?;
+ let mut events = Events::with_capacity(128);
+
+ let mut pty = run_pty(cmd, cmdparams)?;
+
+ poll.register(
+ &tcp_handle,
+ TCP,
+ Ready::readable() | Ready::writable() | UnixReady::hup(),
+ PollOpt::edge(),
+ )?;
+ poll.register(
+ &EventedFd(&pty.as_raw_fd()),
+ PTY,
+ Ready::readable() | Ready::writable() | UnixReady::hup(),
+ PollOpt::edge(),
+ )?;
+
+ let mut tcp_writable = true;
+ let mut pty_writable = true;
+ let mut tcp_readable = true;
+ let mut pty_readable = true;
+ let mut remaining = 0;
+ let mut finished = false;
+
+ while !finished {
+ if tcp_readable && !pty_buf.is_full() || pty_readable && !tcp_buf.is_full() {
+ poll.poll(&mut events, Some(Duration::new(0, 0)))?;
+ } else {
+ poll.poll(&mut events, None)?;
+ }
+
+ for event in &events {
+ let readiness = event.readiness();
+ let writable = readiness.is_writable();
+ let readable = readiness.is_readable();
+ if UnixReady::from(readiness).is_hup() {
+ finished = true;
+ }
+ match event.token() {
+ TCP => {
+ if readable {
+ tcp_readable = true;
+ }
+ if writable {
+ tcp_writable = true;
+ }
+ }
+ PTY => {
+ if readable {
+ pty_readable = true;
+ }
+ if writable {
+ pty_writable = true;
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ while tcp_readable && !pty_buf.is_full() {
+ let bytes = match pty_buf.read_from(&mut tcp_handle) {
+ Ok(bytes) => bytes,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {
+ tcp_readable = false;
+ break;
+ }
+ Err(err) => {
+ if !finished {
+ return Err(io_format_err!("error reading from tcp: {}", err));
+ }
+ break;
+ }
+ };
+ if bytes == 0 {
+ finished = true;
+ break;
+ }
+ }
+
+ while pty_readable && !tcp_buf.is_full() {
+ let bytes = match tcp_buf.read_from(&mut pty) {
+ Ok(bytes) => bytes,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {
+ pty_readable = false;
+ break;
+ }
+ Err(err) => {
+ if !finished {
+ return Err(io_format_err!("error reading from pty: {}", err));
+ }
+ break;
+ }
+ };
+ if bytes == 0 {
+ finished = true;
+ break;
+ }
+ }
+
+ while !tcp_buf.is_empty() && tcp_writable {
+ let bytes = match tcp_handle.write(&tcp_buf[..]) {
+ Ok(bytes) => bytes,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {
+ tcp_writable = false;
+ break;
+ }
+ Err(err) => {
+ if !finished {
+ return Err(io_format_err!("error writing to tcp : {}", err));
+ }
+ break;
+ }
+ };
+ tcp_buf.consume(bytes);
+ }
+
+ while !pty_buf.is_empty() && pty_writable {
+ if remaining == 0 {
+ remaining = match process_queue(&mut pty_buf, &mut pty) {
+ Some(val) => val,
+ None => break,
+ };
+ }
+ let len = min(remaining, pty_buf.len());
+ let bytes = match pty.write(&pty_buf[..len]) {
+ Ok(bytes) => bytes,
+ Err(err) if err.kind() == ErrorKind::WouldBlock => {
+ pty_writable = false;
+ break;
+ }
+ Err(err) => {
+ if !finished {
+ return Err(io_format_err!("error writing to pty : {}", err));
+ }
+ break;
+ }
+ };
+ remaining -= bytes;
+ pty_buf.consume(bytes);
+ }
+ }
+
+ Ok(())
+}
+
+fn main() {
+ std::process::exit(match do_main() {
+ Ok(_) => 0,
+ Err(err) => {
+ eprintln!("{}", err);
+ 1
+ }
+ });
+}