1 //! Server/Node Configuration and Administration
3 use std
::net
::TcpListener
;
4 use std
::os
::unix
::io
::AsRawFd
;
6 use anyhow
::{bail, format_err, Error}
;
7 use futures
::future
::{FutureExt, TryFutureExt}
;
9 use hyper
::http
::request
::Parts
;
10 use hyper
::upgrade
::Upgraded
;
12 use serde_json
::{json, Value}
;
13 use tokio
::io
::{AsyncBufReadExt, BufReader}
;
15 use proxmox
::api
::router
::{Router, SubdirMap}
;
17 api
, schema
::*, ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, RpcEnvironment
,
19 use proxmox
::list_subdirs_api_method
;
20 use proxmox_http
::websocket
::WebSocket
;
21 use proxmox
::{identity, sortable}
;
23 use pbs_tools
::auth
::private_auth_key
;
24 use pbs_tools
::ticket
::{self, Empty, Ticket}
;
26 use crate::api2
::types
::*;
27 use crate::config
::acl
::PRIV_SYS_CONSOLE
;
28 use crate::server
::WorkerTask
;
43 pub(crate) mod services
;
49 pub const SHELL_CMD_SCHEMA
: Schema
= StringSchema
::new("The command to run.")
50 .format(&ApiStringFormat
::Enum(&[
51 EnumEntry
::new("login", "Login"),
52 EnumEntry
::new("upgrade", "Upgrade"),
64 schema
: SHELL_CMD_SCHEMA
,
71 description
: "Object with the user, ticket, port and upid",
92 description
: "Restricted to users on realm 'pam'",
93 permission
: &Permission
::Privilege(&["system"], PRIV_SYS_CONSOLE
, false),
96 /// Call termproxy and return shell ticket
99 rpcenv
: &mut dyn RpcEnvironment
,
100 ) -> Result
<Value
, Error
> {
101 // intentionally user only for now
102 let auth_id
: Authid
= rpcenv
104 .ok_or_else(|| format_err
!("no authid available"))?
107 if auth_id
.is_token() {
108 bail
!("API tokens cannot access this API endpoint");
111 let userid
= auth_id
.user();
113 if userid
.realm() != "pam" {
114 bail
!("only pam users can use the console");
117 let path
= "/system";
119 // use port 0 and let the kernel decide which port is free
120 let listener
= TcpListener
::bind("localhost:0")?
;
121 let port
= listener
.local_addr()?
.port();
123 let ticket
= Ticket
::new(ticket
::TERM_PREFIX
, &Empty
)?
126 Some(&tools
::ticket
::term_aad(&userid
, &path
, port
)),
129 let mut command
= Vec
::new();
130 match cmd
.as_deref() {
131 Some("login") | None
=> {
132 command
.push("login");
133 if userid
== "root@pam" {
135 command
.push("root");
139 if userid
!= "root@pam" {
140 bail
!("only root@pam can upgrade");
142 // TODO: add nicer/safer wrapper like in PVE instead
145 command
.push("apt full-upgrade; bash -l");
147 _
=> bail
!("invalid command"),
150 let username
= userid
.name().to_owned();
151 let upid
= WorkerTask
::spawn(
156 move |worker
| async
move {
157 // move inside the worker so that it survives and does not close the port
158 // remove CLOEXEC from listenere so that we can reuse it in termproxy
159 tools
::fd_change_cloexec(listener
.as_raw_fd(), false)?
;
161 let mut arguments
: Vec
<&str> = Vec
::new();
162 let fd_string
= listener
.as_raw_fd().to_string();
163 arguments
.push(&fd_string
);
164 arguments
.extend_from_slice(&[
174 arguments
.extend_from_slice(&command
);
176 let mut cmd
= tokio
::process
::Command
::new("/usr/bin/termproxy");
179 .stdout(std
::process
::Stdio
::piped())
180 .stderr(std
::process
::Stdio
::piped());
182 let mut child
= cmd
.spawn().expect("error executing termproxy");
184 let stdout
= child
.stdout
.take().expect("no child stdout handle");
185 let stderr
= child
.stderr
.take().expect("no child stderr handle");
187 let worker_stdout
= worker
.clone();
188 let stdout_fut
= async
move {
189 let mut reader
= BufReader
::new(stdout
).lines();
190 while let Some(line
) = reader
.next_line().await?
{
191 worker_stdout
.log(line
);
196 let worker_stderr
= worker
.clone();
197 let stderr_fut
= async
move {
198 let mut reader
= BufReader
::new(stderr
).lines();
199 while let Some(line
) = reader
.next_line().await?
{
200 worker_stderr
.warn(line
);
205 let mut needs_kill
= false;
206 let res
= tokio
::select
!{
207 res
= child
.wait() => {
208 let exit_code
= res?
;
209 if !exit_code
.success() {
210 match exit_code
.code() {
211 Some(code
) => bail
!("termproxy exited with {}", code
),
212 None
=> bail
!("termproxy exited by signal"),
217 res
= stdout_fut
=> res
,
218 res
= stderr_fut
=> res
,
219 res
= worker
.abort_future() => {
221 res
.map_err(Error
::from
)
231 if let Err(err
) = child
.kill().await
{
232 worker
.warn(format
!("error killing termproxy: {}", err
));
233 } else if let Err(err
) = child
.wait().await
{
234 worker
.warn(format
!("error awaiting termproxy: {}", err
));
242 // FIXME: We're returning the user NAME only?
252 pub const API_METHOD_WEBSOCKET
: ApiMethod
= ApiMethod
::new(
253 &ApiHandler
::AsyncHttp(&upgrade_to_websocket
),
255 "Upgraded to websocket",
257 ("node", false, &NODE_SCHEMA
),
261 &StringSchema
::new("Terminal ticket").schema()
263 ("port", false, &IntegerSchema
::new("Terminal port").schema()),
268 Some("The user needs Sys.Console on /system."),
269 &Permission
::Privilege(&["system"], PRIV_SYS_CONSOLE
, false),
272 fn upgrade_to_websocket(
277 rpcenv
: Box
<dyn RpcEnvironment
>,
278 ) -> ApiResponseFuture
{
280 // intentionally user only for now
281 let auth_id
: Authid
= rpcenv
283 .ok_or_else(|| format_err
!("no authid available"))?
286 if auth_id
.is_token() {
287 bail
!("API tokens cannot access this API endpoint");
290 let userid
= auth_id
.user();
291 let ticket
= tools
::required_string_param(¶m
, "vncticket")?
;
292 let port
: u16 = tools
::required_integer_param(¶m
, "port")?
as u16;
294 // will be checked again by termproxy
295 Ticket
::<Empty
>::parse(ticket
)?
297 crate::auth_helpers
::public_auth_key(),
299 Some(&tools
::ticket
::term_aad(&userid
, "/system", port
)),
302 let (ws
, response
) = WebSocket
::new(parts
.headers
.clone())?
;
304 crate::server
::spawn_internal_task(async
move {
305 let conn
: Upgraded
= match hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
)).map_err(Error
::from
).await
{
306 Ok(upgraded
) => upgraded
,
310 let local
= tokio
::net
::TcpStream
::connect(format
!("localhost:{}", port
)).await?
;
311 ws
.serve_connection(conn
, local
).await
319 pub const SUBDIRS
: SubdirMap
= &[
320 ("apt", &apt
::ROUTER
),
321 ("certificates", &certificates
::ROUTER
),
322 ("config", &config
::ROUTER
),
323 ("disks", &disks
::ROUTER
),
324 ("dns", &dns
::ROUTER
),
325 ("journal", &journal
::ROUTER
),
326 ("network", &network
::ROUTER
),
327 ("report", &report
::ROUTER
),
328 ("rrd", &rrd
::ROUTER
),
329 ("services", &services
::ROUTER
),
330 ("status", &status
::ROUTER
),
331 ("subscription", &subscription
::ROUTER
),
332 ("syslog", &syslog
::ROUTER
),
333 ("tasks", &tasks
::ROUTER
),
334 ("termproxy", &Router
::new().post(&API_METHOD_TERMPROXY
)),
335 ("time", &time
::ROUTER
),
338 &Router
::new().upgrade(&API_METHOD_WEBSOCKET
),
342 pub const ROUTER
: Router
= Router
::new()
343 .get(&list_subdirs_api_method
!(SUBDIRS
))