]>
Commit | Line | Data |
---|---|---|
70eb2c07 CX |
1 | /* |
2 | * Sharing QEMU devices via vhost-user protocol | |
3 | * | |
4 | * Copyright (c) Coiby Xu <coiby.xu@gmail.com>. | |
5 | * Copyright (c) 2020 Red Hat, Inc. | |
6 | * | |
7 | * This work is licensed under the terms of the GNU GPL, version 2 or | |
8 | * later. See the COPYING file in the top-level directory. | |
9 | */ | |
10 | #include "qemu/osdep.h" | |
5feed38c | 11 | #include "qemu/error-report.h" |
70eb2c07 | 12 | #include "qemu/main-loop.h" |
80a06cc5 | 13 | #include "qemu/vhost-user-server.h" |
7185c857 | 14 | #include "block/aio-wait.h" |
70eb2c07 | 15 | |
7185c857 SH |
16 | /* |
17 | * Theory of operation: | |
18 | * | |
19 | * VuServer is started and stopped by vhost_user_server_start() and | |
20 | * vhost_user_server_stop() from the main loop thread. Starting the server | |
21 | * opens a vhost-user UNIX domain socket and listens for incoming connections. | |
22 | * Only one connection is allowed at a time. | |
23 | * | |
24 | * The connection is handled by the vu_client_trip() coroutine in the | |
25 | * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop | |
26 | * where libvhost-user calls vu_message_read() to receive the next vhost-user | |
27 | * protocol messages over the UNIX domain socket. | |
28 | * | |
29 | * When virtqueues are set up libvhost-user calls set_watch() to monitor kick | |
30 | * fds. These fds are also handled in the VuServer->ctx AioContext. | |
31 | * | |
32 | * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down | |
33 | * the socket connection. Shutting down the socket connection causes | |
34 | * vu_message_read() to fail since no more data can be received from the socket. | |
35 | * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop | |
36 | * libvhost-user before terminating the coroutine. vu_deinit() calls | |
37 | * remove_watch() to stop monitoring kick fds and this stops virtqueue | |
38 | * processing. | |
39 | * | |
40 | * When vu_client_trip() has finished cleaning up it schedules a BH in the main | |
41 | * loop thread to accept the next client connection. | |
42 | * | |
43 | * When libvhost-user detects an error it calls panic_cb() and sets the | |
44 | * dev->broken flag. Both vu_client_trip() and kick fd processing stop when | |
45 | * the dev->broken flag is set. | |
46 | * | |
47 | * It is possible to switch AioContexts using | |
48 | * vhost_user_server_detach_aio_context() and | |
49 | * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old | |
50 | * AioContext and resume monitoring in the new AioContext. The vu_client_trip() | |
51 | * coroutine remains in a yielded state during the switch. This is made | |
52 | * possible by QIOChannel's support for spurious coroutine re-entry in | |
53 | * qio_channel_yield(). The coroutine will restart I/O when re-entered from the | |
54 | * new AioContext. | |
55 | */ | |
56 | ||
70eb2c07 CX |
57 | static void vmsg_close_fds(VhostUserMsg *vmsg) |
58 | { | |
59 | int i; | |
60 | for (i = 0; i < vmsg->fd_num; i++) { | |
61 | close(vmsg->fds[i]); | |
62 | } | |
63 | } | |
64 | ||
65 | static void vmsg_unblock_fds(VhostUserMsg *vmsg) | |
66 | { | |
67 | int i; | |
68 | for (i = 0; i < vmsg->fd_num; i++) { | |
ff5927ba | 69 | qemu_socket_set_nonblock(vmsg->fds[i]); |
70eb2c07 CX |
70 | } |
71 | } | |
72 | ||
70eb2c07 CX |
73 | static void panic_cb(VuDev *vu_dev, const char *buf) |
74 | { | |
7185c857 | 75 | error_report("vu_panic: %s", buf); |
70eb2c07 CX |
76 | } |
77 | ||
75d33e85 | 78 | void vhost_user_server_inc_in_flight(VuServer *server) |
520d8b40 KW |
79 | { |
80 | assert(!server->wait_idle); | |
8f5e9a8e | 81 | qatomic_inc(&server->in_flight); |
520d8b40 KW |
82 | } |
83 | ||
75d33e85 | 84 | void vhost_user_server_dec_in_flight(VuServer *server) |
520d8b40 | 85 | { |
8f5e9a8e SH |
86 | if (qatomic_fetch_dec(&server->in_flight) == 1) { |
87 | if (server->wait_idle) { | |
88 | aio_co_wake(server->co_trip); | |
89 | } | |
520d8b40 KW |
90 | } |
91 | } | |
92 | ||
8f5e9a8e SH |
93 | bool vhost_user_server_has_in_flight(VuServer *server) |
94 | { | |
95 | return qatomic_load_acquire(&server->in_flight) > 0; | |
96 | } | |
97 | ||
70eb2c07 CX |
98 | static bool coroutine_fn |
99 | vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg) | |
100 | { | |
101 | struct iovec iov = { | |
102 | .iov_base = (char *)vmsg, | |
103 | .iov_len = VHOST_USER_HDR_SIZE, | |
104 | }; | |
105 | int rc, read_bytes = 0; | |
106 | Error *local_err = NULL; | |
70eb2c07 | 107 | const size_t max_fds = G_N_ELEMENTS(vmsg->fds); |
70eb2c07 CX |
108 | VuServer *server = container_of(vu_dev, VuServer, vu_dev); |
109 | QIOChannel *ioc = server->ioc; | |
110 | ||
8c7f7cbc | 111 | vmsg->fd_num = 0; |
70eb2c07 CX |
112 | if (!ioc) { |
113 | error_report_err(local_err); | |
114 | goto fail; | |
115 | } | |
116 | ||
117 | assert(qemu_in_coroutine()); | |
118 | do { | |
8c7f7cbc SH |
119 | size_t nfds = 0; |
120 | int *fds = NULL; | |
121 | ||
70eb2c07 CX |
122 | /* |
123 | * qio_channel_readv_full may have short reads, keeping calling it | |
124 | * until getting VHOST_USER_HDR_SIZE or 0 bytes in total | |
125 | */ | |
84615a19 | 126 | rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err); |
70eb2c07 CX |
127 | if (rc < 0) { |
128 | if (rc == QIO_CHANNEL_ERR_BLOCK) { | |
8c7f7cbc | 129 | assert(local_err == NULL); |
06e0f098 SH |
130 | if (server->ctx) { |
131 | server->in_qio_channel_yield = true; | |
132 | qio_channel_yield(ioc, G_IO_IN); | |
133 | server->in_qio_channel_yield = false; | |
134 | } else { | |
411132c9 | 135 | return false; |
06e0f098 | 136 | } |
70eb2c07 CX |
137 | continue; |
138 | } else { | |
139 | error_report_err(local_err); | |
8c7f7cbc | 140 | goto fail; |
70eb2c07 CX |
141 | } |
142 | } | |
8c7f7cbc SH |
143 | |
144 | if (nfds > 0) { | |
145 | if (vmsg->fd_num + nfds > max_fds) { | |
70eb2c07 CX |
146 | error_report("A maximum of %zu fds are allowed, " |
147 | "however got %zu fds now", | |
8c7f7cbc SH |
148 | max_fds, vmsg->fd_num + nfds); |
149 | g_free(fds); | |
70eb2c07 CX |
150 | goto fail; |
151 | } | |
8c7f7cbc SH |
152 | memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0])); |
153 | vmsg->fd_num += nfds; | |
154 | g_free(fds); | |
70eb2c07 | 155 | } |
8c7f7cbc SH |
156 | |
157 | if (rc == 0) { /* socket closed */ | |
158 | goto fail; | |
70eb2c07 | 159 | } |
70eb2c07 | 160 | |
8c7f7cbc SH |
161 | iov.iov_base += rc; |
162 | iov.iov_len -= rc; | |
163 | read_bytes += rc; | |
164 | } while (read_bytes != VHOST_USER_HDR_SIZE); | |
165 | ||
70eb2c07 CX |
166 | /* qio_channel_readv_full will make socket fds blocking, unblock them */ |
167 | vmsg_unblock_fds(vmsg); | |
168 | if (vmsg->size > sizeof(vmsg->payload)) { | |
169 | error_report("Error: too big message request: %d, " | |
170 | "size: vmsg->size: %u, " | |
171 | "while sizeof(vmsg->payload) = %zu", | |
172 | vmsg->request, vmsg->size, sizeof(vmsg->payload)); | |
173 | goto fail; | |
174 | } | |
175 | ||
176 | struct iovec iov_payload = { | |
177 | .iov_base = (char *)&vmsg->payload, | |
178 | .iov_len = vmsg->size, | |
179 | }; | |
180 | if (vmsg->size) { | |
181 | rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err); | |
edaf6205 SH |
182 | if (rc != 1) { |
183 | if (local_err) { | |
184 | error_report_err(local_err); | |
185 | } | |
70eb2c07 CX |
186 | goto fail; |
187 | } | |
188 | } | |
189 | ||
190 | return true; | |
191 | ||
192 | fail: | |
193 | vmsg_close_fds(vmsg); | |
194 | ||
195 | return false; | |
196 | } | |
197 | ||
70eb2c07 CX |
198 | static coroutine_fn void vu_client_trip(void *opaque) |
199 | { | |
200 | VuServer *server = opaque; | |
7185c857 | 201 | VuDev *vu_dev = &server->vu_dev; |
70eb2c07 | 202 | |
411132c9 KW |
203 | while (!vu_dev->broken) { |
204 | if (server->quiescing) { | |
205 | server->co_trip = NULL; | |
206 | aio_wait_kick(); | |
207 | return; | |
208 | } | |
209 | /* vu_dispatch() returns false if server->ctx went away */ | |
210 | if (!vu_dispatch(vu_dev) && server->ctx) { | |
211 | break; | |
212 | } | |
70eb2c07 CX |
213 | } |
214 | ||
8f5e9a8e | 215 | if (vhost_user_server_has_in_flight(server)) { |
520d8b40 KW |
216 | /* Wait for requests to complete before we can unmap the memory */ |
217 | server->wait_idle = true; | |
218 | qemu_coroutine_yield(); | |
219 | server->wait_idle = false; | |
220 | } | |
8f5e9a8e | 221 | assert(!vhost_user_server_has_in_flight(server)); |
520d8b40 | 222 | |
7185c857 | 223 | vu_deinit(vu_dev); |
70eb2c07 | 224 | |
7185c857 SH |
225 | /* vu_deinit() should have called remove_watch() */ |
226 | assert(QTAILQ_EMPTY(&server->vu_fd_watches)); | |
227 | ||
228 | object_unref(OBJECT(server->sioc)); | |
229 | server->sioc = NULL; | |
230 | ||
231 | object_unref(OBJECT(server->ioc)); | |
232 | server->ioc = NULL; | |
233 | ||
234 | server->co_trip = NULL; | |
235 | if (server->restart_listener_bh) { | |
236 | qemu_bh_schedule(server->restart_listener_bh); | |
237 | } | |
238 | aio_wait_kick(); | |
70eb2c07 CX |
239 | } |
240 | ||
241 | /* | |
242 | * a wrapper for vu_kick_cb | |
243 | * | |
244 | * since aio_dispatch can only pass one user data pointer to the | |
245 | * callback function, pack VuDev and pvt into a struct. Then unpack it | |
246 | * and pass them to vu_kick_cb | |
247 | */ | |
248 | static void kick_handler(void *opaque) | |
249 | { | |
250 | VuFdWatch *vu_fd_watch = opaque; | |
7185c857 | 251 | VuDev *vu_dev = vu_fd_watch->vu_dev; |
70eb2c07 | 252 | |
7185c857 SH |
253 | vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt); |
254 | ||
255 | /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */ | |
256 | if (vu_dev->broken) { | |
257 | VuServer *server = container_of(vu_dev, VuServer, vu_dev); | |
258 | ||
259 | qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); | |
260 | } | |
261 | } | |
70eb2c07 CX |
262 | |
263 | static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd) | |
264 | { | |
265 | ||
266 | VuFdWatch *vu_fd_watch, *next; | |
267 | QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) { | |
268 | if (vu_fd_watch->fd == fd) { | |
269 | return vu_fd_watch; | |
270 | } | |
271 | } | |
272 | return NULL; | |
273 | } | |
274 | ||
275 | static void | |
276 | set_watch(VuDev *vu_dev, int fd, int vu_evt, | |
277 | vu_watch_cb cb, void *pvt) | |
278 | { | |
279 | ||
280 | VuServer *server = container_of(vu_dev, VuServer, vu_dev); | |
281 | g_assert(vu_dev); | |
282 | g_assert(fd >= 0); | |
283 | g_assert(cb); | |
284 | ||
285 | VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); | |
286 | ||
287 | if (!vu_fd_watch) { | |
fbf58f21 | 288 | vu_fd_watch = g_new0(VuFdWatch, 1); |
70eb2c07 CX |
289 | |
290 | QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next); | |
291 | ||
292 | vu_fd_watch->fd = fd; | |
293 | vu_fd_watch->cb = cb; | |
ff5927ba | 294 | qemu_socket_set_nonblock(fd); |
06e0f098 | 295 | aio_set_fd_handler(server->ctx, fd, kick_handler, |
826cc324 | 296 | NULL, NULL, NULL, vu_fd_watch); |
70eb2c07 CX |
297 | vu_fd_watch->vu_dev = vu_dev; |
298 | vu_fd_watch->pvt = pvt; | |
299 | } | |
300 | } | |
301 | ||
302 | ||
303 | static void remove_watch(VuDev *vu_dev, int fd) | |
304 | { | |
305 | VuServer *server; | |
306 | g_assert(vu_dev); | |
307 | g_assert(fd >= 0); | |
308 | ||
309 | server = container_of(vu_dev, VuServer, vu_dev); | |
310 | ||
311 | VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); | |
312 | ||
313 | if (!vu_fd_watch) { | |
314 | return; | |
315 | } | |
06e0f098 | 316 | aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL); |
70eb2c07 CX |
317 | |
318 | QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next); | |
319 | g_free(vu_fd_watch); | |
320 | } | |
321 | ||
322 | ||
323 | static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc, | |
324 | gpointer opaque) | |
325 | { | |
326 | VuServer *server = opaque; | |
327 | ||
328 | if (server->sioc) { | |
329 | warn_report("Only one vhost-user client is allowed to " | |
330 | "connect the server one time"); | |
331 | return; | |
332 | } | |
333 | ||
334 | if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb, | |
335 | vu_message_read, set_watch, remove_watch, server->vu_iface)) { | |
336 | error_report("Failed to initialize libvhost-user"); | |
337 | return; | |
338 | } | |
339 | ||
340 | /* | |
341 | * Unset the callback function for network listener to make another | |
342 | * vhost-user client keeping waiting until this client disconnects | |
343 | */ | |
344 | qio_net_listener_set_client_func(server->listener, | |
345 | NULL, | |
346 | NULL, | |
347 | NULL); | |
348 | server->sioc = sioc; | |
349 | /* | |
350 | * Increase the object reference, so sioc will not freed by | |
351 | * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc)) | |
352 | */ | |
353 | object_ref(OBJECT(server->sioc)); | |
354 | qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client"); | |
355 | server->ioc = QIO_CHANNEL(sioc); | |
356 | object_ref(OBJECT(server->ioc)); | |
7185c857 SH |
357 | |
358 | /* TODO vu_message_write() spins if non-blocking! */ | |
46a096c8 | 359 | qio_channel_set_blocking(server->ioc, false, NULL); |
70eb2c07 | 360 | |
06e0f098 SH |
361 | qio_channel_set_follow_coroutine_ctx(server->ioc, true); |
362 | ||
7185c857 | 363 | vhost_user_server_attach_aio_context(server, server->ctx); |
7185c857 | 364 | } |
70eb2c07 | 365 | |
2957dc40 | 366 | /* server->ctx acquired by caller */ |
70eb2c07 CX |
367 | void vhost_user_server_stop(VuServer *server) |
368 | { | |
7185c857 SH |
369 | qemu_bh_delete(server->restart_listener_bh); |
370 | server->restart_listener_bh = NULL; | |
371 | ||
70eb2c07 | 372 | if (server->sioc) { |
7185c857 SH |
373 | VuFdWatch *vu_fd_watch; |
374 | ||
375 | QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { | |
60f782b6 | 376 | aio_set_fd_handler(server->ctx, vu_fd_watch->fd, |
826cc324 | 377 | NULL, NULL, NULL, NULL, vu_fd_watch); |
7185c857 SH |
378 | } |
379 | ||
380 | qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); | |
381 | ||
382 | AIO_WAIT_WHILE(server->ctx, server->co_trip); | |
70eb2c07 CX |
383 | } |
384 | ||
385 | if (server->listener) { | |
386 | qio_net_listener_disconnect(server->listener); | |
387 | object_unref(OBJECT(server->listener)); | |
388 | } | |
7185c857 SH |
389 | } |
390 | ||
391 | /* | |
392 | * Allow the next client to connect to the server. Called from a BH in the main | |
393 | * loop. | |
394 | */ | |
395 | static void restart_listener_bh(void *opaque) | |
396 | { | |
397 | VuServer *server = opaque; | |
70eb2c07 | 398 | |
7185c857 SH |
399 | qio_net_listener_set_client_func(server->listener, vu_accept, server, |
400 | NULL); | |
70eb2c07 CX |
401 | } |
402 | ||
7185c857 SH |
403 | /* Called with ctx acquired */ |
404 | void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx) | |
70eb2c07 | 405 | { |
7185c857 | 406 | VuFdWatch *vu_fd_watch; |
70eb2c07 | 407 | |
7185c857 | 408 | server->ctx = ctx; |
70eb2c07 CX |
409 | |
410 | if (!server->sioc) { | |
70eb2c07 CX |
411 | return; |
412 | } | |
413 | ||
7185c857 | 414 | QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { |
60f782b6 | 415 | aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL, |
826cc324 | 416 | NULL, NULL, vu_fd_watch); |
70eb2c07 CX |
417 | } |
418 | ||
411132c9 KW |
419 | if (server->co_trip) { |
420 | /* | |
421 | * The caller didn't fully shut down co_trip (this can happen on | |
422 | * non-polling drains like in bdrv_graph_wrlock()). This is okay as long | |
423 | * as it no longer tries to shut it down and we're guaranteed to still | |
424 | * be in the same AioContext as before. | |
425 | * | |
426 | * co_ctx can still be NULL if we get multiple calls and only just | |
427 | * scheduled a new coroutine in the else branch. | |
428 | */ | |
429 | AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip); | |
430 | ||
431 | assert(!server->quiescing); | |
432 | assert(!co_ctx || co_ctx == ctx); | |
433 | } else { | |
434 | server->co_trip = qemu_coroutine_create(vu_client_trip, server); | |
435 | assert(!server->in_qio_channel_yield); | |
436 | aio_co_schedule(ctx, server->co_trip); | |
437 | } | |
7185c857 SH |
438 | } |
439 | ||
440 | /* Called with server->ctx acquired */ | |
441 | void vhost_user_server_detach_aio_context(VuServer *server) | |
442 | { | |
443 | if (server->sioc) { | |
444 | VuFdWatch *vu_fd_watch; | |
445 | ||
446 | QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { | |
60f782b6 | 447 | aio_set_fd_handler(server->ctx, vu_fd_watch->fd, |
826cc324 | 448 | NULL, NULL, NULL, NULL, vu_fd_watch); |
70eb2c07 CX |
449 | } |
450 | } | |
70eb2c07 | 451 | |
7185c857 | 452 | server->ctx = NULL; |
06e0f098 SH |
453 | |
454 | if (server->ioc) { | |
455 | if (server->in_qio_channel_yield) { | |
456 | /* Stop receiving the next vhost-user message */ | |
457 | qio_channel_wake_read(server->ioc); | |
458 | } | |
459 | } | |
7185c857 | 460 | } |
70eb2c07 CX |
461 | |
462 | bool vhost_user_server_start(VuServer *server, | |
463 | SocketAddress *socket_addr, | |
464 | AioContext *ctx, | |
465 | uint16_t max_queues, | |
70eb2c07 CX |
466 | const VuDevIface *vu_iface, |
467 | Error **errp) | |
468 | { | |
7185c857 | 469 | QEMUBH *bh; |
90fc91d5 SH |
470 | QIONetListener *listener; |
471 | ||
472 | if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX && | |
473 | socket_addr->type != SOCKET_ADDRESS_TYPE_FD) { | |
474 | error_setg(errp, "Only socket address types 'unix' and 'fd' are supported"); | |
475 | return false; | |
476 | } | |
477 | ||
478 | listener = qio_net_listener_new(); | |
70eb2c07 CX |
479 | if (qio_net_listener_open_sync(listener, socket_addr, 1, |
480 | errp) < 0) { | |
481 | object_unref(OBJECT(listener)); | |
482 | return false; | |
483 | } | |
484 | ||
7185c857 SH |
485 | bh = qemu_bh_new(restart_listener_bh, server); |
486 | ||
1d787456 | 487 | /* zero out unspecified fields */ |
70eb2c07 CX |
488 | *server = (VuServer) { |
489 | .listener = listener, | |
7185c857 | 490 | .restart_listener_bh = bh, |
70eb2c07 CX |
491 | .vu_iface = vu_iface, |
492 | .max_queues = max_queues, | |
493 | .ctx = ctx, | |
70eb2c07 CX |
494 | }; |
495 | ||
496 | qio_net_listener_set_name(server->listener, "vhost-user-backend-listener"); | |
497 | ||
498 | qio_net_listener_set_client_func(server->listener, | |
499 | vu_accept, | |
500 | server, | |
501 | NULL); | |
502 | ||
503 | QTAILQ_INIT(&server->vu_fd_watches); | |
504 | return true; | |
505 | } |