2 * Copyright (c) 2008, 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "vconn-stream.h"
24 #include <sys/types.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
33 #include "vconn-provider.h"
37 #define THIS_MODULE VLM_vconn_stream
39 /* Active stream socket vconn. */
50 static struct vconn_class stream_vconn_class
;
52 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 25);
54 static void stream_clear_txbuf(struct stream_vconn
*);
55 static void maybe_unlink_and_free(char *path
);
57 /* Creates a new vconn named 'name' that will send and receive data on 'fd' and
58 * stores a pointer to the vconn in '*vconnp'. Initial connection status
59 * 'connect_status' is interpreted as described for vconn_init().
61 * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
62 * fatal_signal_unlink_file_now() and then freed with free().
64 * Returns 0 if successful, otherwise a positive errno value. (The current
65 * implementation never fails.) */
67 new_stream_vconn(const char *name
, int fd
, int connect_status
,
68 char *unlink_path
, struct vconn
**vconnp
)
70 struct stream_vconn
*s
;
72 s
= xmalloc(sizeof *s
);
73 vconn_init(&s
->vconn
, &stream_vconn_class
, connect_status
, name
);
77 s
->unlink_path
= unlink_path
;
82 static struct stream_vconn
*
83 stream_vconn_cast(struct vconn
*vconn
)
85 vconn_assert_class(vconn
, &stream_vconn_class
);
86 return CONTAINER_OF(vconn
, struct stream_vconn
, vconn
);
90 stream_close(struct vconn
*vconn
)
92 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
93 stream_clear_txbuf(s
);
94 ofpbuf_delete(s
->rxbuf
);
96 maybe_unlink_and_free(s
->unlink_path
);
101 stream_connect(struct vconn
*vconn
)
103 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
104 return check_connection_completion(s
->fd
);
108 stream_recv(struct vconn
*vconn
, struct ofpbuf
**bufferp
)
110 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
115 if (s
->rxbuf
== NULL
) {
116 s
->rxbuf
= ofpbuf_new(1564);
121 if (sizeof(struct ofp_header
) > rx
->size
) {
122 want_bytes
= sizeof(struct ofp_header
) - rx
->size
;
124 struct ofp_header
*oh
= rx
->data
;
125 size_t length
= ntohs(oh
->length
);
126 if (length
< sizeof(struct ofp_header
)) {
127 VLOG_ERR_RL(&rl
, "received too-short ofp_header (%zu bytes)",
131 want_bytes
= length
- rx
->size
;
138 ofpbuf_prealloc_tailroom(rx
, want_bytes
);
140 retval
= read(s
->fd
, ofpbuf_tail(rx
), want_bytes
);
143 if (retval
== want_bytes
) {
144 if (rx
->size
> sizeof(struct ofp_header
)) {
153 } else if (retval
== 0) {
155 VLOG_ERR_RL(&rl
, "connection dropped mid-packet");
166 stream_clear_txbuf(struct stream_vconn
*s
)
168 ofpbuf_delete(s
->txbuf
);
173 stream_send(struct vconn
*vconn
, struct ofpbuf
*buffer
)
175 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
182 retval
= write(s
->fd
, buffer
->data
, buffer
->size
);
183 if (retval
== buffer
->size
) {
184 ofpbuf_delete(buffer
);
186 } else if (retval
>= 0 || errno
== EAGAIN
) {
187 leak_checker_claim(buffer
);
190 ofpbuf_pull(buffer
, retval
);
199 stream_run(struct vconn
*vconn
)
201 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
208 n
= write(s
->fd
, s
->txbuf
->data
, s
->txbuf
->size
);
210 if (errno
!= EAGAIN
) {
211 VLOG_ERR_RL(&rl
, "send: %s", strerror(errno
));
212 stream_clear_txbuf(s
);
216 ofpbuf_pull(s
->txbuf
, n
);
217 if (!s
->txbuf
->size
) {
218 stream_clear_txbuf(s
);
225 stream_run_wait(struct vconn
*vconn
)
227 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
230 poll_fd_wait(s
->fd
, POLLOUT
);
235 stream_wait(struct vconn
*vconn
, enum vconn_wait_type wait
)
237 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
240 poll_fd_wait(s
->fd
, POLLOUT
);
245 poll_fd_wait(s
->fd
, POLLOUT
);
247 /* Nothing to do: need to drain txbuf first. stream_run_wait()
248 * will arrange to wake up when there room to send data, so there's
249 * no point in calling poll_fd_wait() redundantly here. */
254 poll_fd_wait(s
->fd
, POLLIN
);
262 static struct vconn_class stream_vconn_class
= {
265 stream_close
, /* close */
266 stream_connect
, /* connect */
267 stream_recv
, /* recv */
268 stream_send
, /* send */
269 stream_run
, /* run */
270 stream_run_wait
, /* run_wait */
271 stream_wait
, /* wait */
274 /* Passive stream socket vconn. */
276 struct pstream_pvconn
278 struct pvconn pvconn
;
280 int (*accept_cb
)(int fd
, const struct sockaddr
*, size_t sa_len
,
285 static struct pvconn_class pstream_pvconn_class
;
287 static struct pstream_pvconn
*
288 pstream_pvconn_cast(struct pvconn
*pvconn
)
290 pvconn_assert_class(pvconn
, &pstream_pvconn_class
);
291 return CONTAINER_OF(pvconn
, struct pstream_pvconn
, pvconn
);
294 /* Creates a new pvconn named 'name' that will accept new socket connections on
295 * 'fd' and stores a pointer to the vconn in '*pvconnp'.
297 * When a connection has been accepted, 'accept_cb' will be called with the new
298 * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
299 * accept_cb must return 0 if the connection is successful, in which case it
300 * must initialize '*vconnp' to the new vconn, or a positive errno value on
301 * error. In either case accept_cb takes ownership of the 'fd' passed in.
303 * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
304 * fatal_signal_unlink_file_now() and freed with free().
306 * Returns 0 if successful, otherwise a positive errno value. (The current
307 * implementation never fails.) */
309 new_pstream_pvconn(const char *name
, int fd
,
310 int (*accept_cb
)(int fd
, const struct sockaddr
*sa
,
311 size_t sa_len
, struct vconn
**vconnp
),
312 char *unlink_path
, struct pvconn
**pvconnp
)
314 struct pstream_pvconn
*ps
= xmalloc(sizeof *ps
);
315 pvconn_init(&ps
->pvconn
, &pstream_pvconn_class
, name
);
317 ps
->accept_cb
= accept_cb
;
318 ps
->unlink_path
= unlink_path
;
319 *pvconnp
= &ps
->pvconn
;
324 pstream_close(struct pvconn
*pvconn
)
326 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
328 maybe_unlink_and_free(ps
->unlink_path
);
333 pstream_accept(struct pvconn
*pvconn
, struct vconn
**new_vconnp
)
335 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
336 struct sockaddr_storage ss
;
337 socklen_t ss_len
= sizeof ss
;
341 new_fd
= accept(ps
->fd
, (struct sockaddr
*) &ss
, &ss_len
);
344 if (retval
!= EAGAIN
) {
345 VLOG_DBG_RL(&rl
, "accept: %s", strerror(retval
));
350 retval
= set_nonblocking(new_fd
);
356 return ps
->accept_cb(new_fd
, (const struct sockaddr
*) &ss
, ss_len
,
361 pstream_wait(struct pvconn
*pvconn
)
363 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
364 poll_fd_wait(ps
->fd
, POLLIN
);
367 static struct pvconn_class pstream_pvconn_class
= {
375 /* Helper functions. */
377 maybe_unlink_and_free(char *path
)
380 fatal_signal_unlink_file_now(path
);