]> git.proxmox.com Git - mirror_qemu.git/blobdiff - io/task.c
MAINTAINERS: update RBD block maintainer
[mirror_qemu.git] / io / task.c
index bf1a3339b2651080401cc4472bc4411527e85d40..64c4c7126afcbaf7fc18aca4eeec6c1fb4aaf5c1 100644 (file)
--- a/io/task.c
+++ b/io/task.c
 
 #include "qemu/osdep.h"
 #include "io/task.h"
+#include "qapi/error.h"
 #include "qemu/thread.h"
 #include "trace.h"
 
+struct QIOTaskThreadData {
+    QIOTaskWorker worker;
+    gpointer opaque;
+    GDestroyNotify destroy;
+    GMainContext *context;
+    GSource *completion;
+};
+
+
 struct QIOTask {
     Object *source;
     QIOTaskFunc func;
     gpointer opaque;
     GDestroyNotify destroy;
+    Error *err;
+    gpointer result;
+    GDestroyNotify destroyResult;
+    QemuMutex thread_lock;
+    QemuCond thread_cond;
+    struct QIOTaskThreadData *thread;
 };
 
 
@@ -45,6 +61,8 @@ QIOTask *qio_task_new(Object *source,
     task->func = func;
     task->opaque = opaque;
     task->destroy = destroy;
+    qemu_mutex_init(&task->thread_lock);
+    qemu_cond_init(&task->thread_cond);
 
     trace_qio_task_new(task, source, func, opaque);
 
@@ -53,42 +71,44 @@ QIOTask *qio_task_new(Object *source,
 
 static void qio_task_free(QIOTask *task)
 {
+    qemu_mutex_lock(&task->thread_lock);
+    if (task->thread) {
+        if (task->thread->destroy) {
+            task->thread->destroy(task->thread->opaque);
+        }
+
+        if (task->thread->context) {
+            g_main_context_unref(task->thread->context);
+        }
+
+        g_free(task->thread);
+    }
+
     if (task->destroy) {
         task->destroy(task->opaque);
     }
+    if (task->destroyResult) {
+        task->destroyResult(task->result);
+    }
+    if (task->err) {
+        error_free(task->err);
+    }
     object_unref(task->source);
 
+    qemu_mutex_unlock(&task->thread_lock);
+    qemu_mutex_destroy(&task->thread_lock);
+    qemu_cond_destroy(&task->thread_cond);
+
     g_free(task);
 }
 
 
-struct QIOTaskThreadData {
-    QIOTask *task;
-    QIOTaskWorker worker;
-    gpointer opaque;
-    GDestroyNotify destroy;
-    Error *err;
-    int ret;
-};
-
-
-static gboolean gio_task_thread_result(gpointer opaque)
+static gboolean qio_task_thread_result(gpointer opaque)
 {
-    struct QIOTaskThreadData *data = opaque;
-
-    trace_qio_task_thread_result(data->task);
-    if (data->ret == 0) {
-        qio_task_complete(data->task);
-    } else {
-        qio_task_abort(data->task, data->err);
-    }
+    QIOTask *task = opaque;
 
-    error_free(data->err);
-    if (data->destroy) {
-        data->destroy(data->opaque);
-    }
-
-    g_free(data);
+    trace_qio_task_thread_result(task);
+    qio_task_complete(task);
 
     return FALSE;
 }
@@ -96,21 +116,31 @@ static gboolean gio_task_thread_result(gpointer opaque)
 
 static gpointer qio_task_thread_worker(gpointer opaque)
 {
-    struct QIOTaskThreadData *data = opaque;
+    QIOTask *task = opaque;
 
-    trace_qio_task_thread_run(data->task);
-    data->ret = data->worker(data->task, &data->err, data->opaque);
-    if (data->ret < 0 && data->err == NULL) {
-        error_setg(&data->err, "Task worker failed but did not set an error");
-    }
+    trace_qio_task_thread_run(task);
+
+    task->thread->worker(task, task->thread->opaque);
 
     /* We're running in the background thread, and must only
      * ever report the task results in the main event loop
      * thread. So we schedule an idle callback to report
      * the worker results
      */
-    trace_qio_task_thread_exit(data->task);
-    g_idle_add(gio_task_thread_result, data);
+    trace_qio_task_thread_exit(task);
+
+    qemu_mutex_lock(&task->thread_lock);
+
+    task->thread->completion = g_idle_source_new();
+    g_source_set_callback(task->thread->completion,
+                          qio_task_thread_result, task, NULL);
+    g_source_attach(task->thread->completion,
+                    task->thread->context);
+    trace_qio_task_thread_source_attach(task, task->thread->completion);
+
+    qemu_cond_signal(&task->thread_cond);
+    qemu_mutex_unlock(&task->thread_lock);
+
     return NULL;
 }
 
@@ -118,43 +148,92 @@ static gpointer qio_task_thread_worker(gpointer opaque)
 void qio_task_run_in_thread(QIOTask *task,
                             QIOTaskWorker worker,
                             gpointer opaque,
-                            GDestroyNotify destroy)
+                            GDestroyNotify destroy,
+                            GMainContext *context)
 {
     struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
     QemuThread thread;
 
-    data->task = task;
+    if (context) {
+        g_main_context_ref(context);
+    }
+
     data->worker = worker;
     data->opaque = opaque;
     data->destroy = destroy;
+    data->context = context;
+
+    task->thread = data;
 
     trace_qio_task_thread_start(task, worker, opaque);
     qemu_thread_create(&thread,
                        "io-task-worker",
                        qio_task_thread_worker,
-                       data,
+                       task,
                        QEMU_THREAD_DETACHED);
 }
 
 
+void qio_task_wait_thread(QIOTask *task)
+{
+    qemu_mutex_lock(&task->thread_lock);
+    g_assert(task->thread != NULL);
+    while (task->thread->completion == NULL) {
+        qemu_cond_wait(&task->thread_cond, &task->thread_lock);
+    }
+
+    trace_qio_task_thread_source_cancel(task, task->thread->completion);
+    g_source_destroy(task->thread->completion);
+    qemu_mutex_unlock(&task->thread_lock);
+
+    qio_task_thread_result(task);
+}
+
+
 void qio_task_complete(QIOTask *task)
 {
-    task->func(task->source, NULL, task->opaque);
+    task->func(task, task->opaque);
     trace_qio_task_complete(task);
     qio_task_free(task);
 }
 
-void qio_task_abort(QIOTask *task,
-                    Error *err)
+
+void qio_task_set_error(QIOTask *task,
+                        Error *err)
+{
+    error_propagate(&task->err, err);
+}
+
+
+bool qio_task_propagate_error(QIOTask *task,
+                              Error **errp)
 {
-    task->func(task->source, err, task->opaque);
-    trace_qio_task_abort(task);
-    qio_task_free(task);
+    if (task->err) {
+        error_propagate(errp, task->err);
+        task->err = NULL;
+        return true;
+    }
+
+    return false;
+}
+
+
+void qio_task_set_result_pointer(QIOTask *task,
+                                 gpointer result,
+                                 GDestroyNotify destroy)
+{
+    task->result = result;
+    task->destroyResult = destroy;
+}
+
+
+gpointer qio_task_get_result_pointer(QIOTask *task)
+{
+    return task->result;
 }
 
 
 Object *qio_task_get_source(QIOTask *task)
 {
-    object_ref(task->source);
     return task->source;
 }