]> git.proxmox.com Git - mirror_qemu.git/blame - net/stream.c
.gitlab-ci.d/windows.yml: Unify the prerequisite packages
[mirror_qemu.git] / net / stream.c
CommitLineData
5166fe0a
LV
1/*
2 * QEMU System Emulator
3 *
4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2022 Red Hat, Inc.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 * THE SOFTWARE.
24 */
25
26#include "qemu/osdep.h"
27
28#include "net/net.h"
29#include "clients.h"
30#include "monitor/monitor.h"
31#include "qapi/error.h"
32#include "qemu/error-report.h"
33#include "qemu/option.h"
34#include "qemu/sockets.h"
35#include "qemu/iov.h"
36#include "qemu/main-loop.h"
37#include "qemu/cutils.h"
1f9c890f
LV
38#include "io/channel.h"
39#include "io/channel-socket.h"
40#include "io/net-listener.h"
e506fee8 41#include "qapi/qapi-events-net.h"
5166fe0a
LV
42
43typedef struct NetStreamState {
44 NetClientState nc;
1f9c890f
LV
45 QIOChannel *listen_ioc;
46 QIONetListener *listener;
47 QIOChannel *ioc;
48 guint ioc_read_tag;
49 guint ioc_write_tag;
5166fe0a
LV
50 SocketReadState rs;
51 unsigned int send_index; /* number of bytes sent*/
5166fe0a
LV
52} NetStreamState;
53
1f9c890f
LV
54static void net_stream_listen(QIONetListener *listener,
55 QIOChannelSocket *cioc,
56 void *opaque);
5166fe0a 57
1f9c890f
LV
58static gboolean net_stream_writable(QIOChannel *ioc,
59 GIOCondition condition,
60 gpointer data)
5166fe0a 61{
1f9c890f 62 NetStreamState *s = data;
5166fe0a 63
1f9c890f 64 s->ioc_write_tag = 0;
5166fe0a
LV
65
66 qemu_flush_queued_packets(&s->nc);
1f9c890f
LV
67
68 return G_SOURCE_REMOVE;
5166fe0a
LV
69}
70
71static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
72 size_t size)
73{
74 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
75 uint32_t len = htonl(size);
76 struct iovec iov[] = {
77 {
78 .iov_base = &len,
79 .iov_len = sizeof(len),
80 }, {
81 .iov_base = (void *)buf,
82 .iov_len = size,
83 },
84 };
1f9c890f
LV
85 struct iovec local_iov[2];
86 unsigned int nlocal_iov;
5166fe0a
LV
87 size_t remaining;
88 ssize_t ret;
89
90 remaining = iov_size(iov, 2) - s->send_index;
1f9c890f
LV
91 nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
92 ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
93 if (ret == QIO_CHANNEL_ERR_BLOCK) {
5166fe0a
LV
94 ret = 0; /* handled further down */
95 }
96 if (ret == -1) {
97 s->send_index = 0;
98 return -errno;
99 }
100 if (ret < (ssize_t)remaining) {
101 s->send_index += ret;
1f9c890f
LV
102 s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
103 net_stream_writable, s, NULL);
5166fe0a
LV
104 return 0;
105 }
106 s->send_index = 0;
107 return size;
108}
109
1f9c890f
LV
110static gboolean net_stream_send(QIOChannel *ioc,
111 GIOCondition condition,
112 gpointer data);
113
5166fe0a
LV
114static void net_stream_send_completed(NetClientState *nc, ssize_t len)
115{
116 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
117
1f9c890f
LV
118 if (!s->ioc_read_tag) {
119 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
120 net_stream_send, s, NULL);
5166fe0a
LV
121 }
122}
123
124static void net_stream_rs_finalize(SocketReadState *rs)
125{
126 NetStreamState *s = container_of(rs, NetStreamState, rs);
127
128 if (qemu_send_packet_async(&s->nc, rs->buf,
129 rs->packet_len,
130 net_stream_send_completed) == 0) {
1f9c890f
LV
131 if (s->ioc_read_tag) {
132 g_source_remove(s->ioc_read_tag);
133 s->ioc_read_tag = 0;
134 }
5166fe0a
LV
135 }
136}
137
1f9c890f
LV
138static gboolean net_stream_send(QIOChannel *ioc,
139 GIOCondition condition,
140 gpointer data)
5166fe0a 141{
1f9c890f 142 NetStreamState *s = data;
5166fe0a
LV
143 int size;
144 int ret;
1f9c890f
LV
145 char buf1[NET_BUFSIZE];
146 const char *buf;
5166fe0a 147
1f9c890f 148 size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
5166fe0a
LV
149 if (size < 0) {
150 if (errno != EWOULDBLOCK) {
151 goto eoc;
152 }
153 } else if (size == 0) {
154 /* end of connection */
155 eoc:
1f9c890f
LV
156 s->ioc_read_tag = 0;
157 if (s->ioc_write_tag) {
158 g_source_remove(s->ioc_write_tag);
159 s->ioc_write_tag = 0;
5166fe0a 160 }
1f9c890f
LV
161 if (s->listener) {
162 qio_net_listener_set_client_func(s->listener, net_stream_listen,
163 s, NULL);
164 }
165 object_unref(OBJECT(s->ioc));
166 s->ioc = NULL;
5166fe0a 167
5166fe0a
LV
168 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
169 s->nc.link_down = true;
ac149498 170 qemu_set_info_str(&s->nc, "%s", "");
5166fe0a 171
e506fee8
LV
172 qapi_event_send_netdev_stream_disconnected(s->nc.name);
173
1f9c890f 174 return G_SOURCE_REMOVE;
5166fe0a
LV
175 }
176 buf = buf1;
177
1f9c890f 178 ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
5166fe0a
LV
179
180 if (ret == -1) {
181 goto eoc;
182 }
1f9c890f
LV
183
184 return G_SOURCE_CONTINUE;
5166fe0a
LV
185}
186
187static void net_stream_cleanup(NetClientState *nc)
188{
189 NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
1f9c890f
LV
190 if (s->ioc) {
191 if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
192 if (s->ioc_read_tag) {
193 g_source_remove(s->ioc_read_tag);
194 s->ioc_read_tag = 0;
195 }
196 if (s->ioc_write_tag) {
197 g_source_remove(s->ioc_write_tag);
198 s->ioc_write_tag = 0;
199 }
200 }
201 object_unref(OBJECT(s->ioc));
202 s->ioc = NULL;
5166fe0a 203 }
1f9c890f
LV
204 if (s->listen_ioc) {
205 if (s->listener) {
206 qio_net_listener_disconnect(s->listener);
207 object_unref(OBJECT(s->listener));
208 s->listener = NULL;
209 }
210 object_unref(OBJECT(s->listen_ioc));
211 s->listen_ioc = NULL;
5166fe0a
LV
212 }
213}
214
5166fe0a
LV
215static NetClientInfo net_stream_info = {
216 .type = NET_CLIENT_DRIVER_STREAM,
217 .size = sizeof(NetStreamState),
218 .receive = net_stream_receive,
219 .cleanup = net_stream_cleanup,
220};
221
1f9c890f
LV
222static void net_stream_listen(QIONetListener *listener,
223 QIOChannelSocket *cioc,
224 void *opaque)
5166fe0a 225{
1f9c890f
LV
226 NetStreamState *s = opaque;
227 SocketAddress *addr;
228 char *uri;
5166fe0a 229
1f9c890f 230 object_ref(OBJECT(cioc));
5166fe0a 231
1f9c890f 232 qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
5166fe0a 233
1f9c890f
LV
234 s->ioc = QIO_CHANNEL(cioc);
235 qio_channel_set_name(s->ioc, "stream-server");
236 s->nc.link_down = false;
5166fe0a 237
1f9c890f
LV
238 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
239 s, NULL);
5166fe0a 240
1f9c890f
LV
241 if (cioc->localAddr.ss_family == AF_UNIX) {
242 addr = qio_channel_socket_get_local_address(cioc, NULL);
5166fe0a 243 } else {
1f9c890f 244 addr = qio_channel_socket_get_remote_address(cioc, NULL);
5166fe0a 245 }
1f9c890f
LV
246 g_assert(addr != NULL);
247 uri = socket_uri(addr);
ac149498 248 qemu_set_info_str(&s->nc, "%s", uri);
1f9c890f 249 g_free(uri);
e506fee8 250 qapi_event_send_netdev_stream_connected(s->nc.name, addr);
1f9c890f 251 qapi_free_SocketAddress(addr);
5166fe0a
LV
252}
253
1f9c890f 254static void net_stream_server_listening(QIOTask *task, gpointer opaque)
5166fe0a
LV
255{
256 NetStreamState *s = opaque;
1f9c890f
LV
257 QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
258 SocketAddress *addr;
259 int ret;
5166fe0a 260
1f9c890f
LV
261 if (listen_sioc->fd < 0) {
262 qemu_set_info_str(&s->nc, "connection error");
263 return;
13c6be96 264 }
13c6be96 265
1f9c890f
LV
266 addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
267 g_assert(addr != NULL);
268 ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
269 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
270 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
271 addr->u.fd.str, -ret);
272 return;
13c6be96 273 }
1f9c890f
LV
274 g_assert(ret == 0);
275 qapi_free_SocketAddress(addr);
276
277 s->nc.link_down = true;
278 s->listener = qio_net_listener_new();
279
280 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
281 qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
282 qio_net_listener_add(s->listener, listen_sioc);
5166fe0a
LV
283}
284
285static int net_stream_server_init(NetClientState *peer,
286 const char *model,
287 const char *name,
288 SocketAddress *addr,
289 Error **errp)
290{
291 NetClientState *nc;
292 NetStreamState *s;
1f9c890f 293 QIOChannelSocket *listen_sioc = qio_channel_socket_new();
5166fe0a 294
1f9c890f
LV
295 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
296 s = DO_UPCAST(NetStreamState, nc, nc);
5166fe0a 297
1f9c890f
LV
298 s->listen_ioc = QIO_CHANNEL(listen_sioc);
299 qio_channel_socket_listen_async(listen_sioc, addr, 0,
300 net_stream_server_listening, s,
301 NULL, NULL);
5166fe0a 302
1f9c890f
LV
303 return 0;
304}
13c6be96 305
1f9c890f
LV
306static void net_stream_client_connected(QIOTask *task, gpointer opaque)
307{
308 NetStreamState *s = opaque;
309 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
310 SocketAddress *addr;
311 gchar *uri;
312 int ret;
13c6be96 313
1f9c890f
LV
314 if (sioc->fd < 0) {
315 qemu_set_info_str(&s->nc, "connection error");
316 goto error;
5166fe0a
LV
317 }
318
1f9c890f
LV
319 addr = qio_channel_socket_get_remote_address(sioc, NULL);
320 g_assert(addr != NULL);
321 uri = socket_uri(addr);
ac149498 322 qemu_set_info_str(&s->nc, "%s", uri);
1f9c890f
LV
323 g_free(uri);
324
325 ret = qemu_socket_try_set_nonblock(sioc->fd);
326 if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
327 qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
328 addr->u.fd.str, -ret);
329 qapi_free_SocketAddress(addr);
330 goto error;
5166fe0a 331 }
1f9c890f 332 g_assert(ret == 0);
5166fe0a 333
5166fe0a
LV
334 net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
335
1f9c890f
LV
336 /* Disable Nagle algorithm on TCP sockets to reduce latency */
337 qio_channel_set_delay(s->ioc, false);
338
339 s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
340 s, NULL);
341 s->nc.link_down = false;
e506fee8 342 qapi_event_send_netdev_stream_connected(s->nc.name, addr);
1f9c890f
LV
343 qapi_free_SocketAddress(addr);
344
345 return;
346error:
347 object_unref(OBJECT(s->ioc));
348 s->ioc = NULL;
5166fe0a
LV
349}
350
351static int net_stream_client_init(NetClientState *peer,
352 const char *model,
353 const char *name,
354 SocketAddress *addr,
355 Error **errp)
356{
357 NetStreamState *s;
1f9c890f
LV
358 NetClientState *nc;
359 QIOChannelSocket *sioc = qio_channel_socket_new();
5166fe0a 360
1f9c890f
LV
361 nc = qemu_new_net_client(&net_stream_info, peer, model, name);
362 s = DO_UPCAST(NetStreamState, nc, nc);
13c6be96 363
1f9c890f
LV
364 s->ioc = QIO_CHANNEL(sioc);
365 s->nc.link_down = true;
366
367 qio_channel_socket_connect_async(sioc, addr,
368 net_stream_client_connected, s,
369 NULL, NULL);
5166fe0a 370
5166fe0a
LV
371 return 0;
372}
373
374int net_init_stream(const Netdev *netdev, const char *name,
375 NetClientState *peer, Error **errp)
376{
377 const NetdevStreamOptions *sock;
378
379 assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
380 sock = &netdev->u.stream;
381
382 if (!sock->has_server || !sock->server) {
383 return net_stream_client_init(peer, "stream", name, sock->addr, errp);
384 }
385 return net_stream_server_init(peer, "stream", name, sock->addr, errp);
386}