#include "ram.h"
#include "migration.h"
#include "socket.h"
+#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
MultiFDPages_t *pages = multifd_send_state->pages;
uint64_t transferred;
- if (atomic_read(&multifd_send_state->exiting)) {
+ if (qatomic_read(&multifd_send_state->exiting)) {
return -1;
}
* threads at the same time, we can end calling this function
* twice.
*/
- if (atomic_xchg(&multifd_send_state->exiting, 1)) {
+ if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
return;
}
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
+ g_free(p->tls_hostname);
+ p->tls_hostname = NULL;
multifd_pages_clear(p->pages);
p->pages = NULL;
p->packet_len = 0;
while (true) {
qemu_sem_wait(&p->sem);
- if (atomic_read(&multifd_send_state->exiting)) {
+ if (qatomic_read(&multifd_send_state->exiting)) {
break;
}
qemu_mutex_lock(&p->mutex);
return NULL;
}
+static bool multifd_channel_connect(MultiFDSendParams *p,
+ QIOChannel *ioc,
+ Error *error);
+
+static void multifd_tls_outgoing_handshake(QIOTask *task,
+ gpointer opaque)
+{
+ MultiFDSendParams *p = opaque;
+ QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
+ Error *err = NULL;
+
+ if (qio_task_propagate_error(task, &err)) {
+ trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
+ } else {
+ trace_multifd_tls_outgoing_handshake_complete(ioc);
+ }
+ multifd_channel_connect(p, ioc, err);
+}
+
+static void multifd_tls_channel_connect(MultiFDSendParams *p,
+ QIOChannel *ioc,
+ Error **errp)
+{
+ MigrationState *s = migrate_get_current();
+ const char *hostname = p->tls_hostname;
+ QIOChannelTLS *tioc;
+
+ tioc = migration_tls_client_create(s, ioc, hostname, errp);
+ if (!tioc) {
+ return;
+ }
+
+ trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
+ qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
+ qio_channel_tls_handshake(tioc,
+ multifd_tls_outgoing_handshake,
+ p,
+ NULL,
+ NULL);
+
+}
+
+static bool multifd_channel_connect(MultiFDSendParams *p,
+ QIOChannel *ioc,
+ Error *error)
+{
+ MigrationState *s = migrate_get_current();
+
+ trace_multifd_set_outgoing_channel(
+ ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);
+
+ if (!error) {
+ if (s->parameters.tls_creds &&
+ *s->parameters.tls_creds &&
+ !object_dynamic_cast(OBJECT(ioc),
+ TYPE_QIO_CHANNEL_TLS)) {
+ multifd_tls_channel_connect(p, ioc, &error);
+ if (!error) {
+ /*
+ * tls_channel_connect will call back to this
+ * function after the TLS handshake,
+ * so we mustn't call multifd_send_thread until then
+ */
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ /* update for tls qio channel */
+ p->c = ioc;
+ qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+ QEMU_THREAD_JOINABLE);
+ }
+ return false;
+ }
+
+ return true;
+}
+
+static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
+ QIOChannel *ioc, Error *err)
+{
+ migrate_set_error(migrate_get_current(), err);
+ /* Error happen, we need to tell who pay attention to me */
+ qemu_sem_post(&multifd_send_state->channels_ready);
+ qemu_sem_post(&p->sem_sync);
+ /*
+ * Although multifd_send_thread is not created, but main migration
+ * thread neet to judge whether it is running, so we need to mark
+ * its status.
+ */
+ p->quit = true;
+ object_unref(OBJECT(ioc));
+ error_free(err);
+}
+
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
MultiFDSendParams *p = opaque;
trace_multifd_new_send_channel_async(p->id);
if (qio_task_propagate_error(task, &local_err)) {
- migrate_set_error(migrate_get_current(), local_err);
- /* Error happen, we need to tell who pay attention to me */
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
- /*
- * Although multifd_send_thread is not created, but main migration
- * thread needs to judge whether it is running, so we need to mark
- * its status.
- */
- p->quit = true;
- object_unref(OBJECT(sioc));
- error_free(local_err);
+ goto cleanup;
} else {
p->c = QIO_CHANNEL(sioc);
qio_channel_set_delay(p->c, false);
p->running = true;
- qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
- QEMU_THREAD_JOINABLE);
+ if (multifd_channel_connect(p, sioc, local_err)) {
+ goto cleanup;
+ }
+ return;
}
+
+cleanup:
+ multifd_new_send_channel_cleanup(p, sioc, local_err);
}
int multifd_save_setup(Error **errp)
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
uint8_t i;
+ MigrationState *s;
if (!migrate_use_multifd()) {
return 0;
}
+ s = migrate_get_current();
thread_count = migrate_multifd_channels();
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
multifd_send_state->pages = multifd_pages_init(page_count);
qemu_sem_init(&multifd_send_state->channels_ready, 0);
- atomic_set(&multifd_send_state->exiting, 0);
+ qatomic_set(&multifd_send_state->exiting, 0);
multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
for (i = 0; i < thread_count; i++) {
p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
p->packet->version = cpu_to_be32(MULTIFD_VERSION);
p->name = g_strdup_printf("multifdsend_%d", i);
+ p->tls_hostname = g_strdup(s->hostname);
socket_send_channel_create(multifd_new_send_channel_async, p);
}
thread_count = migrate_multifd_channels();
multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
- atomic_set(&multifd_recv_state->count, 0);
+ qatomic_set(&multifd_recv_state->count, 0);
qemu_sem_init(&multifd_recv_state->sem_sync, 0);
multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
return true;
}
- return thread_count == atomic_read(&multifd_recv_state->count);
+ return thread_count == qatomic_read(&multifd_recv_state->count);
}
/*
error_propagate_prepend(errp, local_err,
"failed to receive packet"
" via multifd channel %d: ",
- atomic_read(&multifd_recv_state->count));
+ qatomic_read(&multifd_recv_state->count));
return false;
}
trace_multifd_recv_new_channel(id);
p->running = true;
qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
QEMU_THREAD_JOINABLE);
- atomic_inc(&multifd_recv_state->count);
- return atomic_read(&multifd_recv_state->count) ==
+ qatomic_inc(&multifd_recv_state->count);
+ return qatomic_read(&multifd_recv_state->count) ==
migrate_multifd_channels();
}