1 use anyhow
::{bail, format_err, Error}
;
3 use std
::collections
::VecDeque
;
6 use futures
::future
::FutureExt
;
9 use hyper
::client
::{Client, HttpConnector}
;
10 use hyper
::header
::{SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE}
;
11 use hyper
::upgrade
::Upgraded
;
12 use hyper
::{Body, Request, StatusCode}
;
14 use openssl
::ssl
::{SslConnector, SslMethod}
;
15 use percent_encoding
::{utf8_percent_encode, NON_ALPHANUMERIC}
;
17 use serde
::{Deserialize, Serialize}
;
18 use serde_json
::{Map, Value}
;
19 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt, BufReader}
;
20 use tokio
::net
::{UnixListener, UnixStream}
;
21 use tokio
::sync
::{mpsc, oneshot}
;
22 use tokio_stream
::wrappers
::LinesStream
;
23 use tokio_stream
::StreamExt
;
25 use proxmox_http
::client
::HttpsConnector
;
26 use proxmox_http
::websocket
::{OpCode, WebSocket, WebSocketReader, WebSocketWriter}
;
28 #[derive(Serialize, Deserialize, Debug)]
29 #[serde(rename_all = "kebab-case")]
36 type CmdData
= Map
<String
, Value
>;
38 #[derive(Serialize, Deserialize, Debug)]
39 #[serde(rename_all = "kebab-case")]
40 struct ConnectCmdData
{
41 // target URL for WS connection
43 // fingerprint of TLS certificate
44 fingerprint
: Option
<String
>,
45 // addition headers such as authorization
46 headers
: Option
<Vec
<(String
, String
)>>,
49 #[derive(Serialize, Deserialize, Debug, Clone)]
50 #[serde(rename_all = "kebab-case")]
51 struct ForwardCmdData
{
52 // target URL for WS connection
54 // addition headers such as authorization
55 headers
: Option
<Vec
<(String
, String
)>>,
56 // fingerprint of TLS certificate
57 fingerprint
: Option
<String
>,
58 // local UNIX socket path for forwarding
60 // request ticket using these parameters
61 ticket
: Option
<Map
<String
, Value
>>,
65 sender
: Option
<mpsc
::UnboundedSender
<(Value
, oneshot
::Sender
<String
>)>>,
69 async
fn read_cmd_loop(mut self) -> Result
<(), Error
> {
70 let mut stdin_stream
= LinesStream
::new(BufReader
::new(tokio
::io
::stdin()).lines());
71 while let Some(res
) = stdin_stream
.next().await
{
74 let (cmd_type
, data
) = Self::parse_cmd(&line
)?
;
76 CmdType
::Connect
=> self.handle_connect_cmd(data
).await
,
78 let res
= self.handle_forward_cmd(data
).await
;
80 Ok(()) => println
!("{}", serde_json
::json
!({"success": true}
)),
83 serde_json
::json
!({"success": false, "msg": msg.to_string()}
)
88 CmdType
::NonControl
=> self
89 .handle_tunnel_cmd(data
)
91 .map(|res
| println
!("{}", res
)),
94 Err(err
) => bail
!("Failed to read from STDIN - {}", err
),
101 fn parse_cmd(line
: &str) -> Result
<(CmdType
, CmdData
), Error
> {
102 let mut json
: Map
<String
, Value
> = serde_json
::from_str(line
)?
;
103 match json
.remove("control") {
104 Some(Value
::Bool(true)) => {
105 match json
.remove("cmd").map(serde_json
::from_value
::<CmdType
>) {
106 None
=> bail
!("input has 'control' flag, but no control 'cmd' set.."),
107 Some(Err(e
)) => bail
!("failed to parse control cmd - {}", e
),
108 Some(Ok(cmd_type
)) => Ok((cmd_type
, json
)),
111 _
=> Ok((CmdType
::NonControl
, json
)),
115 async
fn websocket_connect(
117 extra_headers
: Vec
<(String
, String
)>,
118 fingerprint
: Option
<String
>,
119 ) -> Result
<Upgraded
, Error
> {
120 let ws_key
= proxmox_sys
::linux
::random_data(16)?
;
121 let ws_key
= base64
::encode(&ws_key
);
122 let mut req
= Request
::builder()
124 .header(UPGRADE
, "websocket")
125 .header(SEC_WEBSOCKET_VERSION
, "13")
126 .header(SEC_WEBSOCKET_KEY
, ws_key
)
127 .body(Body
::empty())?
;
129 let headers
= req
.headers_mut();
130 for (name
, value
) in extra_headers
{
131 let name
= hyper
::header
::HeaderName
::from_bytes(name
.as_bytes())?
;
132 let value
= hyper
::header
::HeaderValue
::from_str(&value
)?
;
133 headers
.insert(name
, value
);
136 let mut ssl_connector_builder
= SslConnector
::builder(SslMethod
::tls())?
;
137 if let Some(expected
) = fingerprint
{
138 ssl_connector_builder
.set_verify_callback(
139 openssl
::ssl
::SslVerifyMode
::PEER
,
141 let cert
= match ctx
.current_cert() {
145 eprintln
!("SSL context lacks current certificate.");
150 // skip CA certificates, we only care about the peer cert
151 let depth
= ctx
.error_depth();
156 let fp
= match cert
.digest(openssl
::hash
::MessageDigest
::sha256()) {
160 eprintln
!("failed to calculate certificate FP - {}", err
);
164 let fp_string
= hex
::encode(&fp
);
165 let fp_string
= fp_string
168 .map(|v
| std
::str::from_utf8(v
).unwrap())
169 .collect
::<Vec
<&str>>()
172 let expected
= expected
.to_lowercase();
173 if expected
== fp_string
{
176 eprintln
!("certificate fingerprint does not match expected fingerprint!");
177 eprintln
!("expected: {}", expected
);
178 eprintln
!("encountered: {}", fp_string
);
184 ssl_connector_builder
.set_verify(openssl
::ssl
::SslVerifyMode
::PEER
);
187 let mut httpc
= HttpConnector
::new();
188 httpc
.enforce_http(false); // we want https...
189 httpc
.set_connect_timeout(Some(std
::time
::Duration
::new(10, 0)));
190 let https
= HttpsConnector
::with_connector(httpc
, ssl_connector_builder
.build(), 120);
192 let client
= Client
::builder().build
::<_
, Body
>(https
);
193 let res
= client
.request(req
).await?
;
194 if res
.status() != StatusCode
::SWITCHING_PROTOCOLS
{
195 bail
!("server didn't upgrade: {}", res
.status());
198 hyper
::upgrade
::on(res
)
200 .map_err(|err
| format_err
!("failed to upgrade - {}", err
))
203 async
fn handle_connect_cmd(&mut self, mut data
: CmdData
) -> Result
<(), Error
> {
204 let mut data
: ConnectCmdData
= data
206 .ok_or_else(|| format_err
!("'connect' command missing 'data'"))
207 .map(serde_json
::from_value
)??
;
209 if self.sender
.is_some() {
210 bail
!("already connected!");
213 let upgraded
= Self::websocket_connect(
215 data
.headers
.take().unwrap_or_else(Vec
::new
),
216 data
.fingerprint
.take(),
220 let (tx
, rx
) = mpsc
::unbounded_channel();
221 self.sender
= Some(tx
);
222 tokio
::spawn(async
move {
223 if let Err(err
) = Self::handle_ctrl_tunnel(upgraded
, rx
).await
{
224 eprintln
!("Tunnel to {} failed - {}", data
.url
, err
);
231 async
fn handle_forward_cmd(&mut self, mut data
: CmdData
) -> Result
<(), Error
> {
232 let data
: ForwardCmdData
= data
234 .ok_or_else(|| format_err
!("'forward' command missing 'data'"))
235 .map(serde_json
::from_value
)??
;
237 if self.sender
.is_none() && data
.ticket
.is_some() {
238 bail
!("dynamically requesting ticket requires cmd tunnel connection!");
241 let unix_listener
= UnixListener
::bind(data
.unix
.clone())?
;
242 let data
= Arc
::new(data
);
243 let cmd_sender
= self.sender
.clone();
245 tokio
::spawn(async
move {
246 let mut tasks
: Vec
<tokio
::task
::JoinHandle
<()>> = Vec
::new();
247 let data2
= data
.clone();
250 let data3
= data2
.clone();
252 match unix_listener
.accept().await
{
253 Ok((unix_stream
, _
)) => {
254 eprintln
!("accepted new connection on '{}'", data3
.unix
);
255 let cmd_sender2
= cmd_sender
.clone();
257 let task
= tokio
::spawn(async
move {
258 if let Err(err
) = Self::handle_forward_tunnel(
265 eprintln
!("Tunnel for {} failed - {}", data3
.unix
, err
);
270 Err(err
) => eprintln
!(
271 "Failed to accept unix connection on {} - {}",
281 async
fn handle_tunnel_cmd(&mut self, data
: CmdData
) -> Result
<String
, Error
> {
282 match &mut self.sender
{
283 None
=> bail
!("not connected!"),
285 let data
: Value
= data
.into();
286 let (tx
, rx
) = oneshot
::channel
::<String
>();
287 if let Some(cmd
) = data
.get("cmd") {
288 eprintln
!("-> sending command {} to remote", cmd
);
290 eprintln
!("-> sending data line to remote");
292 sender
.send((data
, tx
))?
;
294 eprintln
!("<- got reply");
300 async
fn handle_ctrl_tunnel(
302 mut cmd_receiver
: mpsc
::UnboundedReceiver
<(Value
, oneshot
::Sender
<String
>)>,
303 ) -> Result
<(), Error
> {
304 let (tunnel_reader
, tunnel_writer
) = tokio
::io
::split(websocket
);
305 let (ws_close_tx
, mut ws_close_rx
) = mpsc
::unbounded_channel();
306 let ws_reader
= WebSocketReader
::new(tunnel_reader
, ws_close_tx
);
307 let mut ws_writer
= WebSocketWriter
::new(Some([0, 0, 0, 0]), tunnel_writer
);
309 let mut framed_reader
=
310 tokio_util
::codec
::FramedRead
::new(ws_reader
, tokio_util
::codec
::LinesCodec
::new());
312 let mut resp_tx_queue
: VecDeque
<oneshot
::Sender
<String
>> = VecDeque
::new();
313 let mut shutting_down
= false;
316 let mut close_future
= ws_close_rx
.recv().boxed().fuse();
317 let mut frame_future
= framed_reader
.next().boxed().fuse();
318 let mut cmd_future
= cmd_receiver
.recv().boxed().fuse();
321 res
= close_future
=> {
322 let res
= res
.ok_or_else(|| format_err
!("WS control channel closed"))?
;
323 eprintln
!("WS: received control message: '{:?}'", res
);
324 shutting_down
= true;
326 res
= frame_future
=> {
328 None
if shutting_down
=> {
329 eprintln
!("WS closed");
332 None
=> bail
!("WS closed unexpectedly"),
336 .ok_or_else(|| format_err
!("no response handler"))?
338 .map_err(|msg
| format_err
!("failed to send tunnel response '{}' back to requester - receiver already closed?", msg
))?
;
341 bail
!("reading from control tunnel failed - WS receive failed: {}", err
);
345 res
= cmd_future
=> {
346 if shutting_down { continue }
;
349 eprintln
!("CMD channel closed, shutting down");
350 ws_writer
.send_control_frame(Some([1,2,3,4]), OpCode
::Close
, &[]).await?
;
351 shutting_down
= true;
353 Some((msg
, resp_tx
)) => {
354 resp_tx_queue
.push_back(resp_tx
);
356 let line
= format
!("{}\n", msg
);
357 ws_writer
.write_all(line
.as_bytes()).await?
;
358 ws_writer
.flush().await?
;
368 async
fn handle_forward_tunnel(
369 cmd_sender
: Option
<mpsc
::UnboundedSender
<(Value
, oneshot
::Sender
<String
>)>>,
370 data
: Arc
<ForwardCmdData
>,
372 ) -> Result
<(), Error
> {
373 let data
= match (&cmd_sender
, &data
.ticket
) {
374 (Some(cmd_sender
), Some(_
)) => Self::get_ticket(cmd_sender
, data
.clone()).await
,
375 _
=> Ok(data
.clone()),
378 let upgraded
= Self::websocket_connect(
380 data
.headers
.clone().unwrap_or_else(Vec
::new
),
381 data
.fingerprint
.clone(),
386 mask
: Some([0, 0, 0, 0]),
388 eprintln
!("established new WS for forwarding '{}'", data
.unix
);
389 ws
.serve_connection(upgraded
, unix
).await?
;
391 eprintln
!("done handling forwarded connection from '{}'", data
.unix
);
397 cmd_sender
: &mpsc
::UnboundedSender
<(Value
, oneshot
::Sender
<String
>)>,
398 cmd_data
: Arc
<ForwardCmdData
>,
399 ) -> Result
<Arc
<ForwardCmdData
>, Error
> {
400 eprintln
!("requesting WS ticket via tunnel");
401 let ticket_cmd
= match cmd_data
.ticket
.clone() {
402 Some(mut ticket_cmd
) => {
403 ticket_cmd
.insert("cmd".to_string(), serde_json
::json
!("ticket"));
406 None
=> bail
!("can't get ticket without ticket parameters"),
408 let (tx
, rx
) = oneshot
::channel
::<String
>();
409 cmd_sender
.send((serde_json
::json
!(ticket_cmd
), tx
))?
;
410 let ticket
= rx
.await?
;
411 let mut ticket
: Map
<String
, Value
> = serde_json
::from_str(&ticket
)?
;
414 .ok_or_else(|| format_err
!("failed to retrieve ticket via tunnel"))?
;
418 .ok_or_else(|| format_err
!("failed to format received ticket"))?
;
419 let ticket
= utf8_percent_encode(ticket
, NON_ALPHANUMERIC
).to_string();
421 let mut data
= cmd_data
.clone();
422 let mut url
= data
.url
.clone();
423 url
.push_str("ticket=");
424 url
.push_str(&ticket
);
425 let mut d
= Arc
::make_mut(&mut data
);
432 async
fn main() -> Result
<(), Error
> {
436 async
fn do_main() -> Result
<(), Error
> {
437 let tunnel
= CtrlTunnel { sender: None }
;
438 tunnel
.read_cmd_loop().await