#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
-#include "qemu/atomic.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
typedef struct {
Coroutine *coroutine;
uint64_t offset; /* original offset of the request */
- bool receiving; /* waiting for connection_co? */
+ bool receiving; /* sleeping in the yield in nbd_receive_replies */
} NBDClientRequest;
typedef enum NBDClientState {
QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
- CoMutex send_mutex;
- CoQueue free_sema;
- Coroutine *connection_co;
- Coroutine *teardown_co;
- QemuCoSleep reconnect_sleep;
- bool drained;
- bool wait_drained_end;
- int in_flight;
+ /*
+ * Protects state, free_sema, in_flight, requests[].coroutine,
+ * reconnect_delay_timer.
+ */
+ QemuMutex requests_lock;
NBDClientState state;
- bool wait_in_flight;
-
+ CoQueue free_sema;
+ unsigned in_flight;
+ NBDClientRequest requests[MAX_NBD_REQUESTS];
QEMUTimer *reconnect_delay_timer;
- NBDClientRequest requests[MAX_NBD_REQUESTS];
+ /* Protects sending data on the socket. */
+ CoMutex send_mutex;
+
+ /*
+ * Protects receiving reply headers from the socket, as well as the
+ * fields reply and requests[].receiving.
+ */
+ CoMutex receive_mutex;
NBDReply reply;
+
+ QEMUTimer *open_timer;
+
BlockDriverState *bs;
/* Connection parameters */
uint32_t reconnect_delay;
+ uint32_t open_timeout;
SocketAddress *saddr;
- char *export, *tlscredsid;
+ char *export;
+ char *tlscredsid;
QCryptoTLSCreds *tlscreds;
- const char *hostname;
+ char *tlshostname;
char *x_dirty_bitmap;
bool alloc_depth;
yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
+ /* Must not leave timers behind that would access freed data */
+ assert(!s->reconnect_delay_timer);
+ assert(!s->open_timer);
+
object_unref(OBJECT(s->tlscreds));
qapi_free_SocketAddress(s->saddr);
s->saddr = NULL;
s->export = NULL;
g_free(s->tlscredsid);
s->tlscredsid = NULL;
+ g_free(s->tlshostname);
+ s->tlshostname = NULL;
g_free(s->x_dirty_bitmap);
s->x_dirty_bitmap = NULL;
}
-static bool nbd_client_connected(BDRVNBDState *s)
+/* Called with s->receive_mutex taken. */
+static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
{
- return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
+ if (req->receiving) {
+ req->receiving = false;
+ aio_co_wake(req->coroutine);
+ return true;
+ }
+
+ return false;
}
-static void nbd_channel_error(BDRVNBDState *s, int ret)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
{
+ int i;
+
+ QEMU_LOCK_GUARD(&s->receive_mutex);
+ for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+ if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
+ return;
+ }
+ }
+}
+
+/* Called with s->requests_lock held. */
+static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
+{
+ if (s->state == NBD_CLIENT_CONNECTED) {
+ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ }
+
if (ret == -EIO) {
- if (nbd_client_connected(s)) {
+ if (s->state == NBD_CLIENT_CONNECTED) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
- if (nbd_client_connected(s)) {
- qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
- }
s->state = NBD_CLIENT_QUIT;
}
}
-static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
+static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
{
- int i;
-
- for (i = 0; i < MAX_NBD_REQUESTS; i++) {
- NBDClientRequest *req = &s->requests[i];
-
- if (req->coroutine && req->receiving) {
- req->receiving = false;
- aio_co_wake(req->coroutine);
- }
- }
+ QEMU_LOCK_GUARD(&s->requests_lock);
+ nbd_channel_error_locked(s, ret);
}
static void reconnect_delay_timer_del(BDRVNBDState *s)
{
BDRVNBDState *s = opaque;
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
- s->state = NBD_CLIENT_CONNECTING_NOWAIT;
- while (qemu_co_enter_next(&s->free_sema, NULL)) {
- /* Resume all queued requests */
+ reconnect_delay_timer_del(s);
+ WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+ if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
+ return;
}
+ s->state = NBD_CLIENT_CONNECTING_NOWAIT;
}
-
- reconnect_delay_timer_del(s);
+ nbd_co_establish_connection_cancel(s->conn);
}
static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
- if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
- return;
- }
-
assert(!s->reconnect_delay_timer);
s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
QEMU_CLOCK_REALTIME,
timer_mod(s->reconnect_delay_timer, expire_time_ns);
}
-static void nbd_client_detach_aio_context(BlockDriverState *bs)
+static void nbd_teardown_connection(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- /* Timer is deleted in nbd_client_co_drain_begin() */
- assert(!s->reconnect_delay_timer);
- /*
- * If reconnect is in progress we may have no ->ioc. It will be
- * re-instantiated in the proper aio context once the connection is
- * reestablished.
- */
+ assert(!s->in_flight);
+
if (s->ioc) {
- qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
+ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
}
-}
-static void nbd_client_attach_aio_context_bh(void *opaque)
-{
- BlockDriverState *bs = opaque;
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- if (s->connection_co) {
- /*
- * The node is still drained, so we know the coroutine has yielded in
- * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
- * it is entered for the first time. Both places are safe for entering
- * the coroutine.
- */
- qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
+ WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+ s->state = NBD_CLIENT_QUIT;
}
- bdrv_dec_in_flight(bs);
}
-static void nbd_client_attach_aio_context(BlockDriverState *bs,
- AioContext *new_context)
+static void open_timer_del(BDRVNBDState *s)
{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- /*
- * s->connection_co is either yielded from nbd_receive_reply or from
- * nbd_co_reconnect_loop()
- */
- if (nbd_client_connected(s)) {
- qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+ if (s->open_timer) {
+ timer_free(s->open_timer);
+ s->open_timer = NULL;
}
-
- bdrv_inc_in_flight(bs);
-
- /*
- * Need to wait here for the BH to run because the BH must run while the
- * node is still drained.
- */
- aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}
-static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
+static void open_timer_cb(void *opaque)
{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- s->drained = true;
- qemu_co_sleep_wake(&s->reconnect_sleep);
+ BDRVNBDState *s = opaque;
nbd_co_establish_connection_cancel(s->conn);
-
- reconnect_delay_timer_del(s);
-
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
- s->state = NBD_CLIENT_CONNECTING_NOWAIT;
- qemu_co_queue_restart_all(&s->free_sema);
- }
+ open_timer_del(s);
}
-static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
+static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- s->drained = false;
- if (s->wait_drained_end) {
- s->wait_drained_end = false;
- aio_co_wake(s->connection_co);
- }
+ assert(!s->open_timer);
+ s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
+ QEMU_CLOCK_REALTIME,
+ SCALE_NS,
+ open_timer_cb, s);
+ timer_mod(s->open_timer, expire_time_ns);
}
-
-static void nbd_teardown_connection(BlockDriverState *bs)
+static bool nbd_client_will_reconnect(BDRVNBDState *s)
{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- if (s->ioc) {
- /* finish any pending coroutines */
- qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
- }
-
- s->state = NBD_CLIENT_QUIT;
- if (s->connection_co) {
- qemu_co_sleep_wake(&s->reconnect_sleep);
- nbd_co_establish_connection_cancel(s->conn);
- }
- if (qemu_in_coroutine()) {
- s->teardown_co = qemu_coroutine_self();
- /* connection_co resumes us when it terminates */
- qemu_coroutine_yield();
- s->teardown_co = NULL;
- } else {
- BDRV_POLL_WHILE(bs, s->connection_co);
- }
- assert(!s->connection_co);
-}
-
-static bool nbd_client_connecting(BDRVNBDState *s)
-{
- NBDClientState state = qatomic_load_acquire(&s->state);
- return state == NBD_CLIENT_CONNECTING_WAIT ||
- state == NBD_CLIENT_CONNECTING_NOWAIT;
-}
-
-static bool nbd_client_connecting_wait(BDRVNBDState *s)
-{
- return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
+ /*
+ * Called only after a socket error, so this is not performance sensitive.
+ */
+ QEMU_LOCK_GUARD(&s->requests_lock);
+ return s->state == NBD_CLIENT_CONNECTING_WAIT;
}
/*
}
int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
- Error **errp)
+ bool blocking, Error **errp)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
+ IO_CODE();
assert(!s->ioc);
- s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+ s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
if (!s->ioc) {
return -ECONNREFUSED;
}
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
+ bs);
+
ret = nbd_handle_updated_info(s->bs, NULL);
if (ret < 0) {
/*
nbd_send_request(s->ioc, &request);
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, bs);
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
qio_channel_set_blocking(s->ioc, false, NULL);
qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
- yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
- bs);
-
/* successfully connected */
- s->state = NBD_CLIENT_CONNECTED;
- qemu_co_queue_restart_all(&s->free_sema);
+ WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
+ s->state = NBD_CLIENT_CONNECTED;
+ }
return 0;
}
-static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+/* Called with s->requests_lock held. */
+static bool nbd_client_connecting(BDRVNBDState *s)
{
- if (!nbd_client_connecting(s)) {
- return;
- }
-
- /* Wait for completion of all in-flight requests */
-
- qemu_co_mutex_lock(&s->send_mutex);
-
- while (s->in_flight > 0) {
- qemu_co_mutex_unlock(&s->send_mutex);
- nbd_recv_coroutines_wake_all(s);
- s->wait_in_flight = true;
- qemu_coroutine_yield();
- s->wait_in_flight = false;
- qemu_co_mutex_lock(&s->send_mutex);
- }
-
- qemu_co_mutex_unlock(&s->send_mutex);
+ return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+ s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
- if (!nbd_client_connecting(s)) {
- return;
- }
+/* Called with s->requests_lock held; temporarily drops it to connect. */
+static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+{
+ int ret;
+ bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
/*
* Now we are sure that nobody is accessing the channel, and no one will
* try until we set the state to CONNECTED.
*/
+ assert(nbd_client_connecting(s));
+ assert(s->in_flight == 1);
+
+ trace_nbd_reconnect_attempt(s->bs->in_flight);
+
+ if (blocking && !s->reconnect_delay_timer) {
+ /*
+ * It's the first reconnect attempt after switching to
+ * NBD_CLIENT_CONNECTING_WAIT
+ */
+ g_assert(s->reconnect_delay);
+ reconnect_delay_timer_init(s,
+ qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+ s->reconnect_delay * NANOSECONDS_PER_SECOND);
+ }
/* Finalize previous connection if any */
if (s->ioc) {
s->ioc = NULL;
}
- nbd_co_do_establish_connection(s->bs, NULL);
+ qemu_mutex_unlock(&s->requests_lock);
+ ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
+ trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
+ qemu_mutex_lock(&s->requests_lock);
+
+ /*
+ * The reconnect attempt is done (maybe successfully, maybe not), so
+ * we no longer need this timer. Delete it so it will not outlive
+ * this I/O request (so draining removes all timers).
+ */
+ reconnect_delay_timer_del(s);
}
-static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
+static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
{
- uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
- uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
-
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
- reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
- s->reconnect_delay * NANOSECONDS_PER_SECOND);
- }
-
- nbd_reconnect_attempt(s);
+ int ret;
+ uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2;
+ QEMU_LOCK_GUARD(&s->receive_mutex);
- while (nbd_client_connecting(s)) {
- if (s->drained) {
- bdrv_dec_in_flight(s->bs);
- s->wait_drained_end = true;
- while (s->drained) {
- /*
- * We may be entered once from nbd_client_attach_aio_context_bh
- * and then from nbd_client_co_drain_end. So here is a loop.
- */
- qemu_coroutine_yield();
- }
- bdrv_inc_in_flight(s->bs);
- } else {
- qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
- QEMU_CLOCK_REALTIME, timeout);
- if (s->drained) {
- continue;
- }
- if (timeout < max_timeout) {
- timeout *= 2;
- }
+ while (true) {
+ if (s->reply.handle == handle) {
+ /* We are done */
+ return 0;
}
- nbd_reconnect_attempt(s);
- }
-
- reconnect_delay_timer_del(s);
-}
-
-static coroutine_fn void nbd_connection_entry(void *opaque)
-{
- BDRVNBDState *s = opaque;
- uint64_t i;
- int ret = 0;
- Error *local_err = NULL;
+ if (s->reply.handle != 0) {
+ /*
+ * Some other request is being handled now. It should already have
+ * been woken by whoever set s->reply.handle (or never have waited in
+ * this yield). So, we should not wake it here.
+ */
+ ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+ assert(!s->requests[ind2].receiving);
- while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
- /*
- * The NBD client can only really be considered idle when it has
- * yielded from qio_channel_readv_all_eof(), waiting for data. This is
- * the point where the additional scheduled coroutine entry happens
- * after nbd_client_attach_aio_context().
- *
- * Therefore we keep an additional in_flight reference all the time and
- * only drop it temporarily here.
- */
+ s->requests[ind].receiving = true;
+ qemu_co_mutex_unlock(&s->receive_mutex);
- if (nbd_client_connecting(s)) {
- nbd_co_reconnect_loop(s);
- }
+ qemu_coroutine_yield();
+ /*
+ * We may be woken for 2 reasons:
+ * 1. From this function, executing in a parallel coroutine, when our
+ * handle is received.
+ * 2. From nbd_co_receive_one_chunk(), when the previous request is
+ * finished and s->reply.handle is set to 0.
+ * Either way, it's OK to lock the mutex and go to the next iteration.
+ */
- if (!nbd_client_connected(s)) {
+ qemu_co_mutex_lock(&s->receive_mutex);
+ assert(!s->requests[ind].receiving);
continue;
}
+ /* We are under mutex and handle is 0. We have to do the dirty work. */
assert(s->reply.handle == 0);
- ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
-
- if (local_err) {
- trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
- error_free(local_err);
- local_err = NULL;
- }
+ ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
if (ret <= 0) {
- nbd_channel_error(s, ret ? ret : -EIO);
- continue;
+ ret = ret ? ret : -EIO;
+ nbd_channel_error(s, ret);
+ return ret;
}
-
- /*
- * There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes.
- */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS ||
- !s->requests[i].coroutine ||
- !s->requests[i].receiving ||
- (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
- {
+ if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
nbd_channel_error(s, -EINVAL);
- continue;
+ return -EINVAL;
}
-
- /*
- * We're woken up again by the request itself. Note that there
- * is no race between yielding and reentering connection_co. This
- * is because:
- *
- * - if the request runs on the same AioContext, it is only
- * entered after we yield
- *
- * - if the request runs on a different AioContext, reentering
- * connection_co happens through a bottom half, which can only
- * run after we yield.
- */
- s->requests[i].receiving = false;
- aio_co_wake(s->requests[i].coroutine);
- qemu_coroutine_yield();
- }
-
- qemu_co_queue_restart_all(&s->free_sema);
- nbd_recv_coroutines_wake_all(s);
- bdrv_dec_in_flight(s->bs);
-
- s->connection_co = NULL;
- if (s->ioc) {
- qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
- yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
- nbd_yank, s->bs);
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
- }
-
- if (s->teardown_co) {
- aio_co_wake(s->teardown_co);
+ ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+ nbd_channel_error(s, -EINVAL);
+ return -EINVAL;
+ }
+ if (s->reply.handle == handle) {
+ /* We are done */
+ return 0;
+ }
+ nbd_recv_coroutine_wake_one(&s->requests[ind2]);
}
- aio_wait_kick();
}
-static int nbd_co_send_request(BlockDriverState *bs,
- NBDRequest *request,
- QEMUIOVector *qiov)
+static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
+ NBDRequest *request,
+ QEMUIOVector *qiov)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int rc, i = -1;
- qemu_co_mutex_lock(&s->send_mutex);
- while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
- qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
- }
-
- if (!nbd_client_connected(s)) {
- rc = -EIO;
- goto err;
+ qemu_mutex_lock(&s->requests_lock);
+ while (s->in_flight == MAX_NBD_REQUESTS ||
+ (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
+ qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
}
s->in_flight++;
+ if (s->state != NBD_CLIENT_CONNECTED) {
+ if (nbd_client_connecting(s)) {
+ nbd_reconnect_attempt(s);
+ qemu_co_queue_restart_all(&s->free_sema);
+ }
+ if (s->state != NBD_CLIENT_CONNECTED) {
+ rc = -EIO;
+ goto err;
+ }
+ }
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
if (s->requests[i].coroutine == NULL) {
}
}
- g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
-
s->requests[i].coroutine = qemu_coroutine_self();
s->requests[i].offset = request->from;
s->requests[i].receiving = false;
+ qemu_mutex_unlock(&s->requests_lock);
+ qemu_co_mutex_lock(&s->send_mutex);
request->handle = INDEX_TO_HANDLE(s, i);
assert(s->ioc);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
- if (nbd_client_connected(s) && rc >= 0) {
- if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
- NULL) < 0) {
- rc = -EIO;
- }
- } else if (rc >= 0) {
+ if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
+ NULL) < 0) {
rc = -EIO;
}
qio_channel_set_cork(s->ioc, false);
} else {
rc = nbd_send_request(s->ioc, request);
}
+ qemu_co_mutex_unlock(&s->send_mutex);
-err:
if (rc < 0) {
- nbd_channel_error(s, rc);
+ qemu_mutex_lock(&s->requests_lock);
+err:
+ nbd_channel_error_locked(s, rc);
if (i != -1) {
s->requests[i].coroutine = NULL;
- s->in_flight--;
- }
- if (s->in_flight == 0 && s->wait_in_flight) {
- aio_co_wake(s->connection_co);
- } else {
- qemu_co_queue_next(&s->free_sema);
}
+ s->in_flight--;
+ qemu_co_queue_next(&s->free_sema);
+ qemu_mutex_unlock(&s->requests_lock);
}
- qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
return 0;
}
-static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
- uint64_t orig_offset,
- QEMUIOVector *qiov, Error **errp)
+static int coroutine_fn
+nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
+ QEMUIOVector *qiov, Error **errp)
{
QEMUIOVector sub_qiov;
uint64_t offset;
}
*request_ret = 0;
- /* Wait until we're woken up by nbd_connection_entry. */
- s->requests[i].receiving = true;
- qemu_coroutine_yield();
- assert(!s->requests[i].receiving);
- if (!nbd_client_connected(s)) {
+ ret = nbd_receive_replies(s, handle);
+ if (ret < 0) {
error_setg(errp, "Connection closed");
return -EIO;
}
}
s->reply.handle = 0;
- if (s->connection_co && !s->wait_in_flight) {
- /*
- * We must check s->wait_in_flight, because we may entered by
- * nbd_recv_coroutines_wake_all(), in this case we should not
- * wake connection_co here, it will woken by last request.
- */
- aio_co_wake(s->connection_co);
- }
+ nbd_recv_coroutines_wake(s);
return ret;
}
* nbd_reply_chunk_iter_receive
* The pointer stored in @payload requires g_free() to free it.
*/
-static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
- NBDReplyChunkIter *iter,
- uint64_t handle,
- QEMUIOVector *qiov, NBDReply *reply,
- void **payload)
+static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
+ NBDReplyChunkIter *iter,
+ uint64_t handle,
+ QEMUIOVector *qiov,
+ NBDReply *reply,
+ void **payload)
{
int ret, request_ret;
NBDReply local_reply;
NBDStructuredReplyChunk *chunk;
Error *local_err = NULL;
- if (!nbd_client_connected(s)) {
- error_setg(&local_err, "Connection closed");
- nbd_iter_channel_error(iter, -EIO, &local_err);
- goto break_loop;
- }
if (iter->done) {
/* Previous iteration was last. */
}
/* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
- if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
+ if (nbd_reply_is_simple(reply) || iter->ret < 0) {
goto break_loop;
}
return true;
break_loop:
+ qemu_mutex_lock(&s->requests_lock);
s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
- qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
- if (s->in_flight == 0 && s->wait_in_flight) {
- aio_co_wake(s->connection_co);
- } else {
- qemu_co_queue_next(&s->free_sema);
- }
- qemu_co_mutex_unlock(&s->send_mutex);
+ qemu_co_queue_next(&s->free_sema);
+ qemu_mutex_unlock(&s->requests_lock);
return false;
}
-static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
- int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
+ int *request_ret, Error **errp)
{
NBDReplyChunkIter iter;
return iter.ret;
}
-static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
- uint64_t offset, QEMUIOVector *qiov,
- int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
+ uint64_t offset, QEMUIOVector *qiov,
+ int *request_ret, Error **errp)
{
NBDReplyChunkIter iter;
NBDReply reply;
return iter.ret;
}
-static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
- uint64_t handle, uint64_t length,
- NBDExtent *extent,
- int *request_ret, Error **errp)
+static int coroutine_fn nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
+ uint64_t handle, uint64_t length,
+ NBDExtent *extent,
+ int *request_ret, Error **errp)
{
NBDReplyChunkIter iter;
NBDReply reply;
return iter.ret;
}
-static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
- QEMUIOVector *write_qiov)
+static int coroutine_fn nbd_co_request(BlockDriverState *bs, NBDRequest *request,
+ QEMUIOVector *write_qiov)
{
int ret, request_ret;
Error *local_err = NULL;
error_free(local_err);
local_err = NULL;
}
- } while (ret < 0 && nbd_client_connecting_wait(s));
+ } while (ret < 0 && nbd_client_will_reconnect(s));
return ret ? ret : request_ret;
}
-static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn nbd_client_co_preadv(BlockDriverState *bs, int64_t offset,
+ int64_t bytes, QEMUIOVector *qiov,
+ BdrvRequestFlags flags)
{
int ret, request_ret;
Error *local_err = NULL;
};
assert(bytes <= NBD_MAX_BUFFER_SIZE);
- assert(!flags);
if (!bytes) {
return 0;
error_free(local_err);
local_err = NULL;
}
- } while (ret < 0 && nbd_client_connecting_wait(s));
+ } while (ret < 0 && nbd_client_will_reconnect(s));
return ret ? ret : request_ret;
}
-static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, QEMUIOVector *qiov, int flags)
+static int coroutine_fn nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset,
+ int64_t bytes, QEMUIOVector *qiov,
+ BdrvRequestFlags flags)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = {
return nbd_co_request(bs, &request, qiov);
}
-static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
- int bytes, BdrvRequestFlags flags)
+static int coroutine_fn nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+ int64_t bytes, BdrvRequestFlags flags)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = {
.type = NBD_CMD_WRITE_ZEROES,
.from = offset,
- .len = bytes,
+ .len = bytes, /* .len is uint32_t actually */
};
+ assert(bytes <= UINT32_MAX); /* rely on max_pwrite_zeroes */
+
assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
return -ENOTSUP;
return nbd_co_request(bs, &request, NULL);
}
-static int nbd_client_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_client_co_flush(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = { .type = NBD_CMD_FLUSH };
return nbd_co_request(bs, &request, NULL);
}
-static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
- int bytes)
+static int coroutine_fn nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
+ int64_t bytes)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = {
.type = NBD_CMD_TRIM,
.from = offset,
- .len = bytes,
+ .len = bytes, /* len is uint32_t */
};
+ assert(bytes <= UINT32_MAX); /* rely on max_pdiscard */
+
assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
return 0;
error_free(local_err);
local_err = NULL;
}
- } while (ret < 0 && nbd_client_connecting_wait(s));
+ } while (ret < 0 && nbd_client_will_reconnect(s));
if (ret < 0 || request_ret < 0) {
return ret ? ret : request_ret;
BlockDriverState *bs = opaque;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
+ QEMU_LOCK_GUARD(&s->requests_lock);
qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ s->state = NBD_CLIENT_QUIT;
}
static void nbd_client_close(BlockDriverState *bs)
return NULL;
}
- if (creds->endpoint != QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT) {
- error_setg(errp,
- "Expecting TLS credentials with a client endpoint");
+ if (!qcrypto_tls_creds_check_endpoint(creds,
+ QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
+ errp)) {
return NULL;
}
object_ref(obj);
.type = QEMU_OPT_STRING,
.help = "ID of the TLS credentials to use",
},
+ {
+ .name = "tls-hostname",
+ .type = QEMU_OPT_STRING,
+ .help = "Override hostname for validating TLS x509 certificate",
+ },
{
.name = "x-dirty-bitmap",
.type = QEMU_OPT_STRING,
"future requests before a successful reconnect will "
"immediately fail. Default 0",
},
+ {
+ .name = "open-timeout",
+ .type = QEMU_OPT_NUMBER,
+ .help = "In seconds. If zero, the nbd driver tries the connection "
+ "only once, and fails to open if the connection fails. "
+ "If non-zero, the nbd driver will repeat connection "
+ "attempts until successful or until @open-timeout seconds "
+ "have elapsed. Default 0",
+ },
{ /* end of list */ }
},
};
goto error;
}
- /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
- if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
- error_setg(errp, "TLS only supported over IP sockets");
- goto error;
+ s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
+ if (!s->tlshostname &&
+ s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
+ s->tlshostname = g_strdup(s->saddr->u.inet.host);
}
- s->hostname = s->saddr->u.inet.host;
}
s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
}
s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
+ s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);
ret = 0;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->bs = bs;
- qemu_co_mutex_init(&s->send_mutex);
+ qemu_mutex_init(&s->requests_lock);
qemu_co_queue_init(&s->free_sema);
+ qemu_co_mutex_init(&s->send_mutex);
+ qemu_co_mutex_init(&s->receive_mutex);
if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
return -EEXIST;
}
s->conn = nbd_client_connection_new(s->saddr, true, s->export,
- s->x_dirty_bitmap, s->tlscreds);
+ s->x_dirty_bitmap, s->tlscreds,
+ s->tlshostname);
+
+ if (s->open_timeout) {
+ nbd_client_connection_enable_retry(s->conn);
+ open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+ s->open_timeout * NANOSECONDS_PER_SECOND);
+ }
- /* TODO: Configurable retry-until-timeout behaviour. */
- ret = nbd_do_establish_connection(bs, errp);
+ s->state = NBD_CLIENT_CONNECTING_WAIT;
+ ret = nbd_do_establish_connection(bs, true, errp);
if (ret < 0) {
goto fail;
}
- s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
- bdrv_inc_in_flight(bs);
- aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+ /*
+ * The connect attempt is done, so we no longer need this timer.
+ * Delete it, because we do not want it to be around when this node
+ * is drained or closed.
+ */
+ open_timer_del(s);
+
+ nbd_client_connection_enable_retry(s->conn);
return 0;
fail:
+ open_timer_del(s);
nbd_clear_bdrvstate(bs);
return ret;
}
-static int nbd_co_flush(BlockDriverState *bs)
+static int coroutine_fn nbd_co_flush(BlockDriverState *bs)
{
return nbd_client_co_flush(bs);
}
return 0;
}
-static int64_t nbd_getlength(BlockDriverState *bs)
+static int64_t coroutine_fn nbd_co_getlength(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
"port",
"export",
"tls-creds",
+ "tls-hostname",
"server.",
NULL
reconnect_delay_timer_del(s);
+ qemu_mutex_lock(&s->requests_lock);
if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
- qemu_co_queue_restart_all(&s->free_sema);
+ }
+ qemu_mutex_unlock(&s->requests_lock);
+
+ nbd_co_establish_connection_cancel(s->conn);
+}
+
+static void nbd_attach_aio_context(BlockDriverState *bs,
+ AioContext *new_context)
+{
+ BDRVNBDState *s = bs->opaque;
+
+ /* The open_timer is used only during nbd_open() */
+ assert(!s->open_timer);
+
+ /*
+ * The reconnect_delay_timer is scheduled in I/O paths when the
+ * connection is lost, to cancel the reconnection attempt after a
+ * given time. Once this attempt is done (successfully or not),
+ * nbd_reconnect_attempt() ensures the timer is deleted before the
+ * respective I/O request is resumed.
+ * Since the AioContext can only be changed when a node is drained,
+ * the reconnect_delay_timer cannot be active here.
+ */
+ assert(!s->reconnect_delay_timer);
+
+ if (s->ioc) {
+ qio_channel_attach_aio_context(s->ioc, new_context);
+ }
+}
+
+static void nbd_detach_aio_context(BlockDriverState *bs)
+{
+ BDRVNBDState *s = bs->opaque;
+
+ assert(!s->open_timer);
+ assert(!s->reconnect_delay_timer);
+
+ if (s->ioc) {
+ qio_channel_detach_aio_context(s->ioc);
}
}
.bdrv_co_pdiscard = nbd_client_co_pdiscard,
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
- .bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
+ .bdrv_co_getlength = nbd_co_getlength,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
.strong_runtime_opts = nbd_strong_runtime_opts,
.bdrv_cancel_in_flight = nbd_cancel_in_flight,
+
+ .bdrv_attach_aio_context = nbd_attach_aio_context,
+ .bdrv_detach_aio_context = nbd_detach_aio_context,
};
static BlockDriver bdrv_nbd_tcp = {
.bdrv_co_pdiscard = nbd_client_co_pdiscard,
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
- .bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
+ .bdrv_co_getlength = nbd_co_getlength,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
.strong_runtime_opts = nbd_strong_runtime_opts,
.bdrv_cancel_in_flight = nbd_cancel_in_flight,
+
+ .bdrv_attach_aio_context = nbd_attach_aio_context,
+ .bdrv_detach_aio_context = nbd_detach_aio_context,
};
static BlockDriver bdrv_nbd_unix = {
.bdrv_co_pdiscard = nbd_client_co_pdiscard,
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
- .bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
+ .bdrv_co_getlength = nbd_co_getlength,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
.strong_runtime_opts = nbd_strong_runtime_opts,
.bdrv_cancel_in_flight = nbd_cancel_in_flight,
+
+ .bdrv_attach_aio_context = nbd_attach_aio_context,
+ .bdrv_detach_aio_context = nbd_detach_aio_context,
};
static void bdrv_nbd_init(void)