use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Sender, Receiver};
-use std::ffi::CString;
+use std::ffi::{CStr, CString};
use std::ptr;
use std::os::raw::{c_char, c_void};
struct BackupTask {
worker: JoinHandle<Result<BackupStats, Error>>,
command_tx: Sender<BackupMessage>,
+ aborted: Option<String>, // set on abort, conatins abort reason
}
#[derive(Debug)]
Ok(BackupTask {
worker,
command_tx,
+ aborted: None,
})
}
}
) -> Result<BackupStats, Error> {
let mut builder = tokio::runtime::Builder::new();
-
+
builder.blocking_threads(1);
builder.core_threads(4);
builder.name_prefix("proxmox-backup-qemu-");
runtime.spawn(async move {
loop {
- let msg = command_rx.recv().unwrap();
+ let msg = command_rx.recv().unwrap(); // todo: should be blocking
match msg {
BackupMessage::Abort => {
#[no_mangle]
pub extern "C" fn proxmox_backup_abort(
handle: *mut ProxmoxBackupHandle,
+ reason: * mut c_char,
) {
- let task = handle as * mut BackupTask;
+ let task = unsafe { &mut *(handle as * mut BackupTask) };
+
+ let reason = unsafe { CStr::from_ptr(reason).to_string_lossy().into_owned() };
+ task.aborted = Some(reason);
println!("send abort");
- let _res = unsafe { (*task).command_tx.send(BackupMessage::Abort) };
+ let _res = task.command_tx.send(BackupMessage::Abort);
}
#[no_mangle]
callback_data: *mut c_void,
error: * mut * mut c_char,
) {
+ let task = unsafe { &mut *(handle as * mut BackupTask) };
+
+ if let Some(_reason) = &task.aborted {
+ let errmsg = CString::new("task already aborted".to_string()).unwrap();
+ unsafe { *error = errmsg.into_raw(); }
+ callback(callback_data);
+ return;
+ }
let msg = BackupMessage::WriteData {
dev_id,
callback_info: CallbackPointers { callback, callback_data, error },
};
- let task = handle as * mut BackupTask;
-
println!("write_data_async start");
- let _res = unsafe { (*task).command_tx.send(msg) }; // fixme: log errors
+ let _res = task.command_tx.send(msg); // fixme: log errors
println!("write_data_async end");
}