]> git.proxmox.com Git - proxmox-backup-qemu.git/commitdiff
implement async connect command
authorDietmar Maurer <dietmar@proxmox.com>
Thu, 24 Oct 2019 11:52:16 +0000 (13:52 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Thu, 24 Oct 2019 11:52:16 +0000 (13:52 +0200)
src/capi_types.rs
src/commands.rs
src/lib.rs
src/worker_task.rs

index 35ed7af8c3299b2e3e5009e8ed54077aa0340ca6..035865a7ff6f6921ff5c47db1f616ecbef75e04c 100644 (file)
@@ -41,6 +41,9 @@ pub struct ProxmoxBackupHandle;
 pub(crate) enum BackupMessage {
     End,
     Abort,
+    Connect {
+        callback_info: CallbackPointers,
+    },
     AddConfig {
         name: String,
         data: DataPointer,
index 486db8f991a518199662edb12ba9bb9b8b33ca50..3ab84cc17ed38bfea148dfdd09421a7762fdef50 100644 (file)
@@ -28,6 +28,14 @@ pub(crate) struct BackupSetup {
     pub key_password: Option<String>,
 }
 
+impl BackupSetup {
+
+    pub(crate) async fn connect(&self) -> Result<Arc<BackupWriter>, Error> {
+        let client = HttpClient::new(&self.host, &self.user, self.password.clone())?;
+        BackupWriter::start(client, &self.store, "vm", &self.backup_id, self.backup_time, false).await
+    }
+}
+
 struct ImageUploadInfo {
     wid: u64,
     device_name: String,
index 6d8e5480a433efb2d5e81306bc614610536a674b..5c6c813c676226d73274eeda2336b69714a7a0eb 100644 (file)
@@ -51,14 +51,9 @@ macro_rules! raise_error_int {
     }}
 }
 
-/// Start new backup
-///
-/// Open a connection to the backup servers and start a new backup
-/// task.
-///
-/// Note: This call is currently not async and can block.
+/// Create new instance
 #[no_mangle]
-pub extern "C" fn proxmox_backup_connect(
+pub extern "C" fn proxmox_backup_new(
     repo: *const c_char,
     backup_id: *const c_char,
     backup_time: u64,
@@ -68,7 +63,7 @@ pub extern "C" fn proxmox_backup_connect(
     error: * mut * mut c_char,
 ) -> *mut ProxmoxBackupHandle {
 
-    let setup: Result<_, Error> = try_block!({
+    let task: Result<_, Error> = try_block!({
         let repo = unsafe { CStr::from_ptr(repo).to_str()?.to_owned() };
         let repo: BackupRepository = repo.parse()?;
 
@@ -94,7 +89,7 @@ pub extern "C" fn proxmox_backup_connect(
             Some(unsafe { CStr::from_ptr(key_password).to_str()?.to_owned() })
         };
 
-        Ok(BackupSetup {
+        let setup = BackupSetup {
             host: repo.host().to_owned(),
             user: repo.user().to_owned(),
             store: repo.store().to_owned(),
@@ -104,23 +99,46 @@ pub extern "C" fn proxmox_backup_connect(
             backup_time,
             keyfile,
             key_password,
-        })
+        };
+
+        BackupTask::new(setup)
     });
 
-    match setup {
-        Ok(setup) => {
-            match BackupTask::new(setup) {
-                Ok(task) => {
-                    let boxed_task = Box::new(task);
-                    Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
-                }
-                Err(err) => raise_error_null!(error, err),
-            }
+    match task {
+        Ok(task) => {
+            let boxed_task = Box::new(task);
+            Box::into_raw(boxed_task) as * mut ProxmoxBackupHandle
         }
         Err(err) => raise_error_null!(error, err),
     }
 }
 
+/// Open connection to the backup server
+#[no_mangle]
+pub extern "C" fn proxmox_backup_connect_async(
+    handle: *mut ProxmoxBackupHandle,
+    callback: extern "C" fn(*mut c_void),
+    callback_data: *mut c_void,
+    error: * mut * mut c_char,
+) {
+    let task = unsafe { &mut *(handle as * mut BackupTask) };
+
+    if let Some(_reason) = &task.aborted {
+        let errmsg = CString::new("task already aborted".to_string()).unwrap();
+        unsafe { *error =  errmsg.into_raw(); }
+        callback(callback_data);
+        return;
+    }
+
+    let msg = BackupMessage::Connect {
+        callback_info: CallbackPointers { callback, callback_data, error },
+    };
+
+    println!("connect_async start");
+    let _res = task.command_tx.send(msg); // fixme: log errors
+    println!("connect_async end");
+}
+
 /// Abort a running backup task
 ///
 /// This stops the current backup task. It is still necessary to call
index 1c4d4942d20e45df9bc370a11da2ec244690da85..7bc1ca047a8af06215b6250ae628fd3ebf7b8232 100644 (file)
@@ -6,9 +6,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::mpsc::{channel, Sender, Receiver};
 
 use futures::future::{Future, Either, FutureExt};
-use tokio::runtime::Runtime;
 
-use proxmox_backup::client::*;
 use proxmox_backup::tools::BroadcastFuture;
 use proxmox_backup::backup::{CryptConfig, load_and_decrtypt_key};
 
@@ -61,16 +59,6 @@ impl BackupTask {
     }
 }
 
-fn connect(runtime: &mut Runtime, setup: &BackupSetup) -> Result<Arc<BackupWriter>, Error> {
-    let client = HttpClient::new(&setup.host, &setup.user, setup.password.clone())?;
-
-    let client = runtime.block_on(
-        BackupWriter::start(client, &setup.store, "vm", &setup.backup_id, setup.backup_time, false)
-    )?;
-
-    Ok(client)
-}
-
 fn handle_async_command<F: 'static + Send + Future<Output=Result<(), Error>>>(
     command_future: F,
     abort_future: impl 'static + Send + Future<Output=Result<(), Error>>,
@@ -103,7 +91,7 @@ fn backup_worker_task(
     builder.core_threads(4);
     builder.name_prefix("proxmox-backup-qemu-");
 
-    let mut runtime = match builder.build() {
+    let runtime = match builder.build() {
         Ok(runtime) => runtime,
         Err(err) =>  {
             connect_tx.send(Err(format_err!("create runtime failed: {}", err))).unwrap();
@@ -111,19 +99,11 @@ fn backup_worker_task(
         }
     };
 
-    let client = match connect(&mut runtime, &setup) {
-        Ok(client) => {
-            connect_tx.send(Ok(())).unwrap();
-            client
-        }
-        Err(err) => {
-            connect_tx.send(Err(err)).unwrap();
-            bail!("connection failed");
-        }
-    };
-
+    connect_tx.send(Ok(())).unwrap();
     drop(connect_tx); // no longer needed
 
+    let mut client = None;
+
     let (mut abort_tx, mut abort_rx) = tokio::sync::mpsc::channel(1);
     let abort_rx = async move {
         match abort_rx.recv().await {
@@ -149,6 +129,18 @@ fn backup_worker_task(
             let msg = command_rx.recv().unwrap(); // todo: should be blocking
 
             match msg {
+                BackupMessage::Connect { callback_info } => {
+                    client = match setup.connect().await {
+                        Ok(client) => {
+                            callback_info.send_result(Ok(()));
+                            Some(client)
+                        }
+                        Err(err) => {
+                            callback_info.send_result(Err(err));
+                            None
+                        }
+                    };
+                }
                 BackupMessage::Abort => {
                     println!("worker got abort mesage");
                     let res = abort_tx.send(()).await;
@@ -161,65 +153,99 @@ fn backup_worker_task(
                     break;
                 }
                 BackupMessage::AddConfig { name, data, size, result_channel } => {
-                    let res = add_config(
-                        client.clone(),
-                        crypt_config.clone(),
-                        registry.clone(),
-                        name,
-                        data,
-                        size,
-                    ).await;
-
-                    let _ = result_channel.lock().unwrap().send(res);
+                    match client {
+                        Some(ref client) => {
+                            let res = add_config(
+                                client.clone(),
+                                crypt_config.clone(),
+                                registry.clone(),
+                                name,
+                                data,
+                                size,
+                            ).await;
+
+                            let _ = result_channel.lock().unwrap().send(res);
+                        }
+                        None => {
+                           let _ = result_channel.lock().unwrap().send(Err(format_err!("not connected")));
+                        }
+                    }
                 }
                 BackupMessage::RegisterImage { device_name, size, result_channel } => {
-                    let res = register_image(
-                        client.clone(),
-                        crypt_config.clone(),
-                        registry.clone(),
-                        known_chunks.clone(),
-                        device_name,
-                        size,
-                        chunk_size,
-                    ).await;
-                    let _ = result_channel.lock().unwrap().send(res);
+                    match client {
+                        Some(ref client) => {
+                            let res = register_image(
+                                client.clone(),
+                                crypt_config.clone(),
+                                registry.clone(),
+                                known_chunks.clone(),
+                                device_name,
+                                size,
+                                chunk_size,
+                            ).await;
+                            let _ = result_channel.lock().unwrap().send(res);
+                        }
+                        None => {
+                           let _ = result_channel.lock().unwrap().send(Err(format_err!("not connected")));
+                        }
+                    }
                 }
                 BackupMessage::CloseImage { dev_id, callback_info } => {
-                    handle_async_command(
-                        close_image(client.clone(), registry.clone(), dev_id),
-                        abort.listen(),
-                        callback_info,
-                    ).await;
+                    match client {
+                        Some(ref client) => {
+                            handle_async_command(
+                                close_image(client.clone(), registry.clone(), dev_id),
+                                abort.listen(),
+                                callback_info,
+                            ).await;
+                        }
+                        None => {
+                            callback_info.send_result(Err(format_err!("not connected")));
+                        }
+                    }
                 }
                 BackupMessage::WriteData { dev_id, data, offset, size, callback_info } => {
-                    written_bytes2.fetch_add(size, Ordering::SeqCst);
-
-                    handle_async_command(
-                        write_data(
-                            client.clone(),
-                            crypt_config.clone(),
-                            registry.clone(),
-                            known_chunks.clone(),
-                            dev_id, data,
-                            offset,
-                            size,
-                            chunk_size,
-                        ),
-                        abort.listen(),
-                        callback_info,
-                    ).await;
+                    match client {
+                        Some(ref client) => {
+                            written_bytes2.fetch_add(size, Ordering::SeqCst);
+                            handle_async_command(
+                                write_data(
+                                    client.clone(),
+                                    crypt_config.clone(),
+                                    registry.clone(),
+                                    known_chunks.clone(),
+                                    dev_id, data,
+                                    offset,
+                                    size,
+                                    chunk_size,
+                                ),
+                                abort.listen(),
+                                callback_info,
+                            ).await;
+                        }
+                        None => {
+                            callback_info.send_result(Err(format_err!("not connected")));
+                        }
+                    }
                 }
                 BackupMessage::Finish { callback_info } => {
-                    handle_async_command(
-                        finish_backup(
-                            client.clone(),
-                            crypt_config.clone(),
-                            registry.clone(),
-                            setup.clone(),
-                        ),
-                        abort.listen(),
-                        callback_info,
-                    ).await;
+                    match client {
+                        Some(ref client) => {
+                            handle_async_command(
+                                finish_backup(
+                                    client.clone(),
+                                    crypt_config.clone(),
+                                    registry.clone(),
+                                    setup.clone(),
+                                ),
+                                abort.listen(),
+                                callback_info,
+                            ).await;
+                        }
+                        None => {
+                            callback_info.send_result(Err(format_err!("not connected")));
+                        }
+                    }
                 }
             }
         }