static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
+ size_t size = sizeof(msg);
int ret;
msg.magic = cpu_to_be32(MULTIFD_MAGIC);
msg.id = p->id;
memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
- ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+ ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
if (ret != 0) {
return -1;
}
+ stat64_add(&mig_stats.multifd_bytes, size);
return 0;
}
}
if (msg.id > migrate_multifd_channels()) {
- error_setg(errp, "multifd: received channel version %u "
- "expected %u", msg.version, MULTIFD_VERSION);
+ error_setg(errp, "multifd: received channel id %u is greater than "
+ "number of channels %u", msg.id, migrate_multifd_channels());
return -1;
}
return msg.id;
}
-static MultiFDPages_t *multifd_pages_init(size_t size)
+static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
- pages->allocated = size;
- pages->offset = g_new0(ram_addr_t, size);
+ pages->allocated = n;
+ pages->offset = g_new0(ram_addr_t, n);
return pages;
}
{
pages->num = 0;
pages->allocated = 0;
- pages->packet_num = 0;
pages->block = NULL;
g_free(pages->offset);
pages->offset = NULL;
* false.
*/
-static int multifd_send_pages(QEMUFile *f)
+static int multifd_send_pages(void)
{
int i;
static int next_channel;
MultiFDSendParams *p = NULL; /* make happy gcc */
MultiFDPages_t *pages = multifd_send_state->pages;
- uint64_t transferred;
if (qatomic_read(&multifd_send_state->exiting)) {
return -1;
p->packet_num = multifd_send_state->packet_num++;
multifd_send_state->pages = p->pages;
p->pages = pages;
- transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
- qemu_file_acct_rate_limit(f, transferred);
qemu_mutex_unlock(&p->mutex);
- stat64_add(&mig_stats.transferred, transferred);
- stat64_add(&mig_stats.multifd_bytes, transferred);
qemu_sem_post(&p->sem);
return 1;
}
-int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
+int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
MultiFDPages_t *pages = multifd_send_state->pages;
bool changed = false;
changed = true;
}
- if (multifd_send_pages(f) < 0) {
+ if (multifd_send_pages() < 0) {
return -1;
}
if (changed) {
- return multifd_queue_page(f, block, offset);
+ return multifd_queue_page(block, offset);
}
return 1;
}
}
+static int multifd_send_channel_destroy(QIOChannel *send)
+{
+ return socket_send_channel_destroy(send);
+}
+
void multifd_save_cleanup(void)
{
int i;
if (p->registered_yank) {
migration_ioc_unregister_yank(p->c);
}
- socket_send_channel_destroy(p->c);
+ multifd_send_channel_destroy(p->c);
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
return ret;
}
-int multifd_send_sync_main(QEMUFile *f)
+int multifd_send_sync_main(void)
{
int i;
bool flush_zero_copy;
return 0;
}
if (multifd_send_state->pages->num) {
- if (multifd_send_pages(f) < 0) {
+ if (multifd_send_pages() < 0) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
int ret = 0;
bool use_zero_copy_send = migrate_zero_copy_send();
- thread = MigrationThreadAdd(p->name, qemu_get_thread_id());
+ thread = migration_threads_add(p->name, qemu_get_thread_id());
trace_multifd_send_thread_start(p->id);
rcu_register_thread();
break;
}
+ stat64_add(&mig_stats.multifd_bytes,
+ p->next_packet_size + p->packet_len);
+ p->next_packet_size = 0;
qemu_mutex_lock(&p->mutex);
p->pending_job--;
qemu_mutex_unlock(&p->mutex);
if (flags & MULTIFD_FLAG_SYNC) {
qemu_sem_post(&p->sem_sync);
}
- } else if (p->quit) {
- qemu_mutex_unlock(&p->mutex);
- break;
} else {
qemu_mutex_unlock(&p->mutex);
/* sometimes there are spurious wakeups */
}
out:
- if (local_err) {
+ if (ret) {
+ assert(local_err);
trace_multifd_send_error(p->id);
multifd_send_terminate_threads(local_err);
- error_free(local_err);
- }
-
- /*
- * Error happen, I will exit, but I can't just leave, tell
- * who pay attention to me.
- */
- if (ret != 0) {
qemu_sem_post(&p->sem_sync);
qemu_sem_post(&multifd_send_state->channels_ready);
+ error_free(local_err);
}
qemu_mutex_lock(&p->mutex);
qemu_mutex_unlock(&p->mutex);
rcu_unregister_thread();
- MigrationThreadDel(thread);
+ migration_threads_remove(thread);
trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
return NULL;
static bool multifd_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc,
- Error *error);
+ Error **errp);
static void multifd_tls_outgoing_handshake(QIOTask *task,
gpointer opaque)
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;
- if (qio_task_propagate_error(task, &err)) {
- trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
- } else {
+ if (!qio_task_propagate_error(task, &err)) {
trace_multifd_tls_outgoing_handshake_complete(ioc);
+ if (multifd_channel_connect(p, ioc, &err)) {
+ return;
+ }
}
- if (!multifd_channel_connect(p, ioc, err)) {
- /*
- * Error happen, mark multifd_send_thread status as 'quit' although it
- * is not created, and then tell who pay attention to me.
- */
- p->quit = true;
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
- }
+ trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
+
+ migrate_set_error(migrate_get_current(), err);
+ /*
+ * Error happen, mark multifd_send_thread status as 'quit' although it
+ * is not created, and then tell who pay attention to me.
+ */
+ p->quit = true;
+ qemu_sem_post(&multifd_send_state->channels_ready);
+ qemu_sem_post(&p->sem_sync);
+ error_free(err);
}
static void *multifd_tls_handshake_thread(void *opaque)
return NULL;
}
-static void multifd_tls_channel_connect(MultiFDSendParams *p,
+static bool multifd_tls_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc,
Error **errp)
{
tioc = migration_tls_client_create(ioc, hostname, errp);
if (!tioc) {
- return;
+ return false;
}
object_unref(OBJECT(ioc));
qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
multifd_tls_handshake_thread, p,
QEMU_THREAD_JOINABLE);
+ return true;
}
static bool multifd_channel_connect(MultiFDSendParams *p,
QIOChannel *ioc,
- Error *error)
+ Error **errp)
{
trace_multifd_set_outgoing_channel(
ioc, object_get_typename(OBJECT(ioc)),
- migrate_get_current()->hostname, error);
+ migrate_get_current()->hostname);
- if (error) {
- return false;
- }
if (migrate_channel_requires_tls_upgrade(ioc)) {
- multifd_tls_channel_connect(p, ioc, &error);
- if (!error) {
- /*
- * tls_channel_connect will call back to this
- * function after the TLS handshake,
- * so we mustn't call multifd_send_thread until then
- */
- return true;
- } else {
- return false;
- }
- } else {
- migration_ioc_register_yank(ioc);
- p->registered_yank = true;
- p->c = ioc;
- qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
- QEMU_THREAD_JOINABLE);
+ /*
+ * tls_channel_connect will call back to this
+ * function after the TLS handshake,
+ * so we mustn't call multifd_send_thread until then
+ */
+ return multifd_tls_channel_connect(p, ioc, errp);
}
+
+ migration_ioc_register_yank(ioc);
+ p->registered_yank = true;
+ p->c = ioc;
+ qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
+ QEMU_THREAD_JOINABLE);
return true;
}
qemu_sem_post(&p->sem_sync);
/*
* Although multifd_send_thread is not created, but main migration
- * thread neet to judge whether it is running, so we need to mark
+ * thread need to judge whether it is running, so we need to mark
* its status.
*/
p->quit = true;
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
MultiFDSendParams *p = opaque;
- QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+ QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *local_err = NULL;
trace_multifd_new_send_channel_async(p->id);
if (!qio_task_propagate_error(task, &local_err)) {
- p->c = QIO_CHANNEL(sioc);
- qio_channel_set_delay(p->c, false);
+ qio_channel_set_delay(ioc, false);
p->running = true;
- if (multifd_channel_connect(p, sioc, local_err)) {
+ if (multifd_channel_connect(p, ioc, &local_err)) {
return;
}
}
- multifd_new_send_channel_cleanup(p, sioc, local_err);
+ trace_multifd_new_send_channel_async_error(p->id, local_err);
+ multifd_new_send_channel_cleanup(p, ioc, local_err);
+}
+
+static void multifd_new_send_channel_create(gpointer opaque)
+{
+ socket_send_channel_create(multifd_new_send_channel_async, opaque);
}
int multifd_save_setup(Error **errp)
p->write_flags = 0;
}
- socket_send_channel_create(multifd_new_send_channel_async, p);
+ multifd_new_send_channel_create(p);
}
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- Error *local_err = NULL;
int ret;
- ret = multifd_send_state->ops->send_setup(p, &local_err);
+ ret = multifd_send_state->ops->send_setup(p, errp);
if (ret) {
- error_propagate(errp, local_err);
return ret;
}
}
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
- Error *local_err = NULL;
int ret;
- ret = multifd_recv_state->ops->recv_setup(p, &local_err);
+ ret = multifd_recv_state->ops->recv_setup(p, errp);
if (ret) {
- error_propagate(errp, local_err);
return ret;
}
}