]> git.proxmox.com Git - mirror_qemu.git/blob - util/vhost-user-server.c
util/vhost-user-server: s/fileds/fields/ typo fix
[mirror_qemu.git] / util / vhost-user-server.c
1 /*
2 * Sharing QEMU devices via vhost-user protocol
3 *
4 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
5 * Copyright (c) 2020 Red Hat, Inc.
6 *
7 * This work is licensed under the terms of the GNU GPL, version 2 or
8 * later. See the COPYING file in the top-level directory.
9 */
10 #include "qemu/osdep.h"
11 #include "qemu/main-loop.h"
12 #include "vhost-user-server.h"
13
14 static void vmsg_close_fds(VhostUserMsg *vmsg)
15 {
16 int i;
17 for (i = 0; i < vmsg->fd_num; i++) {
18 close(vmsg->fds[i]);
19 }
20 }
21
22 static void vmsg_unblock_fds(VhostUserMsg *vmsg)
23 {
24 int i;
25 for (i = 0; i < vmsg->fd_num; i++) {
26 qemu_set_nonblock(vmsg->fds[i]);
27 }
28 }
29
30 static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
31 gpointer opaque);
32
33 static void close_client(VuServer *server)
34 {
35 /*
36 * Before closing the client
37 *
38 * 1. Let vu_client_trip stop processing new vhost-user msg
39 *
40 * 2. remove kick_handler
41 *
42 * 3. wait for the kick handler to be finished
43 *
44 * 4. wait for the current vhost-user msg to be finished processing
45 */
46
47 QIOChannelSocket *sioc = server->sioc;
48 /* When this is set vu_client_trip will stop new processing vhost-user message */
49 server->sioc = NULL;
50
51 VuFdWatch *vu_fd_watch, *next;
52 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
53 aio_set_fd_handler(server->ioc->ctx, vu_fd_watch->fd, true, NULL,
54 NULL, NULL, NULL);
55 }
56
57 while (!QTAILQ_EMPTY(&server->vu_fd_watches)) {
58 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
59 if (!vu_fd_watch->processing) {
60 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
61 g_free(vu_fd_watch);
62 }
63 }
64 }
65
66 while (server->processing_msg) {
67 if (server->ioc->read_coroutine) {
68 server->ioc->read_coroutine = NULL;
69 qio_channel_set_aio_fd_handler(server->ioc, server->ioc->ctx, NULL,
70 NULL, server->ioc);
71 server->processing_msg = false;
72 }
73 }
74
75 vu_deinit(&server->vu_dev);
76 object_unref(OBJECT(sioc));
77 object_unref(OBJECT(server->ioc));
78 }
79
80 static void panic_cb(VuDev *vu_dev, const char *buf)
81 {
82 VuServer *server = container_of(vu_dev, VuServer, vu_dev);
83
84 /* avoid while loop in close_client */
85 server->processing_msg = false;
86
87 if (buf) {
88 error_report("vu_panic: %s", buf);
89 }
90
91 if (server->sioc) {
92 close_client(server);
93 }
94
95 if (server->device_panic_notifier) {
96 server->device_panic_notifier(server);
97 }
98
99 /*
100 * Set the callback function for network listener so another
101 * vhost-user client can connect to this server
102 */
103 qio_net_listener_set_client_func(server->listener,
104 vu_accept,
105 server,
106 NULL);
107 }
108
109 static bool coroutine_fn
110 vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
111 {
112 struct iovec iov = {
113 .iov_base = (char *)vmsg,
114 .iov_len = VHOST_USER_HDR_SIZE,
115 };
116 int rc, read_bytes = 0;
117 Error *local_err = NULL;
118 /*
119 * Store fds/nfds returned from qio_channel_readv_full into
120 * temporary variables.
121 *
122 * VhostUserMsg is a packed structure, gcc will complain about passing
123 * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
124 * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
125 * thus two temporary variables nfds and fds are used here.
126 */
127 size_t nfds = 0, nfds_t = 0;
128 const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
129 int *fds_t = NULL;
130 VuServer *server = container_of(vu_dev, VuServer, vu_dev);
131 QIOChannel *ioc = server->ioc;
132
133 if (!ioc) {
134 error_report_err(local_err);
135 goto fail;
136 }
137
138 assert(qemu_in_coroutine());
139 do {
140 /*
141 * qio_channel_readv_full may have short reads, keeping calling it
142 * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
143 */
144 rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
145 if (rc < 0) {
146 if (rc == QIO_CHANNEL_ERR_BLOCK) {
147 qio_channel_yield(ioc, G_IO_IN);
148 continue;
149 } else {
150 error_report_err(local_err);
151 return false;
152 }
153 }
154 read_bytes += rc;
155 if (nfds_t > 0) {
156 if (nfds + nfds_t > max_fds) {
157 error_report("A maximum of %zu fds are allowed, "
158 "however got %zu fds now",
159 max_fds, nfds + nfds_t);
160 goto fail;
161 }
162 memcpy(vmsg->fds + nfds, fds_t,
163 nfds_t *sizeof(vmsg->fds[0]));
164 nfds += nfds_t;
165 g_free(fds_t);
166 }
167 if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
168 break;
169 }
170 iov.iov_base = (char *)vmsg + read_bytes;
171 iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
172 } while (true);
173
174 vmsg->fd_num = nfds;
175 /* qio_channel_readv_full will make socket fds blocking, unblock them */
176 vmsg_unblock_fds(vmsg);
177 if (vmsg->size > sizeof(vmsg->payload)) {
178 error_report("Error: too big message request: %d, "
179 "size: vmsg->size: %u, "
180 "while sizeof(vmsg->payload) = %zu",
181 vmsg->request, vmsg->size, sizeof(vmsg->payload));
182 goto fail;
183 }
184
185 struct iovec iov_payload = {
186 .iov_base = (char *)&vmsg->payload,
187 .iov_len = vmsg->size,
188 };
189 if (vmsg->size) {
190 rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
191 if (rc == -1) {
192 error_report_err(local_err);
193 goto fail;
194 }
195 }
196
197 return true;
198
199 fail:
200 vmsg_close_fds(vmsg);
201
202 return false;
203 }
204
205
206 static void vu_client_start(VuServer *server);
207 static coroutine_fn void vu_client_trip(void *opaque)
208 {
209 VuServer *server = opaque;
210
211 while (!server->aio_context_changed && server->sioc) {
212 server->processing_msg = true;
213 vu_dispatch(&server->vu_dev);
214 server->processing_msg = false;
215 }
216
217 if (server->aio_context_changed && server->sioc) {
218 server->aio_context_changed = false;
219 vu_client_start(server);
220 }
221 }
222
223 static void vu_client_start(VuServer *server)
224 {
225 server->co_trip = qemu_coroutine_create(vu_client_trip, server);
226 aio_co_enter(server->ctx, server->co_trip);
227 }
228
229 /*
230 * a wrapper for vu_kick_cb
231 *
232 * since aio_dispatch can only pass one user data pointer to the
233 * callback function, pack VuDev and pvt into a struct. Then unpack it
234 * and pass them to vu_kick_cb
235 */
236 static void kick_handler(void *opaque)
237 {
238 VuFdWatch *vu_fd_watch = opaque;
239 vu_fd_watch->processing = true;
240 vu_fd_watch->cb(vu_fd_watch->vu_dev, 0, vu_fd_watch->pvt);
241 vu_fd_watch->processing = false;
242 }
243
244
245 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
246 {
247
248 VuFdWatch *vu_fd_watch, *next;
249 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
250 if (vu_fd_watch->fd == fd) {
251 return vu_fd_watch;
252 }
253 }
254 return NULL;
255 }
256
257 static void
258 set_watch(VuDev *vu_dev, int fd, int vu_evt,
259 vu_watch_cb cb, void *pvt)
260 {
261
262 VuServer *server = container_of(vu_dev, VuServer, vu_dev);
263 g_assert(vu_dev);
264 g_assert(fd >= 0);
265 g_assert(cb);
266
267 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
268
269 if (!vu_fd_watch) {
270 VuFdWatch *vu_fd_watch = g_new0(VuFdWatch, 1);
271
272 QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);
273
274 vu_fd_watch->fd = fd;
275 vu_fd_watch->cb = cb;
276 qemu_set_nonblock(fd);
277 aio_set_fd_handler(server->ioc->ctx, fd, true, kick_handler,
278 NULL, NULL, vu_fd_watch);
279 vu_fd_watch->vu_dev = vu_dev;
280 vu_fd_watch->pvt = pvt;
281 }
282 }
283
284
285 static void remove_watch(VuDev *vu_dev, int fd)
286 {
287 VuServer *server;
288 g_assert(vu_dev);
289 g_assert(fd >= 0);
290
291 server = container_of(vu_dev, VuServer, vu_dev);
292
293 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
294
295 if (!vu_fd_watch) {
296 return;
297 }
298 aio_set_fd_handler(server->ioc->ctx, fd, true, NULL, NULL, NULL, NULL);
299
300 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
301 g_free(vu_fd_watch);
302 }
303
304
305 static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
306 gpointer opaque)
307 {
308 VuServer *server = opaque;
309
310 if (server->sioc) {
311 warn_report("Only one vhost-user client is allowed to "
312 "connect the server one time");
313 return;
314 }
315
316 if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
317 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
318 error_report("Failed to initialize libvhost-user");
319 return;
320 }
321
322 /*
323 * Unset the callback function for network listener to make another
324 * vhost-user client keeping waiting until this client disconnects
325 */
326 qio_net_listener_set_client_func(server->listener,
327 NULL,
328 NULL,
329 NULL);
330 server->sioc = sioc;
331 /*
332 * Increase the object reference, so sioc will not freed by
333 * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
334 */
335 object_ref(OBJECT(server->sioc));
336 qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
337 server->ioc = QIO_CHANNEL(sioc);
338 object_ref(OBJECT(server->ioc));
339 qio_channel_attach_aio_context(server->ioc, server->ctx);
340 qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
341 vu_client_start(server);
342 }
343
344
345 void vhost_user_server_stop(VuServer *server)
346 {
347 if (server->sioc) {
348 close_client(server);
349 }
350
351 if (server->listener) {
352 qio_net_listener_disconnect(server->listener);
353 object_unref(OBJECT(server->listener));
354 }
355
356 }
357
358 void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx)
359 {
360 VuFdWatch *vu_fd_watch, *next;
361 void *opaque = NULL;
362 IOHandler *io_read = NULL;
363 bool attach;
364
365 server->ctx = ctx ? ctx : qemu_get_aio_context();
366
367 if (!server->sioc) {
368 /* not yet serving any client*/
369 return;
370 }
371
372 if (ctx) {
373 qio_channel_attach_aio_context(server->ioc, ctx);
374 server->aio_context_changed = true;
375 io_read = kick_handler;
376 attach = true;
377 } else {
378 qio_channel_detach_aio_context(server->ioc);
379 /* server->ioc->ctx keeps the old AioConext */
380 ctx = server->ioc->ctx;
381 attach = false;
382 }
383
384 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) {
385 if (vu_fd_watch->cb) {
386 opaque = attach ? vu_fd_watch : NULL;
387 aio_set_fd_handler(ctx, vu_fd_watch->fd, true,
388 io_read, NULL, NULL,
389 opaque);
390 }
391 }
392 }
393
394
395 bool vhost_user_server_start(VuServer *server,
396 SocketAddress *socket_addr,
397 AioContext *ctx,
398 uint16_t max_queues,
399 DevicePanicNotifierFn *device_panic_notifier,
400 const VuDevIface *vu_iface,
401 Error **errp)
402 {
403 QIONetListener *listener = qio_net_listener_new();
404 if (qio_net_listener_open_sync(listener, socket_addr, 1,
405 errp) < 0) {
406 object_unref(OBJECT(listener));
407 return false;
408 }
409
410 /* zero out unspecified fields */
411 *server = (VuServer) {
412 .listener = listener,
413 .vu_iface = vu_iface,
414 .max_queues = max_queues,
415 .ctx = ctx,
416 .device_panic_notifier = device_panic_notifier,
417 };
418
419 qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
420
421 qio_net_listener_set_client_func(server->listener,
422 vu_accept,
423 server,
424 NULL);
425
426 QTAILQ_INIT(&server->vu_fd_watches);
427 return true;
428 }