]> git.proxmox.com Git - mirror_qemu.git/blobdiff - block/curl.c
parallels: wrong call to bdrv_truncate
[mirror_qemu.git] / block / curl.c
index 0404c1b5fa560506e89626d0627d4c26152b48a2..34dbd335f4211d878427cf604d66e56ec56c9f41 100644 (file)
@@ -135,6 +135,7 @@ typedef struct BDRVCURLState {
     char *cookie;
     bool accept_range;
     AioContext *aio_context;
+    QemuMutex mutex;
     char *username;
     char *password;
     char *proxyusername;
@@ -192,19 +193,19 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
     switch (action) {
         case CURL_POLL_IN:
             aio_set_fd_handler(s->aio_context, fd, false,
-                               curl_multi_read, NULL, state);
+                               curl_multi_read, NULL, NULL, state);
             break;
         case CURL_POLL_OUT:
             aio_set_fd_handler(s->aio_context, fd, false,
-                               NULL, curl_multi_do, state);
+                               NULL, curl_multi_do, NULL, state);
             break;
         case CURL_POLL_INOUT:
             aio_set_fd_handler(s->aio_context, fd, false,
-                               curl_multi_read, curl_multi_do, state);
+                               curl_multi_read, curl_multi_do, NULL, state);
             break;
         case CURL_POLL_REMOVE:
             aio_set_fd_handler(s->aio_context, fd, false,
-                               NULL, NULL, NULL);
+                               NULL, NULL, NULL, NULL);
             break;
     }
 
@@ -333,6 +334,7 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len,
     return FIND_RET_NONE;
 }
 
+/* Called with s->mutex held.  */
 static void curl_multi_check_completion(BDRVCURLState *s)
 {
     int msgs_in_queue;
@@ -374,7 +376,9 @@ static void curl_multi_check_completion(BDRVCURLState *s)
                         continue;
                     }
 
-                    acb->common.cb(acb->common.opaque, -EPROTO);
+                    qemu_mutex_unlock(&s->mutex);
+                    acb->common.cb(acb->common.opaque, -EIO);
+                    qemu_mutex_lock(&s->mutex);
                     qemu_aio_unref(acb);
                     state->acb[i] = NULL;
                 }
@@ -386,9 +390,9 @@ static void curl_multi_check_completion(BDRVCURLState *s)
     }
 }
 
-static void curl_multi_do(void *arg)
+/* Called with s->mutex held.  */
+static void curl_multi_do_locked(CURLState *s)
 {
-    CURLState *s = (CURLState *)arg;
     CURLSocket *socket, *next_socket;
     int running;
     int r;
@@ -406,12 +410,23 @@ static void curl_multi_do(void *arg)
     }
 }
 
+static void curl_multi_do(void *arg)
+{
+    CURLState *s = (CURLState *)arg;
+
+    qemu_mutex_lock(&s->s->mutex);
+    curl_multi_do_locked(s);
+    qemu_mutex_unlock(&s->s->mutex);
+}
+
 static void curl_multi_read(void *arg)
 {
     CURLState *s = (CURLState *)arg;
 
-    curl_multi_do(arg);
+    qemu_mutex_lock(&s->s->mutex);
+    curl_multi_do_locked(s);
     curl_multi_check_completion(s->s);
+    qemu_mutex_unlock(&s->s->mutex);
 }
 
 static void curl_multi_timeout_do(void *arg)
@@ -424,9 +439,11 @@ static void curl_multi_timeout_do(void *arg)
         return;
     }
 
+    qemu_mutex_lock(&s->mutex);
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
     curl_multi_check_completion(s);
+    qemu_mutex_unlock(&s->mutex);
 #else
     abort();
 #endif
@@ -759,6 +776,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, int flags,
     curl_easy_cleanup(state->curl);
     state->curl = NULL;
 
+    qemu_mutex_init(&s->mutex);
     curl_attach_aio_context(bs, bdrv_get_aio_context(bs));
 
     qemu_opts_del(opts);
@@ -784,13 +802,17 @@ static void curl_readv_bh_cb(void *p)
 {
     CURLState *state;
     int running;
+    int ret = -EINPROGRESS;
 
     CURLAIOCB *acb = p;
-    BDRVCURLState *s = acb->common.bs->opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVCURLState *s = bs->opaque;
 
     size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
     size_t end;
 
+    qemu_mutex_lock(&s->mutex);
+
     // In case we have the requested data already (e.g. read-ahead),
     // we can just call the callback and be done.
     switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
@@ -798,7 +820,7 @@ static void curl_readv_bh_cb(void *p)
             qemu_aio_unref(acb);
             // fall through
         case FIND_RET_WAIT:
-            return;
+            goto out;
         default:
             break;
     }
@@ -806,9 +828,8 @@ static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(acb->common.bs, s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_unref(acb);
-        return;
+        ret = -EIO;
+        goto out;
     }
 
     acb->start = 0;
@@ -822,9 +843,8 @@ static void curl_readv_bh_cb(void *p)
     state->orig_buf = g_try_malloc(state->buf_len);
     if (state->buf_len && state->orig_buf == NULL) {
         curl_clean_state(state);
-        acb->common.cb(acb->common.opaque, -ENOMEM);
-        qemu_aio_unref(acb);
-        return;
+        ret = -ENOMEM;
+        goto out;
     }
     state->acb[0] = acb;
 
@@ -837,6 +857,13 @@ static void curl_readv_bh_cb(void *p)
 
     /* Tell curl it needs to kick things off */
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+    qemu_mutex_unlock(&s->mutex);
+    if (ret != -EINPROGRESS) {
+        acb->common.cb(acb->common.opaque, ret);
+        qemu_aio_unref(acb);
+    }
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
@@ -861,6 +888,7 @@ static void curl_close(BlockDriverState *bs)
 
     DPRINTF("CURL: Close\n");
     curl_detach_aio_context(bs);
+    qemu_mutex_destroy(&s->mutex);
 
     g_free(s->cookie);
     g_free(s->url);