Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
use anyhow::{bail, Error};
use serde_json::Value;
use ::serde::{Deserialize, Serialize};
use anyhow::{bail, Error};
use serde_json::Value;
use ::serde::{Deserialize, Serialize};
use proxmox::api::{api, ApiMethod, Router, RpcEnvironment, Permission};
use proxmox::tools::fs::open_file_locked;
use proxmox::api::{api, ApiMethod, Router, RpcEnvironment, Permission};
use proxmox::tools::fs::open_file_locked;
}
status.removed_chunks += 1;
status.removed_bytes += stat.st_size as u64;
}
status.removed_chunks += 1;
status.removed_bytes += stat.st_size as u64;
+ } else if stat.st_atime < oldest_writer {
+ status.pending_chunks += 1;
+ status.pending_bytes += stat.st_size as u64;
- if stat.st_atime < oldest_writer {
- status.pending_chunks += 1;
- status.pending_bytes += stat.st_size as u64;
- } else {
- status.disk_chunks += 1;
- status.disk_bytes += stat.st_size as u64;
- }
+ status.disk_chunks += 1;
+ status.disk_bytes += stat.st_size as u64;
let mut file = open_options.open(&path)
.map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
let mut file = open_options.open(&path)
.map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
- write!(file, "{}\n", userid)
+ writeln!(file, "{}", userid)
.map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
Ok(())
.map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
Ok(())
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
+
impl BackupWriter {
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
impl BackupWriter {
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
let archive = if self.verbose {
archive_name.to_string()
} else {
let archive = if self.verbose {
archive_name.to_string()
} else {
- crate::tools::format::strip_server_file_extension(archive_name.clone())
+ crate::tools::format::strip_server_file_extension(archive_name)
};
if archive_name != CATALOG_NAME {
let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
};
if archive_name != CATALOG_NAME {
let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
(verify_queue_tx, verify_result_rx)
}
(verify_queue_tx, verify_result_rx)
}
- fn append_chunk_queue(h2: H2Client, wid: u64, path: String, verbose: bool) -> (
- mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
- oneshot::Receiver<Result<(), Error>>,
- ) {
+ fn append_chunk_queue(
+ h2: H2Client,
+ wid: u64,
+ path: String,
+ verbose: bool,
+ ) -> (UploadQueueSender, UploadResultReceiver) {
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
let (verify_result_tx, verify_result_rx) = oneshot::channel();
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
let (verify_result_tx, verify_result_rx) = oneshot::channel();
- let h2_2 = h2.clone();
-
// FIXME: async-block-ify this code!
tokio::spawn(
verify_queue_rx
// FIXME: async-block-ify this code!
tokio::spawn(
verify_queue_rx
let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
let param_data = bytes::Bytes::from(param.to_string().into_bytes());
let upload_data = Some(param_data);
let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
let param_data = bytes::Bytes::from(param.to_string().into_bytes());
let upload_data = Some(param_data);
- h2_2.send_request(request, upload_data)
+ h2.send_request(request, upload_data)
.and_then(move |response| {
response
.map_err(Error::from)
.and_then(move |response| {
response
.map_err(Error::from)
+ // We have no `self` here for `h2` and `verbose`, the only other arg "common" with 1 other
+ // funciton in the same path is `wid`, so those 3 could be in a struct, but there's no real use
+ // since this is a private method.
+ #[allow(clippy::too_many_arguments)]
fn upload_chunk_info_stream(
h2: H2Client,
wid: u64,
fn upload_chunk_info_stream(
h2: H2Client,
wid: u64,
let is_fixed_chunk_size = prefix == "fixed";
let (upload_queue, upload_result) =
let is_fixed_chunk_size = prefix == "fixed";
let (upload_queue, upload_result) =
- Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned(), verbose);
+ Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, verbose);
let start_time = std::time::Instant::now();
let start_time = std::time::Instant::now();
let digest = chunk_info.digest;
let digest_str = digest_to_hex(&digest);
let digest = chunk_info.digest;
let digest_str = digest_to_hex(&digest);
- if false && verbose { // TO verbose, needs finer verbosity setting granularity
+ /* too verbose, needs finer verbosity setting granularity
+ if verbose {
println!("upload new chunk {} ({} bytes, offset {})", digest_str,
chunk_info.chunk_len, offset);
}
println!("upload new chunk {} ({} bytes, offset {})", digest_str,
chunk_info.chunk_len, offset);
}
let chunk_data = chunk_info.chunk.into_inner();
let param = json!({
let chunk_data = chunk_info.chunk.into_inner();
let param = json!({
for line in raw.split('\n') {
let items: Vec<String> = line.split_whitespace().map(String::from).collect();
for line in raw.split('\n') {
let items: Vec<String> = line.split_whitespace().map(String::from).collect();
- if items.len() == 2 {
- if &items[0] == server {
- return Some(items[1].clone());
- }
+ if items.len() == 2 && &items[0] == server {
+ return Some(items[1].clone());
) {
let final_result = match err.downcast::<io::Error>() {
Ok(err) => {
) {
let final_result = match err.downcast::<io::Error>() {
Ok(err) => {
- if err.kind() == io::ErrorKind::Other {
- if self.verbose {
- eprintln!("an IO error occurred: {}", err);
- }
+ if err.kind() == io::ErrorKind::Other && self.verbose {
+ eprintln!("an IO error occurred: {}", err);
// acl to extended attribute names constants
// from: acl/include/acl_ea.h
// acl to extended attribute names constants
// from: acl/include/acl_ea.h
-pub const ACL_EA_ACCESS: &'static str = "system.posix_acl_access";
-pub const ACL_EA_DEFAULT: &'static str = "system.posix_acl_default";
+pub const ACL_EA_ACCESS: &str = "system.posix_acl_access";
+pub const ACL_EA_DEFAULT: &str = "system.posix_acl_default";
pub const ACL_EA_VERSION: u32 = 0x0002;
#[link(name = "acl")]
pub const ACL_EA_VERSION: u32 = 0x0002;
#[link(name = "acl")]
use super::loopdev;
use super::fs;
use super::loopdev;
use super::fs;
-const RUN_DIR: &'static str = "/run/pbs-loopdev";
+const RUN_DIR: &str = "/run/pbs-loopdev";
const_regex! {
pub LOOPDEV_REGEX = r"^loop\d+$";
const_regex! {
pub LOOPDEV_REGEX = r"^loop\d+$";