env_logger = "0.7"
flate2 = "1.0"
anyhow = "1.0"
-foreign-types = "0.3"
thiserror = "1.0"
futures = "0.3"
h2 = { version = "0.3", features = [ "stream" ] }
[dependencies]
anyhow = "1.0"
base64 = "0.12"
+foreign-types = "0.3"
+futures = "0.3"
+lazy_static = "1.4"
libc = "0.2"
nix = "0.19.1"
nom = "5.1"
regex = "1.2"
serde = "1.0"
serde_json = "1.0"
+# rt-multi-thread is required for block_in_place
+tokio = { version = "1.6", features = [ "rt", "rt-multi-thread", "sync" ] }
url = "2.1"
proxmox = { version = "0.11.5", default-features = false, features = [] }
+
+pbs-buildcfg = { path = "../pbs-buildcfg" }
--- /dev/null
+//! Helpers for authentication used by both client and server.
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+use openssl::pkey::{PKey, Private};
+use openssl::rsa::Rsa;
+
+use proxmox::tools::fs::file_get_contents;
+
+use pbs_buildcfg::configdir;
+
+fn load_private_auth_key() -> Result<PKey<Private>, Error> {
+ let pem = file_get_contents(configdir!("/authkey.key"))?;
+ let rsa = Rsa::private_key_from_pem(&pem)?;
+ let key = PKey::from_rsa(rsa)?;
+
+ Ok(key)
+}
+
+pub fn private_auth_key() -> &'static PKey<Private> {
+ lazy_static! {
+ static ref KEY: PKey<Private> = load_private_auth_key().unwrap();
+ }
+
+ &KEY
+}
--- /dev/null
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+
+use anyhow::{format_err, Error};
+use futures::future::{FutureExt, TryFutureExt};
+use tokio::sync::oneshot;
+
+/// Broadcast results to registered listeners using asnyc oneshot channels
+#[derive(Default)]
+pub struct BroadcastData<T> {
+ result: Option<Result<T, String>>,
+ listeners: Vec<oneshot::Sender<Result<T, Error>>>,
+}
+
+impl <T: Clone> BroadcastData<T> {
+
+ pub fn new() -> Self {
+ Self {
+ result: None,
+ listeners: vec![],
+ }
+ }
+
+ pub fn notify_listeners(&mut self, result: Result<T, String>) {
+
+ self.result = Some(result.clone());
+
+ loop {
+ match self.listeners.pop() {
+ None => { break; },
+ Some(ch) => {
+ match &result {
+ Ok(result) => { let _ = ch.send(Ok(result.clone())); },
+ Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
+ }
+ },
+ }
+ }
+ }
+
+ pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
+ use futures::future::{ok, Either};
+
+ match &self.result {
+ None => {},
+ Some(Ok(result)) => return Either::Left(ok(result.clone())),
+ Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
+ }
+
+ let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+
+ self.listeners.push(tx);
+
+ Either::Right(rx
+ .map(|res| match res {
+ Ok(Ok(t)) => Ok(t),
+ Ok(Err(e)) => Err(e),
+ Err(e) => Err(Error::from(e)),
+ })
+ )
+ }
+}
+
+type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
+
+struct BroadCastFutureBinding<T> {
+ broadcast: BroadcastData<T>,
+ future: Option<SourceFuture<T>>,
+}
+
+/// Broadcast future results to registered listeners
+pub struct BroadcastFuture<T> {
+ inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
+}
+
+impl<T: Clone + Send + 'static> BroadcastFuture<T> {
+ /// Create instance for specified source future.
+ ///
+ /// The result of the future is sent to all registered listeners.
+ pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
+ let inner = BroadCastFutureBinding {
+ broadcast: BroadcastData::new(),
+ future: Some(Pin::from(source)),
+ };
+ Self { inner: Arc::new(Mutex::new(inner)) }
+ }
+
+ /// Creates a new instance with a oneshot channel as trigger
+ pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
+
+ let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+ let rx = rx
+ .map_err(Error::from)
+ .and_then(futures::future::ready);
+
+ (Self::new(Box::new(rx)), tx)
+ }
+
+ fn notify_listeners(
+ inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
+ result: Result<T, String>,
+ ) {
+ let mut data = inner.lock().unwrap();
+ data.broadcast.notify_listeners(result);
+ }
+
+ fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
+ let mut data = inner.lock().unwrap();
+
+ if let Some(source) = data.future.take() {
+
+ let inner1 = inner.clone();
+
+ let task = source.map(move |value| {
+ match value {
+ Ok(value) => Self::notify_listeners(inner1, Ok(value)),
+ Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
+ }
+ });
+ tokio::spawn(task);
+ }
+
+ data.broadcast.listen()
+ }
+
+ /// Register a listener
+ pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
+ let inner2 = self.inner.clone();
+ async move { Self::spawn(inner2).await }
+ }
+}
+
+#[test]
+fn test_broadcast_future() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
+
+ let (sender, trigger) = BroadcastFuture::new_oneshot();
+
+ let receiver1 = sender.listen()
+ .map_ok(|res| {
+ CHECKSUM.fetch_add(res, Ordering::SeqCst);
+ })
+ .map_err(|err| { panic!("got error {}", err); })
+ .map(|_| ());
+
+ let receiver2 = sender.listen()
+ .map_ok(|res| {
+ CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
+ })
+ .map_err(|err| { panic!("got error {}", err); })
+ .map(|_| ());
+
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async move {
+ let r1 = tokio::spawn(receiver1);
+ let r2 = tokio::spawn(receiver2);
+
+ trigger.send(Ok(1)).unwrap();
+ let _ = r1.await;
+ let _ = r2.await;
+ });
+
+ let result = CHECKSUM.load(Ordering::SeqCst);
+
+ assert_eq!(result, 3);
+
+ // the result stays available until the BroadcastFuture is dropped
+ rt.block_on(sender.listen()
+ .map_ok(|res| {
+ CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
+ })
+ .map_err(|err| { panic!("got error {}", err); })
+ .map(|_| ()));
+
+ let result = CHECKSUM.load(Ordering::SeqCst);
+ assert_eq!(result, 7);
+}
--- /dev/null
+use std::path::PathBuf;
+use std::mem::MaybeUninit;
+
+use anyhow::{bail, format_err, Error};
+use foreign_types::ForeignTypeRef;
+use openssl::x509::{X509, GeneralName};
+use openssl::stack::Stack;
+use openssl::pkey::{Public, PKey};
+
+use pbs_buildcfg::configdir;
+
+// C type:
+#[allow(non_camel_case_types)]
+type ASN1_TIME = <openssl::asn1::Asn1TimeRef as ForeignTypeRef>::CType;
+
+extern "C" {
+ fn ASN1_TIME_to_tm(s: *const ASN1_TIME, tm: *mut libc::tm) -> libc::c_int;
+}
+
+fn asn1_time_to_unix(time: &openssl::asn1::Asn1TimeRef) -> Result<i64, Error> {
+ let mut c_tm = MaybeUninit::<libc::tm>::uninit();
+ let rc = unsafe { ASN1_TIME_to_tm(time.as_ptr(), c_tm.as_mut_ptr()) };
+ if rc != 1 {
+ bail!("failed to parse ASN1 time");
+ }
+ let mut c_tm = unsafe { c_tm.assume_init() };
+ proxmox::tools::time::timegm(&mut c_tm)
+}
+
+pub struct CertInfo {
+ x509: X509,
+}
+
+fn x509name_to_string(name: &openssl::x509::X509NameRef) -> Result<String, Error> {
+ let mut parts = Vec::new();
+ for entry in name.entries() {
+ parts.push(format!("{} = {}", entry.object().nid().short_name()?, entry.data().as_utf8()?));
+ }
+ Ok(parts.join(", "))
+}
+
+impl CertInfo {
+ pub fn new() -> Result<Self, Error> {
+ Self::from_path(PathBuf::from(configdir!("/proxy.pem")))
+ }
+
+ pub fn from_path(path: PathBuf) -> Result<Self, Error> {
+ Self::from_pem(&proxmox::tools::fs::file_get_contents(&path)?)
+ .map_err(|err| format_err!("failed to load certificate from {:?} - {}", path, err))
+ }
+
+ pub fn from_pem(cert_pem: &[u8]) -> Result<Self, Error> {
+ let x509 = openssl::x509::X509::from_pem(&cert_pem)?;
+ Ok(Self{
+ x509
+ })
+ }
+
+ pub fn subject_alt_names(&self) -> Option<Stack<GeneralName>> {
+ self.x509.subject_alt_names()
+ }
+
+ pub fn subject_name(&self) -> Result<String, Error> {
+ Ok(x509name_to_string(self.x509.subject_name())?)
+ }
+
+ pub fn issuer_name(&self) -> Result<String, Error> {
+ Ok(x509name_to_string(self.x509.issuer_name())?)
+ }
+
+ pub fn fingerprint(&self) -> Result<String, Error> {
+ let fp = self.x509.digest(openssl::hash::MessageDigest::sha256())?;
+ let fp_string = proxmox::tools::digest_to_hex(&fp);
+ let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
+ .collect::<Vec<&str>>().join(":");
+ Ok(fp_string)
+ }
+
+ pub fn public_key(&self) -> Result<PKey<Public>, Error> {
+ let pubkey = self.x509.public_key()?;
+ Ok(pubkey)
+ }
+
+ pub fn not_before(&self) -> &openssl::asn1::Asn1TimeRef {
+ self.x509.not_before()
+ }
+
+ pub fn not_after(&self) -> &openssl::asn1::Asn1TimeRef {
+ self.x509.not_after()
+ }
+
+ pub fn not_before_unix(&self) -> Result<i64, Error> {
+ asn1_time_to_unix(&self.not_before())
+ }
+
+ pub fn not_after_unix(&self) -> Result<i64, Error> {
+ asn1_time_to_unix(&self.not_after())
+ }
+
+ /// Check if the certificate is expired at or after a specific unix epoch.
+ pub fn is_expired_after_epoch(&self, epoch: i64) -> Result<bool, Error> {
+ Ok(self.not_after_unix()? < epoch)
+ }
+}
+pub mod auth;
pub mod borrow;
+pub mod broadcast_future;
+pub mod cert;
pub mod format;
pub mod fs;
pub mod json;
pub mod nom;
+pub mod percent_encoding;
pub mod process_locker;
pub mod sha;
pub mod str;
+pub mod sync;
pub mod ticket;
+pub mod tokio;
mod command;
pub use command::{command_output, command_output_as_string, run_command};
--- /dev/null
+use percent_encoding::{utf8_percent_encode, AsciiSet};
+
+/// This used to be: `SIMPLE_ENCODE_SET` plus space, `"`, `#`, `<`, `>`, backtick, `?`, `{`, `}`
+pub const DEFAULT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS // 0..1f and 7e
+ // The SIMPLE_ENCODE_SET adds space and anything >= 0x7e (7e itself is already included above)
+ .add(0x20)
+ .add(0x7f)
+ // the DEFAULT_ENCODE_SET added:
+ .add(b' ')
+ .add(b'"')
+ .add(b'#')
+ .add(b'<')
+ .add(b'>')
+ .add(b'`')
+ .add(b'?')
+ .add(b'{')
+ .add(b'}');
+
+/// percent encode a url component
+pub fn percent_encode_component(comp: &str) -> String {
+ utf8_percent_encode(comp, percent_encoding::NON_ALPHANUMERIC).to_string()
+}
--- /dev/null
+mod std_channel_writer;
+pub use std_channel_writer::StdChannelWriter;
--- /dev/null
+use std::io::Write;
+use std::sync::mpsc::SyncSender;
+
+use anyhow::{Error};
+
+/// Wrapper around SyncSender, which implements Write
+///
+/// Each write in translated into a send(Vec<u8>).
+pub struct StdChannelWriter(SyncSender<Result<Vec<u8>, Error>>);
+
+impl StdChannelWriter {
+ pub fn new(sender: SyncSender<Result<Vec<u8>, Error>>) -> Self {
+ Self(sender)
+ }
+}
+
+impl Write for StdChannelWriter {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+ self.0
+ .send(Ok(buf.to_vec()))
+ .map_err(proxmox::sys::error::io_err_other)
+ .and(Ok(buf.len()))
+ }
+
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ Ok(())
+ }
+}
--- /dev/null
+pub mod tokio_writer_adapter;
+pub use tokio_writer_adapter::TokioWriterAdapter;
--- /dev/null
+use std::io::Write;
+
+use tokio::task::block_in_place;
+
+/// Wrapper around a writer which implements Write
+///
+/// wraps each write with a 'block_in_place' so that
+/// any (blocking) writer can be safely used in async context in a
+/// tokio runtime
+pub struct TokioWriterAdapter<W: Write>(W);
+
+impl<W: Write> TokioWriterAdapter<W> {
+ pub fn new(writer: W) -> Self {
+ Self(writer)
+ }
+}
+
+impl<W: Write> Write for TokioWriterAdapter<W> {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+ block_in_place(|| self.0.write(buf))
+ }
+
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ block_in_place(|| self.0.flush())
+ }
+}
use proxmox::{http_err, list_subdirs_api_method};
use proxmox::{identity, sortable};
+use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::{self, Empty, Ticket};
use crate::api2::types::*;
use proxmox_openid::{OpenIdAuthenticator, OpenIdConfig};
use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M;
+use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::Ticket;
use crate::server::ticket::ApiTicket;
use proxmox::list_subdirs_api_method;
use pbs_buildcfg::configdir;
+use pbs_tools::cert;
use crate::acme::AcmeClient;
use crate::api2::types::Authid;
use crate::config::acl::PRIV_SYS_MODIFY;
use crate::config::node::NodeConfig;
use crate::server::WorkerTask;
-use crate::tools::cert;
pub const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))
use proxmox_http::websocket::WebSocket;
use proxmox::{identity, sortable};
+use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::{self, Empty, Ticket};
use crate::api2::types::*;
let ticket = Ticket::new(ticket::TERM_PREFIX, &Empty)?
.sign(
- crate::auth_helpers::private_auth_key(),
+ private_auth_key(),
Some(&tools::ticket::term_aad(&userid, &path, port)),
)?;
use proxmox::api::{api, ApiMethod, Router, RpcEnvironment, Permission};
+use pbs_tools::cert::CertInfo;
+
use crate::api2::types::*;
use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT};
-use crate::tools::cert::CertInfo;
impl std::convert::From<procfs::ProcFsCPUInfo> for NodeCpuInformation {
fn from(info: procfs::ProcFsCPUInfo) -> Self {
+use std::path::PathBuf;
+
use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;
-
-use openssl::rsa::{Rsa};
-use openssl::pkey::{PKey, Public, Private};
+use openssl::pkey::{PKey, Public};
+use openssl::rsa::Rsa;
use openssl::sha;
-use std::path::PathBuf;
-
use proxmox::tools::fs::{file_get_contents, replace_file, CreateOptions};
use proxmox::try_block;
use pbs_buildcfg::configdir;
-
-use crate::api2::types::Userid;
+use pbs_api_types::Userid;
fn compute_csrf_secret_digest(
timestamp: i64,
&SECRET
}
-fn load_private_auth_key() -> Result<PKey<Private>, Error> {
-
- let pem = file_get_contents(configdir!("/authkey.key"))?;
- let rsa = Rsa::private_key_from_pem(&pem)?;
- let key = PKey::from_rsa(rsa)?;
-
- Ok(key)
-}
-
-pub fn private_auth_key() -> &'static PKey<Private> {
-
- lazy_static! {
- static ref KEY: PKey<Private> = load_private_auth_key().unwrap();
- }
-
- &KEY
-}
-
fn load_public_auth_key() -> Result<PKey<Public>, Error> {
let pem = file_get_contents(configdir!("/authkey.pub"))?;
use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;
-//use proxmox_backup::tools;
-//use proxmox_backup::api_schema::config::*;
+use pbs_tools::auth::private_auth_key;
+
use proxmox_backup::server::{
self,
auth::default_api_auth,
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use pbs_datastore::catalog::BackupCatalogWriter;
+use pbs_tools::sync::StdChannelWriter;
+use pbs_tools::tokio::TokioWriterAdapter;
-use proxmox_backup::tools::{
- self,
- StdChannelWriter,
- TokioWriterAdapter,
-};
use proxmox_backup::api2::types::*;
use proxmox_backup::api2::version;
use proxmox_backup::client::*;
Shell,
PruneOptions,
};
+use proxmox_backup::tools;
mod proxmox_backup_client;
use proxmox_backup_client::*;
use proxmox::api::{api, cli::*, RpcEnvironment};
+use pbs_tools::percent_encoding::percent_encode_component;
+
use proxmox_backup::tools;
use proxmox_backup::config;
use proxmox_backup::api2::{self, types::* };
let mut client = connect_to_localhost()?;
- let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+ let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
Ok(Value::Null)
use proxmox::api::{api, cli::*};
+use pbs_tools::percent_encoding::percent_encode_component;
+
use proxmox_backup::tools;
use proxmox_backup::client::*;
let mut client = connect(&repo)?;
- let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+ let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
Ok(Value::Null)
use proxmox::api::{api, cli::*};
+use pbs_tools::cert::CertInfo;
+
use proxmox_backup::config;
use proxmox_backup::auth_helpers::*;
-use proxmox_backup::tools::cert::CertInfo;
#[api]
/// Display node certificate information.
use proxmox::tools::digest_to_hex;
+use pbs_datastore::{CATALOG_NAME, CryptConfig};
+use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
+use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
use pbs_tools::format::HumanByte;
use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
-use crate::backup::*;
use super::{H2Client, HttpClient};
if let Some(manifest) = options.previous_manifest {
// try, but ignore errors
- match archive_type(archive_name) {
+ match ArchiveType::from_path(archive_name) {
Ok(ArchiveType::FixedIndex) => {
let _ = self
.download_previous_fixed_index(
use proxmox_http::uri::build_authority;
use pbs_api_types::{Authid, Userid};
+use pbs_tools::broadcast_future::BroadcastFuture;
use pbs_tools::json::json_object_to_query;
use pbs_tools::ticket;
+use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
use super::pipe_to_stream::PipeToSendStream;
-use crate::tools::{
- BroadcastFuture,
- DEFAULT_ENCODE_SET,
- PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use super::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
/// Timeout used for several HTTP operations that are expected to finish quickly but may block in
/// certain error conditions. Keep it generous, to avoid false-positive under high load.
use pbs_api_types::{Authid, Userid};
use pbs_tools::ticket::Ticket;
-
-use crate::{
- tools::cert::CertInfo,
- auth_helpers::private_auth_key,
-};
+use pbs_tools::cert::CertInfo;
+use pbs_tools::auth::private_auth_key;
mod merge_known_chunks;
pub mod pipe_to_stream;
mod backup_specification;
pub use backup_specification::*;
+pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
+
/// Connect to localhost:8007 as root@pam
///
/// This automatically creates a ticket if run as 'root' user.
use nix::sys::stat::Mode;
use pbs_datastore::catalog::CatalogWriter;
-
-use crate::tools::{
- StdChannelWriter,
- TokioWriterAdapter,
-};
+use pbs_tools::sync::StdChannelWriter;
+use pbs_tools::tokio::TokioWriterAdapter;
/// Stream implementation to encode and upload .pxar archives.
///
use proxmox::api::cli::format_and_print_result;
-use super::HttpClient;
-use crate::{
- server::{
- worker_is_active_local,
- UPID,
- },
- tools,
-};
+use pbs_tools::percent_encoding::percent_encode_component;
+use super::HttpClient;
+use crate::server::{UPID, worker_is_active_local};
/// Display task log on console
///
let abort = abort_count.load(Ordering::Relaxed);
if abort > 0 {
- let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+ let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
let _ = client.delete(&path, None).await?;
}
let param = json!({ "start": start, "limit": limit, "test-status": true });
- let path = format!("api2/json/nodes/localhost/tasks/{}/log", tools::percent_encode_component(upid_str));
+ let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str));
let result = client.get(&path, Some(param)).await?;
let active = result["active"].as_bool().unwrap();
-use anyhow::{bail, format_err, Error};
-use futures::*;
-
-use core::task::Context;
use std::pin::Pin;
-use std::task::Poll;
+use std::task::{Context, Poll};
+use anyhow::{bail, format_err, Error};
+use futures::*;
use http::Uri;
use http::{Request, Response};
use hyper::client::connect::{Connected, Connection};
+++ /dev/null
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-
-use anyhow::{format_err, Error};
-use futures::future::{FutureExt, TryFutureExt};
-use tokio::sync::oneshot;
-
-/// Broadcast results to registered listeners using asnyc oneshot channels
-#[derive(Default)]
-pub struct BroadcastData<T> {
- result: Option<Result<T, String>>,
- listeners: Vec<oneshot::Sender<Result<T, Error>>>,
-}
-
-impl <T: Clone> BroadcastData<T> {
-
- pub fn new() -> Self {
- Self {
- result: None,
- listeners: vec![],
- }
- }
-
- pub fn notify_listeners(&mut self, result: Result<T, String>) {
-
- self.result = Some(result.clone());
-
- loop {
- match self.listeners.pop() {
- None => { break; },
- Some(ch) => {
- match &result {
- Ok(result) => { let _ = ch.send(Ok(result.clone())); },
- Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
- }
- },
- }
- }
- }
-
- pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
- use futures::future::{ok, Either};
-
- match &self.result {
- None => {},
- Some(Ok(result)) => return Either::Left(ok(result.clone())),
- Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
- }
-
- let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-
- self.listeners.push(tx);
-
- Either::Right(rx
- .map(|res| match res {
- Ok(Ok(t)) => Ok(t),
- Ok(Err(e)) => Err(e),
- Err(e) => Err(Error::from(e)),
- })
- )
- }
-}
-
-type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
-
-struct BroadCastFutureBinding<T> {
- broadcast: BroadcastData<T>,
- future: Option<SourceFuture<T>>,
-}
-
-/// Broadcast future results to registered listeners
-pub struct BroadcastFuture<T> {
- inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-}
-
-impl<T: Clone + Send + 'static> BroadcastFuture<T> {
- /// Create instance for specified source future.
- ///
- /// The result of the future is sent to all registered listeners.
- pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
- let inner = BroadCastFutureBinding {
- broadcast: BroadcastData::new(),
- future: Some(Pin::from(source)),
- };
- Self { inner: Arc::new(Mutex::new(inner)) }
- }
-
- /// Creates a new instance with a oneshot channel as trigger
- pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
-
- let (tx, rx) = oneshot::channel::<Result<T, Error>>();
- let rx = rx
- .map_err(Error::from)
- .and_then(futures::future::ready);
-
- (Self::new(Box::new(rx)), tx)
- }
-
- fn notify_listeners(
- inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
- result: Result<T, String>,
- ) {
- let mut data = inner.lock().unwrap();
- data.broadcast.notify_listeners(result);
- }
-
- fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
- let mut data = inner.lock().unwrap();
-
- if let Some(source) = data.future.take() {
-
- let inner1 = inner.clone();
-
- let task = source.map(move |value| {
- match value {
- Ok(value) => Self::notify_listeners(inner1, Ok(value)),
- Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
- }
- });
- tokio::spawn(task);
- }
-
- data.broadcast.listen()
- }
-
- /// Register a listener
- pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
- let inner2 = self.inner.clone();
- async move { Self::spawn(inner2).await }
- }
-}
-
-#[test]
-fn test_broadcast_future() {
- use std::sync::atomic::{AtomicUsize, Ordering};
-
- static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
-
- let (sender, trigger) = BroadcastFuture::new_oneshot();
-
- let receiver1 = sender.listen()
- .map_ok(|res| {
- CHECKSUM.fetch_add(res, Ordering::SeqCst);
- })
- .map_err(|err| { panic!("got error {}", err); })
- .map(|_| ());
-
- let receiver2 = sender.listen()
- .map_ok(|res| {
- CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
- })
- .map_err(|err| { panic!("got error {}", err); })
- .map(|_| ());
-
- let rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(async move {
- let r1 = tokio::spawn(receiver1);
- let r2 = tokio::spawn(receiver2);
-
- trigger.send(Ok(1)).unwrap();
- let _ = r1.await;
- let _ = r2.await;
- });
-
- let result = CHECKSUM.load(Ordering::SeqCst);
-
- assert_eq!(result, 3);
-
- // the result stays available until the BroadcastFuture is dropped
- rt.block_on(sender.listen()
- .map_ok(|res| {
- CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
- })
- .map_err(|err| { panic!("got error {}", err); })
- .map(|_| ()));
-
- let result = CHECKSUM.load(Ordering::SeqCst);
- assert_eq!(result, 7);
-}
+++ /dev/null
-use std::path::PathBuf;
-use std::mem::MaybeUninit;
-
-use anyhow::{bail, format_err, Error};
-use foreign_types::ForeignTypeRef;
-use openssl::x509::{X509, GeneralName};
-use openssl::stack::Stack;
-use openssl::pkey::{Public, PKey};
-
-use pbs_buildcfg::configdir;
-
-// C type:
-#[allow(non_camel_case_types)]
-type ASN1_TIME = <openssl::asn1::Asn1TimeRef as ForeignTypeRef>::CType;
-
-extern "C" {
- fn ASN1_TIME_to_tm(s: *const ASN1_TIME, tm: *mut libc::tm) -> libc::c_int;
-}
-
-fn asn1_time_to_unix(time: &openssl::asn1::Asn1TimeRef) -> Result<i64, Error> {
- let mut c_tm = MaybeUninit::<libc::tm>::uninit();
- let rc = unsafe { ASN1_TIME_to_tm(time.as_ptr(), c_tm.as_mut_ptr()) };
- if rc != 1 {
- bail!("failed to parse ASN1 time");
- }
- let mut c_tm = unsafe { c_tm.assume_init() };
- proxmox::tools::time::timegm(&mut c_tm)
-}
-
-pub struct CertInfo {
- x509: X509,
-}
-
-fn x509name_to_string(name: &openssl::x509::X509NameRef) -> Result<String, Error> {
- let mut parts = Vec::new();
- for entry in name.entries() {
- parts.push(format!("{} = {}", entry.object().nid().short_name()?, entry.data().as_utf8()?));
- }
- Ok(parts.join(", "))
-}
-
-impl CertInfo {
- pub fn new() -> Result<Self, Error> {
- Self::from_path(PathBuf::from(configdir!("/proxy.pem")))
- }
-
- pub fn from_path(path: PathBuf) -> Result<Self, Error> {
- Self::from_pem(&proxmox::tools::fs::file_get_contents(&path)?)
- .map_err(|err| format_err!("failed to load certificate from {:?} - {}", path, err))
- }
-
- pub fn from_pem(cert_pem: &[u8]) -> Result<Self, Error> {
- let x509 = openssl::x509::X509::from_pem(&cert_pem)?;
- Ok(Self{
- x509
- })
- }
-
- pub fn subject_alt_names(&self) -> Option<Stack<GeneralName>> {
- self.x509.subject_alt_names()
- }
-
- pub fn subject_name(&self) -> Result<String, Error> {
- Ok(x509name_to_string(self.x509.subject_name())?)
- }
-
- pub fn issuer_name(&self) -> Result<String, Error> {
- Ok(x509name_to_string(self.x509.issuer_name())?)
- }
-
- pub fn fingerprint(&self) -> Result<String, Error> {
- let fp = self.x509.digest(openssl::hash::MessageDigest::sha256())?;
- let fp_string = proxmox::tools::digest_to_hex(&fp);
- let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
- .collect::<Vec<&str>>().join(":");
- Ok(fp_string)
- }
-
- pub fn public_key(&self) -> Result<PKey<Public>, Error> {
- let pubkey = self.x509.public_key()?;
- Ok(pubkey)
- }
-
- pub fn not_before(&self) -> &openssl::asn1::Asn1TimeRef {
- self.x509.not_before()
- }
-
- pub fn not_after(&self) -> &openssl::asn1::Asn1TimeRef {
- self.x509.not_after()
- }
-
- pub fn not_before_unix(&self) -> Result<i64, Error> {
- asn1_time_to_unix(&self.not_before())
- }
-
- pub fn not_after_unix(&self) -> Result<i64, Error> {
- asn1_time_to_unix(&self.not_after())
- }
-
- /// Check if the certificate is expired at or after a specific unix epoch.
- pub fn is_expired_after_epoch(&self, epoch: i64) -> Result<bool, Error> {
- Ok(self.not_after_unix()? < epoch)
- }
-}
use anyhow::{bail, format_err, Error};
use serde_json::Value;
use openssl::hash::{hash, DigestBytes, MessageDigest};
-use percent_encoding::{utf8_percent_encode, AsciiSet};
pub use proxmox::tools::fd::Fd;
use proxmox::tools::fs::{create_path, CreateOptions};
pub mod acl;
pub mod apt;
pub mod async_io;
-pub mod cert;
pub mod compression;
pub mod config;
pub mod cpio;
mod async_channel_writer;
pub use async_channel_writer::AsyncChannelWriter;
-mod std_channel_writer;
-pub use std_channel_writer::StdChannelWriter;
-
-mod tokio_writer_adapter;
-pub use tokio_writer_adapter::TokioWriterAdapter;
-
mod file_logger;
pub use file_logger::{FileLogger, FileLogOptions};
-mod broadcast_future;
-pub use broadcast_future::{BroadcastData, BroadcastFuture};
+pub use pbs_tools::broadcast_future::{BroadcastData, BroadcastFuture};
/// The `BufferedRead` trait provides a single function
/// `buffered_read`. It returns a reference to an internal buffer. The
None
}
-/// percent encode a url component
-pub fn percent_encode_component(comp: &str) -> String {
- utf8_percent_encode(comp, percent_encoding::NON_ALPHANUMERIC).to_string()
-}
-
/// Detect modified configuration files
///
/// This function fails with a reasonable error message if checksums do not match.
SimpleHttp::with_options(options)
}
-/// This used to be: `SIMPLE_ENCODE_SET` plus space, `"`, `#`, `<`, `>`, backtick, `?`, `{`, `}`
-pub const DEFAULT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS // 0..1f and 7e
- // The SIMPLE_ENCODE_SET adds space and anything >= 0x7e (7e itself is already included above)
- .add(0x20)
- .add(0x7f)
- // the DEFAULT_ENCODE_SET added:
- .add(b' ')
- .add(b'"')
- .add(b'#')
- .add(b'<')
- .add(b'>')
- .add(b'`')
- .add(b'?')
- .add(b'{')
- .add(b'}');
-
/// Get an iterator over lines of a file, skipping empty lines and comments (lines starting with a
/// `#`).
pub fn file_get_non_comment_lines<P: AsRef<Path>>(
+++ /dev/null
-use std::io::Write;
-use std::sync::mpsc::SyncSender;
-
-use anyhow::{Error};
-
-/// Wrapper around SyncSender, which implements Write
-///
-/// Each write in translated into a send(Vec<u8>).
-pub struct StdChannelWriter(SyncSender<Result<Vec<u8>, Error>>);
-
-impl StdChannelWriter {
- pub fn new(sender: SyncSender<Result<Vec<u8>, Error>>) -> Self {
- Self(sender)
- }
-}
-
-impl Write for StdChannelWriter {
- fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
- self.0
- .send(Ok(buf.to_vec()))
- .map_err(proxmox::sys::error::io_err_other)
- .and(Ok(buf.len()))
- }
-
- fn flush(&mut self) -> Result<(), std::io::Error> {
- Ok(())
- }
-}
+++ /dev/null
-use std::io::Write;
-
-use tokio::task::block_in_place;
-
-/// Wrapper around a writer which implements Write
-///
-/// wraps each write with a 'block_in_place' so that
-/// any (blocking) writer can be safely used in async context in a
-/// tokio runtime
-pub struct TokioWriterAdapter<W: Write>(W);
-
-impl<W: Write> TokioWriterAdapter<W> {
- pub fn new(writer: W) -> Self {
- Self(writer)
- }
-}
-
-impl<W: Write> Write for TokioWriterAdapter<W> {
- fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
- block_in_place(|| self.0.write(buf))
- }
-
- fn flush(&mut self) -> Result<(), std::io::Error> {
- block_in_place(|| self.0.flush())
- }
-}