]> git.proxmox.com Git - mirror_qemu.git/blame - net/stream.c
vdpa: Add SetSteeringEBPF method for NetClientState
[mirror_qemu.git] / net / stream.c
CommitLineData
5166fe0a
LV
1/*
2 * QEMU System Emulator
3 *
4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2022 Red Hat, Inc.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 * THE SOFTWARE.
24 */
25
26#include "qemu/osdep.h"
27
28#include "net/net.h"
29#include "clients.h"
30#include "monitor/monitor.h"
31#include "qapi/error.h"
32#include "qemu/error-report.h"
33#include "qemu/option.h"
34#include "qemu/sockets.h"
35#include "qemu/iov.h"
36#include "qemu/main-loop.h"
37#include "qemu/cutils.h"
1f9c890f
LV
38#include "io/channel.h"
39#include "io/channel-socket.h"
40#include "io/net-listener.h"
e506fee8 41#include "qapi/qapi-events-net.h"
148fbf0d
LV
42#include "qapi/qapi-visit-sockets.h"
43#include "qapi/clone-visitor.h"
5166fe0a
LV
44
45typedef struct NetStreamState {
46 NetClientState nc;
1f9c890f
LV
47 QIOChannel *listen_ioc;
48 QIONetListener *listener;
49 QIOChannel *ioc;
50 guint ioc_read_tag;
51 guint ioc_write_tag;
5166fe0a
LV
52 SocketReadState rs;
53 unsigned int send_index; /* number of bytes sent*/
148fbf0d
LV
54 uint32_t reconnect;
55 guint timer_tag;
56 SocketAddress *addr;
5166fe0a
LV
57} NetStreamState;
58
1f9c890f
LV
59static void net_stream_listen(QIONetListener *listener,
60 QIOChannelSocket *cioc,
61 void *opaque);
148fbf0d 62static void net_stream_arm_reconnect(NetStreamState *s);
5166fe0a 63
1f9c890f
LV
64static gboolean net_stream_writable(QIOChannel *ioc,
65 GIOCondition condition,
66 gpointer data)
5166fe0a 67{
1f9c890f 68 NetStreamState *s = data;
5166fe0a 69
1f9c890f 70 s->ioc_write_tag = 0;
5166fe0a
LV
71
72 qemu_flush_queued_packets(&s->nc);
1f9c890f
LV
73
74 return G_SOURCE_REMOVE;
5166fe0a
LV
75}
76
77static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
78 size_t size)
79{
80 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
81 uint32_t len = htonl(size);
82 struct iovec iov[] = {
83 {
84 .iov_base = &len,
85 .iov_len = sizeof(len),
86 }, {
87 .iov_base = (void *)buf,
88 .iov_len = size,
89 },
90 };
1f9c890f
LV
91 struct iovec local_iov[2];
92 unsigned int nlocal_iov;
5166fe0a
LV
93 size_t remaining;
94 ssize_t ret;
95
96 remaining = iov_size(iov, 2) - s->send_index;
1f9c890f
LV
97 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
98 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
99 if (ret == QIO_CHANNEL_ERR_BLOCK) {
5166fe0a
LV
100 ret = 0; /* handled further down */
101 }
102 if (ret == -1) {
103 s->send_index = 0;
104 return -errno;
105 }
106 if (ret < (ssize_t)remaining) {
107 s->send_index += ret;
1f9c890f
LV
108 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
109 net_stream_writable, s, NULL);
5166fe0a
LV
110 return 0;
111 }
112 s->send_index = 0;
113 return size;
114}
115
1f9c890f
LV
116static gboolean net_stream_send(QIOChannel *ioc,
117 GIOCondition condition,
118 gpointer data);
119
5166fe0a
LV
120static void net_stream_send_completed(NetClientState *nc, ssize_t len)
121{
122 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
123
1f9c890f
LV
124 if (!s->ioc_read_tag) {
125 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
126 net_stream_send, s, NULL);
5166fe0a
LV
127 }
128}
129
130static void net_stream_rs_finalize(SocketReadState *rs)
131{
132 NetStreamState *s = container_of(rs, NetStreamState, rs);
133
134 if (qemu_send_packet_async(&s->nc, rs->buf,
135 rs->packet_len,
136 net_stream_send_completed) == 0) {
1f9c890f
LV
137 if (s->ioc_read_tag) {
138 g_source_remove(s->ioc_read_tag);
139 s->ioc_read_tag = 0;
140 }
5166fe0a
LV
141 }
142}
143
1f9c890f
LV
144static gboolean net_stream_send(QIOChannel *ioc,
145 GIOCondition condition,
146 gpointer data)
5166fe0a 147{
1f9c890f 148 NetStreamState *s = data;
5166fe0a
LV
149 int size;
150 int ret;
1f9c890f
LV
151 char buf1[NET_BUFSIZE];
152 const char *buf;
5166fe0a 153
1f9c890f 154 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
5166fe0a
LV
155 if (size < 0) {
156 if (errno != EWOULDBLOCK) {
157 goto eoc;
158 }
159 } else if (size == 0) {
160 /* end of connection */
161 eoc:
1f9c890f
LV
162 s->ioc_read_tag = 0;
163 if (s->ioc_write_tag) {
164 g_source_remove(s->ioc_write_tag);
165 s->ioc_write_tag = 0;
5166fe0a 166 }
1f9c890f
LV
167 if (s->listener) {
168 qio_net_listener_set_client_func(s->listener, net_stream_listen,
169 s, NULL);
170 }
171 object_unref(OBJECT(s->ioc));
172 s->ioc = NULL;
5166fe0a 173
5166fe0a
LV
174 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
175 s->nc.link_down = true;
ac149498 176 qemu_set_info_str(&s->nc, "%s", "");
5166fe0a 177
e506fee8 178 qapi_event_send_netdev_stream_disconnected(s->nc.name);
148fbf0d 179 net_stream_arm_reconnect(s);
e506fee8 180
1f9c890f 181 return G_SOURCE_REMOVE;
5166fe0a
LV
182 }
183 buf = buf1;
184
1f9c890f 185 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
5166fe0a
LV
186
187 if (ret == -1) {
188 goto eoc;
189 }
1f9c890f
LV
190
191 return G_SOURCE_CONTINUE;
5166fe0a
LV
192}
193
194static void net_stream_cleanup(NetClientState *nc)
195{
196 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
148fbf0d
LV
197 if (s->timer_tag) {
198 g_source_remove(s->timer_tag);
199 s->timer_tag = 0;
200 }
201 if (s->addr) {
202 qapi_free_SocketAddress(s->addr);
203 s->addr = NULL;
204 }
1f9c890f
LV
205 if (s->ioc) {
206 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
207 if (s->ioc_read_tag) {
208 g_source_remove(s->ioc_read_tag);
209 s->ioc_read_tag = 0;
210 }
211 if (s->ioc_write_tag) {
212 g_source_remove(s->ioc_write_tag);
213 s->ioc_write_tag = 0;
214 }
215 }
216 object_unref(OBJECT(s->ioc));
217 s->ioc = NULL;
5166fe0a 218 }
1f9c890f
LV
219 if (s->listen_ioc) {
220 if (s->listener) {
221 qio_net_listener_disconnect(s->listener);
222 object_unref(OBJECT(s->listener));
223 s->listener = NULL;
224 }
225 object_unref(OBJECT(s->listen_ioc));
226 s->listen_ioc = NULL;
5166fe0a
LV
227 }
228}
229
5166fe0a
LV
230static NetClientInfo net_stream_info = {
231 .type = NET_CLIENT_DRIVER_STREAM,
232 .size = sizeof(NetStreamState),
233 .receive = net_stream_receive,
234 .cleanup = net_stream_cleanup,
235};
236
1f9c890f
LV
237static void net_stream_listen(QIONetListener *listener,
238 QIOChannelSocket *cioc,
239 void *opaque)
5166fe0a 240{
1f9c890f
LV
241 NetStreamState *s = opaque;
242 SocketAddress *addr;
243 char *uri;
5166fe0a 244
1f9c890f 245 object_ref(OBJECT(cioc));
5166fe0a 246
1f9c890f 247 qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
5166fe0a 248
1f9c890f
LV
249 s->ioc = QIO_CHANNEL(cioc);
250 qio_channel_set_name(s->ioc, "stream-server");
251 s->nc.link_down = false;
5166fe0a 252
1f9c890f
LV
253 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
254 s, NULL);
5166fe0a 255
1f9c890f
LV
256 if (cioc->localAddr.ss_family == AF_UNIX) {
257 addr = qio_channel_socket_get_local_address(cioc, NULL);
5166fe0a 258 } else {
1f9c890f 259 addr = qio_channel_socket_get_remote_address(cioc, NULL);
5166fe0a 260 }
1f9c890f
LV
261 g_assert(addr != NULL);
262 uri = socket_uri(addr);
ac149498 263 qemu_set_info_str(&s->nc, "%s", uri);
1f9c890f 264 g_free(uri);
e506fee8 265 qapi_event_send_netdev_stream_connected(s->nc.name, addr);
1f9c890f 266 qapi_free_SocketAddress(addr);
5166fe0a
LV
267}
268
1f9c890f 269static void net_stream_server_listening(QIOTask *task, gpointer opaque)
5166fe0a
LV
270{
271 NetStreamState *s = opaque;
1f9c890f
LV
272 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
273 SocketAddress *addr;
274 int ret;
5166fe0a 275
1f9c890f
LV
276 if (listen_sioc->fd < 0) {
277 qemu_set_info_str(&s->nc, "connection error");
278 return;
13c6be96 279 }
13c6be96 280
1f9c890f
LV
281 addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
282 g_assert(addr != NULL);
283 ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
284 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
285 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
286 addr->u.fd.str, -ret);
287 return;
13c6be96 288 }
1f9c890f
LV
289 g_assert(ret == 0);
290 qapi_free_SocketAddress(addr);
291
292 s->nc.link_down = true;
293 s->listener = qio_net_listener_new();
294
295 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
296 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
297 qio_net_listener_add(s->listener, listen_sioc);
5166fe0a
LV
298}
299
300static int net_stream_server_init(NetClientState *peer,
301 const char *model,
302 const char *name,
303 SocketAddress *addr,
304 Error **errp)
305{
306 NetClientState *nc;
307 NetStreamState *s;
1f9c890f 308 QIOChannelSocket *listen_sioc = qio_channel_socket_new();
5166fe0a 309
1f9c890f
LV
310 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
311 s = DO_UPCAST(NetStreamState, nc, nc);
5166fe0a 312
1f9c890f
LV
313 s->listen_ioc = QIO_CHANNEL(listen_sioc);
314 qio_channel_socket_listen_async(listen_sioc, addr, 0,
315 net_stream_server_listening, s,
316 NULL, NULL);
5166fe0a 317
1f9c890f
LV
318 return 0;
319}
13c6be96 320
1f9c890f
LV
321static void net_stream_client_connected(QIOTask *task, gpointer opaque)
322{
323 NetStreamState *s = opaque;
324 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
325 SocketAddress *addr;
326 gchar *uri;
327 int ret;
13c6be96 328
1f9c890f
LV
329 if (sioc->fd < 0) {
330 qemu_set_info_str(&s->nc, "connection error");
331 goto error;
5166fe0a
LV
332 }
333
1f9c890f
LV
334 addr = qio_channel_socket_get_remote_address(sioc, NULL);
335 g_assert(addr != NULL);
336 uri = socket_uri(addr);
ac149498 337 qemu_set_info_str(&s->nc, "%s", uri);
1f9c890f
LV
338 g_free(uri);
339
340 ret = qemu_socket_try_set_nonblock(sioc->fd);
341 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
342 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
343 addr->u.fd.str, -ret);
344 qapi_free_SocketAddress(addr);
345 goto error;
5166fe0a 346 }
1f9c890f 347 g_assert(ret == 0);
5166fe0a 348
5166fe0a
LV
349 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
350
1f9c890f
LV
351 /* Disable Nagle algorithm on TCP sockets to reduce latency */
352 qio_channel_set_delay(s->ioc, false);
353
354 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
355 s, NULL);
356 s->nc.link_down = false;
e506fee8 357 qapi_event_send_netdev_stream_connected(s->nc.name, addr);
1f9c890f
LV
358 qapi_free_SocketAddress(addr);
359
360 return;
361error:
362 object_unref(OBJECT(s->ioc));
363 s->ioc = NULL;
148fbf0d
LV
364 net_stream_arm_reconnect(s);
365}
366
367static gboolean net_stream_reconnect(gpointer data)
368{
369 NetStreamState *s = data;
370 QIOChannelSocket *sioc;
371
372 s->timer_tag = 0;
373
374 sioc = qio_channel_socket_new();
375 s->ioc = QIO_CHANNEL(sioc);
376 qio_channel_socket_connect_async(sioc, s->addr,
377 net_stream_client_connected, s,
378 NULL, NULL);
379 return G_SOURCE_REMOVE;
380}
381
382static void net_stream_arm_reconnect(NetStreamState *s)
383{
384 if (s->reconnect && s->timer_tag == 0) {
385 s->timer_tag = g_timeout_add_seconds(s->reconnect,
386 net_stream_reconnect, s);
387 }
5166fe0a
LV
388}
389
390static int net_stream_client_init(NetClientState *peer,
391 const char *model,
392 const char *name,
393 SocketAddress *addr,
148fbf0d 394 uint32_t reconnect,
5166fe0a
LV
395 Error **errp)
396{
397 NetStreamState *s;
1f9c890f
LV
398 NetClientState *nc;
399 QIOChannelSocket *sioc = qio_channel_socket_new();
5166fe0a 400
1f9c890f
LV
401 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
402 s = DO_UPCAST(NetStreamState, nc, nc);
13c6be96 403
1f9c890f
LV
404 s->ioc = QIO_CHANNEL(sioc);
405 s->nc.link_down = true;
406
148fbf0d
LV
407 s->reconnect = reconnect;
408 if (reconnect) {
409 s->addr = QAPI_CLONE(SocketAddress, addr);
410 }
1f9c890f
LV
411 qio_channel_socket_connect_async(sioc, addr,
412 net_stream_client_connected, s,
413 NULL, NULL);
5166fe0a 414
5166fe0a
LV
415 return 0;
416}
417
418int net_init_stream(const Netdev *netdev, const char *name,
419 NetClientState *peer, Error **errp)
420{
421 const NetdevStreamOptions *sock;
422
423 assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
424 sock = &netdev->u.stream;
425
426 if (!sock->has_server || !sock->server) {
148fbf0d
LV
427 return net_stream_client_init(peer, "stream", name, sock->addr,
428 sock->has_reconnect ? sock->reconnect : 0,
429 errp);
430 }
431 if (sock->has_reconnect) {
432 error_setg(errp, "'reconnect' option is incompatible with "
433 "socket in server mode");
434 return -1;
5166fe0a
LV
435 }
436 return net_stream_server_init(peer, "stream", name, sock->addr, errp);
437}