1 //! Server/Node Configuration and Administration
3 use std
::net
::TcpListener
;
4 use std
::os
::unix
::io
::AsRawFd
;
6 use anyhow
::{bail, format_err, Error}
;
7 use futures
::future
::{FutureExt, TryFutureExt}
;
9 use hyper
::http
::request
::Parts
;
10 use hyper
::upgrade
::Upgraded
;
12 use serde_json
::{json, Value}
;
13 use tokio
::io
::{AsyncBufReadExt, BufReader}
;
15 use proxmox
::api
::router
::{Router, SubdirMap}
;
17 api
, schema
::*, ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, RpcEnvironment
,
19 use proxmox
::list_subdirs_api_method
;
20 use proxmox_http
::websocket
::WebSocket
;
21 use proxmox
::{identity, sortable}
;
23 use pbs_tools
::ticket
::{self, Empty, Ticket}
;
25 use crate::api2
::types
::*;
26 use crate::config
::acl
::PRIV_SYS_CONSOLE
;
27 use crate::server
::WorkerTask
;
42 pub(crate) mod services
;
/// String schema for the shell command parameter ("The command to run.").
///
/// The value is restricted to an enumeration; the entries visible here are
/// `login` and `upgrade`.
48 pub const SHELL_CMD_SCHEMA
: Schema
= StringSchema
::new("The command to run.")
49 .format(&ApiStringFormat
::Enum(&[
50 EnumEntry
::new("login", "Login"),
51 EnumEntry
::new("upgrade", "Upgrade"),
// NOTE(review): presumably the schema of the optional command parameter that is
// matched further down via `cmd.as_deref()` - confirm against the full attribute.
63 schema
: SHELL_CMD_SCHEMA
,
70 description
: "Object with the user, ticket, port and upid",
91 description
: "Restricted to users on realm 'pam'",
// Access check: requires PRIV_SYS_CONSOLE on the ["system"] ACL path.
92 permission
: &Permission
::Privilege(&["system"], PRIV_SYS_CONSOLE
, false),
95 /// Call termproxy and return shell ticket
// Visible flow: reject API tokens and users outside the "pam" realm, bind a local
// listener on a kernel-chosen port, create a terminal ticket, then spawn a worker
// task that starts /usr/bin/termproxy and forwards its stdout/stderr lines through
// worker.log() / worker.warn().
98 rpcenv
: &mut dyn RpcEnvironment
,
99 ) -> Result
<Value
, Error
> {
100 // intentionally user only for now
101 let auth_id
: Authid
= rpcenv
103 .ok_or_else(|| format_err
!("no authid available"))?
// API tokens are rejected outright.
106 if auth_id
.is_token() {
107 bail
!("API tokens cannot access this API endpoint");
110 let userid
= auth_id
.user();
// Only users of the "pam" realm may open a console.
112 if userid
.realm() != "pam" {
113 bail
!("only pam users can use the console");
// Path that goes into the ticket's term_aad() data below.
116 let path
= "/system";
118 // use port 0 and let the kernel decide which port is free
119 let listener
= TcpListener
::bind("localhost:0")?
;
120 let port
= listener
.local_addr()?
.port();
// The terminal ticket is created with the private auth key; its additional data
// is derived from the user id, the path and the listener port via term_aad().
122 let ticket
= Ticket
::new(ticket
::TERM_PREFIX
, &Empty
)?
124 crate::auth_helpers
::private_auth_key(),
125 Some(&tools
::ticket
::term_aad(&userid
, &path
, port
)),
// Collect the command words; a missing command is treated like "login".
128 let mut command
= Vec
::new();
129 match cmd
.as_deref() {
130 Some("login") | None
=> {
131 command
.push("login");
// root@pam gets extra login arguments (only "root" is visible here).
132 if userid
== "root@pam" {
134 command
.push("root");
// Upgrades are limited to root@pam.
138 if userid
!= "root@pam" {
139 bail
!("only root@pam can upgrade");
141 // TODO: add nicer/safer wrapper like in PVE instead
// The whole upgrade command line is pushed as one single string.
144 command
.push("apt full-upgrade; bash -l");
// Any other command value is rejected.
146 _
=> bail
!("invalid command"),
149 let username
= userid
.name().to_owned();
150 let upid
= WorkerTask
::spawn(
155 move |worker
| async
move {
156 // move inside the worker so that it survives and does not close the port
157 // remove CLOEXEC from listener so that we can reuse it in termproxy
158 tools
::fd_change_cloexec(listener
.as_raw_fd(), false)?
;
160 let mut arguments
: Vec
<&str> = Vec
::new();
// The listener's raw fd number becomes the first entry of the argument list.
161 let fd_string
= listener
.as_raw_fd().to_string();
162 arguments
.push(&fd_string
);
163 arguments
.extend_from_slice(&[
// The command words collected above are appended to the argument list.
173 arguments
.extend_from_slice(&command
);
175 let mut cmd
= tokio
::process
::Command
::new("/usr/bin/termproxy");
178 .stdout(std
::process
::Stdio
::piped())
179 .stderr(std
::process
::Stdio
::piped());
// NOTE(review): the expect() calls below panic inside the worker if termproxy
// cannot be spawned or a piped handle is missing, instead of returning an error.
181 let mut child
= cmd
.spawn().expect("error executing termproxy");
183 let stdout
= child
.stdout
.take().expect("no child stdout handle");
184 let stderr
= child
.stderr
.take().expect("no child stderr handle");
// Forward every stdout line of the child through worker.log().
186 let worker_stdout
= worker
.clone();
187 let stdout_fut
= async
move {
188 let mut reader
= BufReader
::new(stdout
).lines();
189 while let Some(line
) = reader
.next_line().await?
{
190 worker_stdout
.log(line
);
// Forward every stderr line of the child through worker.warn().
195 let worker_stderr
= worker
.clone();
196 let stderr_fut
= async
move {
197 let mut reader
= BufReader
::new(stderr
).lines();
198 while let Some(line
) = reader
.next_line().await?
{
199 worker_stderr
.warn(line
);
204 let mut needs_kill
= false;
// Wait for whichever completes first: the child exiting, one of the two output
// forwarders finishing (or failing), or worker.abort_future() resolving.
205 let res
= tokio
::select
!{
206 res
= child
.wait() => {
207 let exit_code
= res?
;
// A non-successful exit is turned into an error, with the exit code if there is one.
208 if !exit_code
.success() {
209 match exit_code
.code() {
210 Some(code
) => bail
!("termproxy exited with {}", code
),
211 None
=> bail
!("termproxy exited by signal"),
216 res
= stdout_fut
=> res
,
217 res
= stderr_fut
=> res
,
218 res
= worker
.abort_future() => {
220 res
.map_err(Error
::from
)
// Kill the child; if that fails log a warning, otherwise wait for it and log a
// warning if waiting fails.
230 if let Err(err
) = child
.kill().await
{
231 worker
.warn(format
!("error killing termproxy: {}", err
));
232 } else if let Err(err
) = child
.wait().await
{
233 worker
.warn(format
!("error awaiting termproxy: {}", err
));
241 // FIXME: We're returning the user NAME only?
/// API method definition for the websocket upgrade ("Upgraded to websocket"),
/// handled through `ApiHandler::AsyncHttp` by `upgrade_to_websocket`.
///
/// Visible parameters: `node` (NODE_SCHEMA), a string described as
/// "Terminal ticket" and the integer `port` ("Terminal port"), none of them
/// marked optional. Access requires PRIV_SYS_CONSOLE on the `["system"]` path.
251 pub const API_METHOD_WEBSOCKET
: ApiMethod
= ApiMethod
::new(
252 &ApiHandler
::AsyncHttp(&upgrade_to_websocket
),
254 "Upgraded to websocket",
256 ("node", false, &NODE_SCHEMA
),
260 &StringSchema
::new("Terminal ticket").schema()
262 ("port", false, &IntegerSchema
::new("Terminal port").schema()),
267 Some("The user needs Sys.Console on /system."),
268 &Permission
::Privilege(&["system"], PRIV_SYS_CONSOLE
, false),
/// Handler referenced by `API_METHOD_WEBSOCKET`.
///
/// Rejects API tokens, reads the `vncticket` and `port` parameters, parses the
/// terminal ticket using the public auth key with `term_aad(userid, "/system", port)`
/// as additional data, and then spawns an internal task that upgrades the HTTP
/// connection and serves the websocket against a TCP connection to
/// `localhost:<port>`.
271 fn upgrade_to_websocket(
276 rpcenv
: Box
<dyn RpcEnvironment
>,
277 ) -> ApiResponseFuture
{
279 // intentionally user only for now
280 let auth_id
: Authid
= rpcenv
282 .ok_or_else(|| format_err
!("no authid available"))?
// API tokens are rejected outright.
285 if auth_id
.is_token() {
286 bail
!("API tokens cannot access this API endpoint");
289 let userid
= auth_id
.user();
// NOTE(review): "¶m" in the two parameter lookups below looks like a mis-encoded
// "&param" - confirm against the original source.
290 let ticket
= tools
::required_string_param(¶m
, "vncticket")?
;
// NOTE(review): `as u16` silently truncates a port value outside the u16 range.
291 let port
: u16 = tools
::required_integer_param(¶m
, "port")?
as u16;
293 // will be checked again by termproxy
294 Ticket
::<Empty
>::parse(ticket
)?
296 crate::auth_helpers
::public_auth_key(),
298 Some(&tools
::ticket
::term_aad(&userid
, "/system", port
)),
// Build the websocket handler and the response from a copy of the request headers.
301 let (ws
, response
) = WebSocket
::new(parts
.headers
.clone())?
;
// The connection upgrade and the proxying run in a separately spawned internal task.
303 crate::server
::spawn_internal_task(async
move {
304 let conn
: Upgraded
= match hyper
::upgrade
::on(Request
::from_parts(parts
, req_body
)).map_err(Error
::from
).await
{
305 Ok(upgraded
) => upgraded
,
// Connect to the local terminal port and serve the websocket between the
// upgraded connection and that TCP stream.
309 let local
= tokio
::net
::TcpStream
::connect(format
!("localhost:{}", port
)).await?
;
310 ws
.serve_connection(conn
, local
).await
/// Sub-directory table of this module: pairs each path segment with its router.
///
/// Most entries point at the `ROUTER` of the like-named module; `termproxy` is a
/// POST-only router for `API_METHOD_TERMPROXY`, and one further entry uses the
/// `upgrade` handler with `API_METHOD_WEBSOCKET`.
318 pub const SUBDIRS
: SubdirMap
= &[
319 ("apt", &apt
::ROUTER
),
320 ("certificates", &certificates
::ROUTER
),
321 ("config", &config
::ROUTER
),
322 ("disks", &disks
::ROUTER
),
323 ("dns", &dns
::ROUTER
),
324 ("journal", &journal
::ROUTER
),
325 ("network", &network
::ROUTER
),
326 ("report", &report
::ROUTER
),
327 ("rrd", &rrd
::ROUTER
),
328 ("services", &services
::ROUTER
),
329 ("status", &status
::ROUTER
),
330 ("subscription", &subscription
::ROUTER
),
331 ("syslog", &syslog
::ROUTER
),
332 ("tasks", &tasks
::ROUTER
),
333 ("termproxy", &Router
::new().post(&API_METHOD_TERMPROXY
)),
334 ("time", &time
::ROUTER
),
337 &Router
::new().upgrade(&API_METHOD_WEBSOCKET
),
/// Router of this module; GET is answered by `list_subdirs_api_method!(SUBDIRS)`.
341 pub const ROUTER
: Router
= Router
::new()
342 .get(&list_subdirs_api_method
!(SUBDIRS
))