char *cookie;
bool accept_range;
AioContext *aio_context;
+ QemuMutex mutex;
char *username;
char *password;
char *proxyusername;
switch (action) {
case CURL_POLL_IN:
aio_set_fd_handler(s->aio_context, fd, false,
- curl_multi_read, NULL, state);
+ curl_multi_read, NULL, NULL, state);
break;
case CURL_POLL_OUT:
aio_set_fd_handler(s->aio_context, fd, false,
- NULL, curl_multi_do, state);
+ NULL, curl_multi_do, NULL, state);
break;
case CURL_POLL_INOUT:
aio_set_fd_handler(s->aio_context, fd, false,
- curl_multi_read, curl_multi_do, state);
+ curl_multi_read, curl_multi_do, NULL, state);
break;
case CURL_POLL_REMOVE:
aio_set_fd_handler(s->aio_context, fd, false,
- NULL, NULL, NULL);
+ NULL, NULL, NULL, NULL);
break;
}
return FIND_RET_NONE;
}
+/* Called with s->mutex held. */
static void curl_multi_check_completion(BDRVCURLState *s)
{
int msgs_in_queue;
continue;
}
- acb->common.cb(acb->common.opaque, -EPROTO);
+ qemu_mutex_unlock(&s->mutex);
+ acb->common.cb(acb->common.opaque, -EIO);
+ qemu_mutex_lock(&s->mutex);
qemu_aio_unref(acb);
state->acb[i] = NULL;
}
}
}
-static void curl_multi_do(void *arg)
+/* Called with s->mutex held. */
+static void curl_multi_do_locked(CURLState *s)
{
- CURLState *s = (CURLState *)arg;
CURLSocket *socket, *next_socket;
int running;
int r;
}
}
+static void curl_multi_do(void *arg)
+{
+ CURLState *s = (CURLState *)arg;
+
+ qemu_mutex_lock(&s->s->mutex);
+ curl_multi_do_locked(s);
+ qemu_mutex_unlock(&s->s->mutex);
+}
+
static void curl_multi_read(void *arg)
{
CURLState *s = (CURLState *)arg;
- curl_multi_do(arg);
+ qemu_mutex_lock(&s->s->mutex);
+ curl_multi_do_locked(s);
curl_multi_check_completion(s->s);
+ qemu_mutex_unlock(&s->s->mutex);
}
static void curl_multi_timeout_do(void *arg)
return;
}
+ qemu_mutex_lock(&s->mutex);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
curl_multi_check_completion(s);
+ qemu_mutex_unlock(&s->mutex);
#else
abort();
#endif
curl_easy_cleanup(state->curl);
state->curl = NULL;
+ qemu_mutex_init(&s->mutex);
curl_attach_aio_context(bs, bdrv_get_aio_context(bs));
qemu_opts_del(opts);
{
CURLState *state;
int running;
+ int ret = -EINPROGRESS;
CURLAIOCB *acb = p;
- BDRVCURLState *s = acb->common.bs->opaque;
+ BlockDriverState *bs = acb->common.bs;
+ BDRVCURLState *s = bs->opaque;
size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
size_t end;
+ qemu_mutex_lock(&s->mutex);
+
// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
- return;
+ goto out;
default:
break;
}
// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
- acb->common.cb(acb->common.opaque, -EIO);
- qemu_aio_unref(acb);
- return;
+ ret = -EIO;
+ goto out;
}
acb->start = 0;
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
- acb->common.cb(acb->common.opaque, -ENOMEM);
- qemu_aio_unref(acb);
- return;
+ ret = -ENOMEM;
+ goto out;
}
state->acb[0] = acb;
/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+ qemu_mutex_unlock(&s->mutex);
+ if (ret != -EINPROGRESS) {
+ acb->common.cb(acb->common.opaque, ret);
+ qemu_aio_unref(acb);
+ }
}
static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
DPRINTF("CURL: Close\n");
curl_detach_aio_context(bs);
+ qemu_mutex_destroy(&s->mutex);
g_free(s->cookie);
g_free(s->url);