]> git.proxmox.com Git - mirror_ovs.git/blob - lib/vconn-stream.c
vconn: Convert vconn code to modern OVS structure.
[mirror_ovs.git] / lib / vconn-stream.c
1 /*
2 * Copyright (c) 2008, 2009 Nicira Networks.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
28 #include "ofpbuf.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
32 #include "util.h"
33 #include "vconn-provider.h"
34 #include "vconn.h"
35
36 #include "vlog.h"
37 #define THIS_MODULE VLM_vconn_stream
38
39 /* Active stream socket vconn. */
40
41 struct stream_vconn
42 {
43 struct vconn vconn;
44 int fd;
45 struct ofpbuf *rxbuf;
46 struct ofpbuf *txbuf;
47 char *unlink_path;
48 };
49
50 static struct vconn_class stream_vconn_class;
51
52 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
53
54 static void stream_clear_txbuf(struct stream_vconn *);
55 static void maybe_unlink_and_free(char *path);
56
57 /* Creates a new vconn named 'name' that will send and receive data on 'fd' and
58 * stores a pointer to the vconn in '*vconnp'. Initial connection status
59 * 'connect_status' is interpreted as described for vconn_init().
60 *
61 * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
62 * fatal_signal_unlink_file_now() and then freed with free().
63 *
64 * Returns 0 if successful, otherwise a positive errno value. (The current
65 * implementation never fails.) */
66 int
67 new_stream_vconn(const char *name, int fd, int connect_status,
68 char *unlink_path, struct vconn **vconnp)
69 {
70 struct stream_vconn *s;
71
72 s = xmalloc(sizeof *s);
73 vconn_init(&s->vconn, &stream_vconn_class, connect_status, name);
74 s->fd = fd;
75 s->txbuf = NULL;
76 s->rxbuf = NULL;
77 s->unlink_path = unlink_path;
78 *vconnp = &s->vconn;
79 return 0;
80 }
81
82 static struct stream_vconn *
83 stream_vconn_cast(struct vconn *vconn)
84 {
85 vconn_assert_class(vconn, &stream_vconn_class);
86 return CONTAINER_OF(vconn, struct stream_vconn, vconn);
87 }
88
89 static void
90 stream_close(struct vconn *vconn)
91 {
92 struct stream_vconn *s = stream_vconn_cast(vconn);
93 stream_clear_txbuf(s);
94 ofpbuf_delete(s->rxbuf);
95 close(s->fd);
96 maybe_unlink_and_free(s->unlink_path);
97 free(s);
98 }
99
100 static int
101 stream_connect(struct vconn *vconn)
102 {
103 struct stream_vconn *s = stream_vconn_cast(vconn);
104 return check_connection_completion(s->fd);
105 }
106
107 static int
108 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
109 {
110 struct stream_vconn *s = stream_vconn_cast(vconn);
111 struct ofpbuf *rx;
112 size_t want_bytes;
113 ssize_t retval;
114
115 if (s->rxbuf == NULL) {
116 s->rxbuf = ofpbuf_new(1564);
117 }
118 rx = s->rxbuf;
119
120 again:
121 if (sizeof(struct ofp_header) > rx->size) {
122 want_bytes = sizeof(struct ofp_header) - rx->size;
123 } else {
124 struct ofp_header *oh = rx->data;
125 size_t length = ntohs(oh->length);
126 if (length < sizeof(struct ofp_header)) {
127 VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
128 length);
129 return EPROTO;
130 }
131 want_bytes = length - rx->size;
132 if (!want_bytes) {
133 *bufferp = rx;
134 s->rxbuf = NULL;
135 return 0;
136 }
137 }
138 ofpbuf_prealloc_tailroom(rx, want_bytes);
139
140 retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
141 if (retval > 0) {
142 rx->size += retval;
143 if (retval == want_bytes) {
144 if (rx->size > sizeof(struct ofp_header)) {
145 *bufferp = rx;
146 s->rxbuf = NULL;
147 return 0;
148 } else {
149 goto again;
150 }
151 }
152 return EAGAIN;
153 } else if (retval == 0) {
154 if (rx->size) {
155 VLOG_ERR_RL(&rl, "connection dropped mid-packet");
156 return EPROTO;
157 } else {
158 return EOF;
159 }
160 } else {
161 return errno;
162 }
163 }
164
165 static void
166 stream_clear_txbuf(struct stream_vconn *s)
167 {
168 ofpbuf_delete(s->txbuf);
169 s->txbuf = NULL;
170 }
171
172 static int
173 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
174 {
175 struct stream_vconn *s = stream_vconn_cast(vconn);
176 ssize_t retval;
177
178 if (s->txbuf) {
179 return EAGAIN;
180 }
181
182 retval = write(s->fd, buffer->data, buffer->size);
183 if (retval == buffer->size) {
184 ofpbuf_delete(buffer);
185 return 0;
186 } else if (retval >= 0 || errno == EAGAIN) {
187 leak_checker_claim(buffer);
188 s->txbuf = buffer;
189 if (retval > 0) {
190 ofpbuf_pull(buffer, retval);
191 }
192 return 0;
193 } else {
194 return errno;
195 }
196 }
197
198 static void
199 stream_run(struct vconn *vconn)
200 {
201 struct stream_vconn *s = stream_vconn_cast(vconn);
202 ssize_t n;
203
204 if (!s->txbuf) {
205 return;
206 }
207
208 n = write(s->fd, s->txbuf->data, s->txbuf->size);
209 if (n < 0) {
210 if (errno != EAGAIN) {
211 VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
212 stream_clear_txbuf(s);
213 return;
214 }
215 } else if (n > 0) {
216 ofpbuf_pull(s->txbuf, n);
217 if (!s->txbuf->size) {
218 stream_clear_txbuf(s);
219 return;
220 }
221 }
222 }
223
224 static void
225 stream_run_wait(struct vconn *vconn)
226 {
227 struct stream_vconn *s = stream_vconn_cast(vconn);
228
229 if (s->txbuf) {
230 poll_fd_wait(s->fd, POLLOUT);
231 }
232 }
233
234 static void
235 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
236 {
237 struct stream_vconn *s = stream_vconn_cast(vconn);
238 switch (wait) {
239 case WAIT_CONNECT:
240 poll_fd_wait(s->fd, POLLOUT);
241 break;
242
243 case WAIT_SEND:
244 if (!s->txbuf) {
245 poll_fd_wait(s->fd, POLLOUT);
246 } else {
247 /* Nothing to do: need to drain txbuf first. stream_run_wait()
248 * will arrange to wake up when there room to send data, so there's
249 * no point in calling poll_fd_wait() redundantly here. */
250 }
251 break;
252
253 case WAIT_RECV:
254 poll_fd_wait(s->fd, POLLIN);
255 break;
256
257 default:
258 NOT_REACHED();
259 }
260 }
261
262 static struct vconn_class stream_vconn_class = {
263 "stream", /* name */
264 NULL, /* open */
265 stream_close, /* close */
266 stream_connect, /* connect */
267 stream_recv, /* recv */
268 stream_send, /* send */
269 stream_run, /* run */
270 stream_run_wait, /* run_wait */
271 stream_wait, /* wait */
272 };
273 \f
274 /* Passive stream socket vconn. */
275
276 struct pstream_pvconn
277 {
278 struct pvconn pvconn;
279 int fd;
280 int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
281 struct vconn **);
282 char *unlink_path;
283 };
284
285 static struct pvconn_class pstream_pvconn_class;
286
287 static struct pstream_pvconn *
288 pstream_pvconn_cast(struct pvconn *pvconn)
289 {
290 pvconn_assert_class(pvconn, &pstream_pvconn_class);
291 return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
292 }
293
294 /* Creates a new pvconn named 'name' that will accept new socket connections on
295 * 'fd' and stores a pointer to the vconn in '*pvconnp'.
296 *
297 * When a connection has been accepted, 'accept_cb' will be called with the new
298 * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
299 * accept_cb must return 0 if the connection is successful, in which case it
300 * must initialize '*vconnp' to the new vconn, or a positive errno value on
301 * error. In either case accept_cb takes ownership of the 'fd' passed in.
302 *
303 * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
304 * fatal_signal_unlink_file_now() and freed with free().
305 *
306 * Returns 0 if successful, otherwise a positive errno value. (The current
307 * implementation never fails.) */
308 int
309 new_pstream_pvconn(const char *name, int fd,
310 int (*accept_cb)(int fd, const struct sockaddr *sa,
311 size_t sa_len, struct vconn **vconnp),
312 char *unlink_path, struct pvconn **pvconnp)
313 {
314 struct pstream_pvconn *ps = xmalloc(sizeof *ps);
315 pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
316 ps->fd = fd;
317 ps->accept_cb = accept_cb;
318 ps->unlink_path = unlink_path;
319 *pvconnp = &ps->pvconn;
320 return 0;
321 }
322
323 static void
324 pstream_close(struct pvconn *pvconn)
325 {
326 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
327 close(ps->fd);
328 maybe_unlink_and_free(ps->unlink_path);
329 free(ps);
330 }
331
332 static int
333 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
334 {
335 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
336 struct sockaddr_storage ss;
337 socklen_t ss_len = sizeof ss;
338 int new_fd;
339 int retval;
340
341 new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
342 if (new_fd < 0) {
343 int retval = errno;
344 if (retval != EAGAIN) {
345 VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
346 }
347 return retval;
348 }
349
350 retval = set_nonblocking(new_fd);
351 if (retval) {
352 close(new_fd);
353 return retval;
354 }
355
356 return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
357 new_vconnp);
358 }
359
360 static void
361 pstream_wait(struct pvconn *pvconn)
362 {
363 struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
364 poll_fd_wait(ps->fd, POLLIN);
365 }
366
367 static struct pvconn_class pstream_pvconn_class = {
368 "pstream",
369 NULL,
370 pstream_close,
371 pstream_accept,
372 pstream_wait
373 };
374 \f
375 /* Helper functions. */
/* If 'path' is nonnull, hands it to fatal_signal_unlink_file_now() and then
 * frees it.  Does nothing when 'path' is null. */
static void
maybe_unlink_and_free(char *path)
{
    if (!path) {
        return;
    }

    fatal_signal_unlink_file_now(path);
    free(path);
}