}}
}
-/// Start new backup
-///
-/// Open a connection to the backup servers and start a new backup
-/// task.
-///
-/// Note: This call is currently not async and can block.
+/// Create new instance
#[no_mangle]
-pub extern "C" fn proxmox_backup_connect(
+pub extern "C" fn proxmox_backup_new(
repo: *const c_char,
backup_id: *const c_char,
backup_time: u64,
error: * mut * mut c_char,
) -> *mut ProxmoxBackupHandle {
- let setup: Result<_, Error> = try_block!({
+ let task: Result<_, Error> = try_block!({
let repo = unsafe { CStr::from_ptr(repo).to_str()?.to_owned() };
let repo: BackupRepository = repo.parse()?;
Some(unsafe { CStr::from_ptr(key_password).to_str()?.to_owned() })
};
- Ok(BackupSetup {
+ let setup = BackupSetup {
host: repo.host().to_owned(),
user: repo.user().to_owned(),
store: repo.store().to_owned(),
backup_time,
keyfile,
key_password,
- })
+ };
+
+ BackupTask::new(setup)
});
- match setup {
- Ok(setup) => {
- match BackupTask::new(setup) {
- Ok(task) => {
- let boxed_task = Box::new(task);
- Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
- }
- Err(err) => raise_error_null!(error, err),
- }
+ match task {
+ Ok(task) => {
+ let boxed_task = Box::new(task);
+ Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
}
Err(err) => raise_error_null!(error, err),
}
}
+/// Open connection to the backup server
+#[no_mangle]
+pub extern "C" fn proxmox_backup_connect_async(
+ handle: *mut ProxmoxBackupHandle,
+ callback: extern "C" fn(*mut c_void),
+ callback_data: *mut c_void,
+ error: * mut * mut c_char,
+) {
+ let task = unsafe { &mut *(handle as * mut BackupTask) };
+
+ if let Some(_reason) = &task.aborted {
+ let errmsg = CString::new("task already aborted".to_string()).unwrap();
+ unsafe { *error = errmsg.into_raw(); }
+ callback(callback_data);
+ return;
+ }
+
+ let msg = BackupMessage::Connect {
+ callback_info: CallbackPointers { callback, callback_data, error },
+ };
+
+ println!("connect_async start");
+ let _res = task.command_tx.send(msg); // fixme: log errors
+ println!("connect_async end");
+}
+
/// Abort a running backup task
///
/// This stops the current backup task. It is still necessary to call
use std::sync::mpsc::{channel, Sender, Receiver};
use futures::future::{Future, Either, FutureExt};
-use tokio::runtime::Runtime;
-use proxmox_backup::client::*;
use proxmox_backup::tools::BroadcastFuture;
use proxmox_backup::backup::{CryptConfig, load_and_decrtypt_key};
}
}
-fn connect(runtime: &mut Runtime, setup: &BackupSetup) -> Result<Arc<BackupWriter>, Error> {
- let client = HttpClient::new(&setup.host, &setup.user, setup.password.clone())?;
-
- let client = runtime.block_on(
- BackupWriter::start(client, &setup.store, "vm", &setup.backup_id, setup.backup_time, false)
- )?;
-
- Ok(client)
-}
-
fn handle_async_command<F: 'static + Send + Future<Output=Result<(), Error>>>(
command_future: F,
abort_future: impl 'static + Send + Future<Output=Result<(), Error>>,
builder.core_threads(4);
builder.name_prefix("proxmox-backup-qemu-");
- let mut runtime = match builder.build() {
+ let runtime = match builder.build() {
Ok(runtime) => runtime,
Err(err) => {
connect_tx.send(Err(format_err!("create runtime failed: {}", err))).unwrap();
}
};
- let client = match connect(&mut runtime, &setup) {
- Ok(client) => {
- connect_tx.send(Ok(())).unwrap();
- client
- }
- Err(err) => {
- connect_tx.send(Err(err)).unwrap();
- bail!("connection failed");
- }
- };
-
+ connect_tx.send(Ok(())).unwrap();
drop(connect_tx); // no longer needed
+ let mut client = None;
+
let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1);
let abort_rx = async move {
match abort_rx.recv().await {
let msg = command_rx.recv().unwrap(); // todo: should be blocking
match msg {
+ BackupMessage::Connect { callback_info } => {
+ client = match setup.connect().await {
+ Ok(client) => {
+ callback_info.send_result(Ok(()));
+ Some(client)
+ }
+ Err(err) => {
+ callback_info.send_result(Err(err));
+ None
+ }
+ };
+ }
BackupMessage::Abort => {
println!("worker got abort mesage");
let res = abort_tx.send(()).await;
break;
}
BackupMessage::AddConfig { name, data, size, result_channel } => {
- let res = add_config(
- client.clone(),
- crypt_config.clone(),
- registry.clone(),
- name,
- data,
- size,
- ).await;
-
- let _ = result_channel.lock().unwrap().send(res);
+ match client {
+ Some(ref client) => {
+ let res = add_config(
+ client.clone(),
+ crypt_config.clone(),
+ registry.clone(),
+ name,
+ data,
+ size,
+ ).await;
+
+ let _ = result_channel.lock().unwrap().send(res);
+ }
+ None => {
+ let _ = result_channel.lock().unwrap().send(Err(format_err!("not connected")));
+ }
+ }
}
BackupMessage::RegisterImage { device_name, size, result_channel } => {
- let res = register_image(
- client.clone(),
- crypt_config.clone(),
- registry.clone(),
- known_chunks.clone(),
- device_name,
- size,
- chunk_size,
- ).await;
- let _ = result_channel.lock().unwrap().send(res);
+ match client {
+ Some(ref client) => {
+ let res = register_image(
+ client.clone(),
+ crypt_config.clone(),
+ registry.clone(),
+ known_chunks.clone(),
+ device_name,
+ size,
+ chunk_size,
+ ).await;
+ let _ = result_channel.lock().unwrap().send(res);
+ }
+ None => {
+ let _ = result_channel.lock().unwrap().send(Err(format_err!("not connected")));
+ }
+ }
}
BackupMessage::CloseImage { dev_id, callback_info } => {
- handle_async_command(
- close_image(client.clone(), registry.clone(), dev_id),
- abort.listen(),
- callback_info,
- ).await;
+ match client {
+ Some(ref client) => {
+ handle_async_command(
+ close_image(client.clone(), registry.clone(), dev_id),
+ abort.listen(),
+ callback_info,
+ ).await;
+ }
+ None => {
+ callback_info.send_result(Err(format_err!("not connected")));
+ }
+ }
}
BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => {
- written_bytes2.fetch_add(size, Ordering::SeqCst);
-
- handle_async_command(
- write_data(
- client.clone(),
- crypt_config.clone(),
- registry.clone(),
- known_chunks.clone(),
- dev_id, data,
- offset,
- size,
- chunk_size,
- ),
- abort.listen(),
- callback_info,
- ).await;
+ match client {
+ Some(ref client) => {
+ written_bytes2.fetch_add(size, Ordering::SeqCst);
+ handle_async_command(
+ write_data(
+ client.clone(),
+ crypt_config.clone(),
+ registry.clone(),
+ known_chunks.clone(),
+ dev_id, data,
+ offset,
+ size,
+ chunk_size,
+ ),
+ abort.listen(),
+ callback_info,
+ ).await;
+ }
+ None => {
+ callback_info.send_result(Err(format_err!("not connected")));
+ }
+ }
}
BackupMessage::Finish { callback_info } => {
- handle_async_command(
- finish_backup(
- client.clone(),
- crypt_config.clone(),
- registry.clone(),
- setup.clone(),
- ),
- abort.listen(),
- callback_info,
- ).await;
+ match client {
+ Some(ref client) => {
+ handle_async_command(
+ finish_backup(
+ client.clone(),
+ crypt_config.clone(),
+ registry.clone(),
+ setup.clone(),
+ ),
+ abort.listen(),
+ callback_info,
+ ).await;
+ }
+ None => {
+ callback_info.send_result(Err(format_err!("not connected")));
+ }
+ }
}
}
}