/*
 * Copyright (c) 2008, 2009 Nicira Networks.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 #include "vconn-stream.h"
24 #include <sys/types.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
33 #include "vconn-provider.h"
37 #define THIS_MODULE VLM_vconn_stream
/* Active stream socket vconn. */
/* Poll-loop registration kept in a stream vconn's 'tx_waiter' member.  It is
 * assigned from poll_fd_callback() and handed to poll_cancel() elsewhere in
 * this file.
 * NOTE(review): the header of the enclosing struct is not visible here. */
47 struct poll_waiter
*tx_waiter
;
/* Declared early so that the functions below can refer to the class before
 * its initializer appears further down. */
51 static struct vconn_class stream_vconn_class
;
/* Rate limiter passed to the VLOG_*_RL() logging calls in this file. */
53 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 25);
/* Prototypes for helpers that are defined later in this file. */
55 static void stream_clear_txbuf(struct stream_vconn
*);
56 static void maybe_unlink_and_free(char *path
);
/* Creates a new vconn named 'name' that will send and receive data on 'fd' and
 * stores a pointer to the vconn in '*vconnp'.  Initial connection status
 * 'connect_status' is interpreted as described for vconn_init().
 *
 * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
 * fatal_signal_unlink_file_now() and then freed with free().
 *
 * Returns 0 if successful, otherwise a positive errno value.  (The current
 * implementation never fails.) */
/* NOTE(review): only part of this function is visible here; the embedded
 * line numbers skip 75-78 and 80-82, so other member initialization and the
 * return statement cannot be reviewed from this view. */
68 new_stream_vconn(const char *name
, int fd
, int connect_status
,
69 char *unlink_path
, struct vconn
**vconnp
)
71 struct stream_vconn
*s
;
/* Allocate the stream-specific state, then initialize the generic vconn
 * embedded in it as an instance of stream_vconn_class. */
73 s
= xmalloc(sizeof *s
);
74 vconn_init(&s
->vconn
, &stream_vconn_class
, connect_status
, name
);
/* Remember 'unlink_path' so that stream_close() can pass it to
 * maybe_unlink_and_free(). */
79 s
->unlink_path
= unlink_path
;
/* Converts 'vconn' into the struct stream_vconn that embeds it.
 *
 * vconn_assert_class() is called first to check that 'vconn' really belongs
 * to stream_vconn_class; CONTAINER_OF then recovers the enclosing structure
 * from its 'vconn' member. */
84 static struct stream_vconn
*
85 stream_vconn_cast(struct vconn
*vconn
)
87 vconn_assert_class(vconn
, &stream_vconn_class
);
88 return CONTAINER_OF(vconn
, struct stream_vconn
, vconn
);
/* 'close' entry of stream_vconn_class.
 *
 * NOTE(review): embedded line numbers 98 and 100 are not visible here, so any
 * closing of the fd or freeing of 's' itself cannot be confirmed from this
 * view. */
92 stream_close(struct vconn
*vconn
)
94 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
/* Cancel the pending transmit callback registered via poll_fd_callback(). */
95 poll_cancel(s
->tx_waiter
);
/* Release buffered transmit data and any partially received message. */
96 stream_clear_txbuf(s
);
97 ofpbuf_delete(s
->rxbuf
);
/* Dispose of the path that was handed over when the vconn was created. */
99 maybe_unlink_and_free(s
->unlink_path
);
/* 'connect' entry of stream_vconn_class: returns whatever
 * check_connection_completion() reports for the underlying fd. */
104 stream_connect(struct vconn
*vconn
)
106 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
107 return check_connection_completion(s
->fd
);
/* 'recv' entry of stream_vconn_class.
 *
 * Accumulates one OpenFlow message from the fd: first the bytes needed to
 * complete a struct ofp_header, then the remainder of the message as given by
 * the header's 'length' field.
 *
 * NOTE(review): many lines of this function are not visible here (the
 * embedded line numbers have gaps), including the declarations of 'rx',
 * 'want_bytes' and 'retval', the handling of '*bufferp', and all return
 * statements.  'rx' presumably aliases s->rxbuf -- confirm. */
111 stream_recv(struct vconn
*vconn
, struct ofpbuf
**bufferp
)
113 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
/* Create the receive buffer on first use. */
118 if (s
->rxbuf
== NULL
) {
119 s
->rxbuf
= ofpbuf_new(1564);
/* Header still incomplete: ask only for the missing header bytes. */
124 if (sizeof(struct ofp_header
) > rx
->size
) {
125 want_bytes
= sizeof(struct ofp_header
) - rx
->size
;
/* Header complete: its 'length' field, converted from network byte order,
 * gives the total size of the message. */
127 struct ofp_header
*oh
= rx
->data
;
128 size_t length
= ntohs(oh
->length
);
/* A message cannot be shorter than its own header. */
129 if (length
< sizeof(struct ofp_header
)) {
130 VLOG_ERR_RL(&rl
, "received too-short ofp_header (%zu bytes)",
/* Bytes still outstanding for this message. */
134 want_bytes
= length
- rx
->size
;
/* Make sure there is room for 'want_bytes' more bytes, then read them
 * directly into the tail of the buffer. */
141 ofpbuf_prealloc_tailroom(rx
, want_bytes
);
143 retval
= read(s
->fd
, ofpbuf_tail(rx
), want_bytes
);
/* Everything that was asked for has arrived. */
146 if (retval
== want_bytes
) {
147 if (rx
->size
> sizeof(struct ofp_header
)) {
/* read() returned 0, i.e. end of file on the fd. */
156 } else if (retval
== 0) {
158 VLOG_ERR_RL(&rl
, "connection dropped mid-packet");
/* Frees the pending transmit buffer of 's' with ofpbuf_delete().
 *
 * NOTE(review): embedded line numbers 172-173 are not visible here, so it
 * cannot be confirmed from this view whether s->txbuf and s->tx_waiter are
 * reset afterwards. */
169 stream_clear_txbuf(struct stream_vconn
*s
)
171 ofpbuf_delete(s
->txbuf
);
/* Poll-loop callback, registered through poll_fd_callback() for POLLOUT, that
 * tries to write the data still held in s->txbuf.
 *
 * 'fd' and 'revents' are unused; 'vconn_' is the struct vconn that was given
 * to poll_fd_callback().
 *
 * NOTE(review): the branch structure around the statements below is only
 * partly visible here (the embedded line numbers have gaps). */
177 stream_do_tx(int fd UNUSED
, short int revents UNUSED
, void *vconn_
)
179 struct vconn
*vconn
= vconn_
;
180 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
181 ssize_t n
= write(s
->fd
, s
->txbuf
->data
, s
->txbuf
->size
);
/* Any error other than EAGAIN is logged and the buffered data is dropped. */
183 if (errno
!= EAGAIN
) {
184 VLOG_ERR_RL(&rl
, "send: %s", strerror(errno
));
185 stream_clear_txbuf(s
);
/* Drop the 'n' bytes that were written; release the buffer once it is
 * empty. */
189 ofpbuf_pull(s
->txbuf
, n
);
190 if (!s
->txbuf
->size
) {
191 stream_clear_txbuf(s
);
/* Register this callback again for the next time the fd is writable. */
195 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
/* 'send' entry of stream_vconn_class: attempts to write 'buffer' to the fd
 * right away and arranges for stream_do_tx() to send whatever could not be
 * written.
 *
 * NOTE(review): several lines are not visible here (the embedded line numbers
 * have gaps), including the declaration of 'retval', the storing of 'buffer'
 * for later transmission, and all return statements. */
199 stream_send(struct vconn
*vconn
, struct ofpbuf
*buffer
)
201 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
208 retval
= write(s
->fd
, buffer
->data
, buffer
->size
);
/* The whole message went out, so the buffer is no longer needed. */
209 if (retval
== buffer
->size
) {
210 ofpbuf_delete(buffer
);
/* Partial write, or the fd would block: the rest must be sent later. */
212 } else if (retval
>= 0 || errno
== EAGAIN
) {
/* Presumably tells the leak checker about the change of ownership --
 * confirm against leak-checker.h. */
213 leak_checker_claim(buffer
);
/* Skip the bytes that have already been written. */
216 ofpbuf_pull(buffer
, retval
);
/* Have stream_do_tx() called when the fd becomes writable. */
218 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
/* 'wait' entry of stream_vconn_class: registers the fd with the poll loop for
 * POLLOUT or POLLIN depending on 'wait'.
 *
 * NOTE(review): the switch/case labels and conditions that select between the
 * calls below are not visible here. */
226 stream_wait(struct vconn
*vconn
, enum vconn_wait_type wait
)
228 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
231 poll_fd_wait(s
->fd
, POLLOUT
);
236 poll_fd_wait(s
->fd
, POLLOUT
);
238 /* Nothing to do: need to drain txbuf first. */
243 poll_fd_wait(s
->fd
, POLLIN
);
/* Class table for stream vconns.  Each visible entry binds one of the
 * stream_*() functions above to the vconn operation named in its trailing
 * comment.
 * NOTE(review): the entries on embedded lines 252-253 and the closing brace
 * are not visible here. */
251 static struct vconn_class stream_vconn_class
= {
254 stream_close
, /* close */
255 stream_connect
, /* connect */
256 stream_recv
, /* recv */
257 stream_send
, /* send */
258 stream_wait
, /* wait */
/* Passive stream socket vconn. */
/* State of a passive (listening) stream pvconn.
 * NOTE(review): the braces and some members (for example the 'fd' and
 * 'unlink_path' members used elsewhere in this file) are not visible here. */
263 struct pstream_pvconn
/* Generic pvconn; pstream_pvconn_cast() recovers this struct from it. */
265 struct pvconn pvconn
;
/* Callback that pstream_accept() invokes with each newly accepted fd and the
 * peer's address. */
267 int (*accept_cb
)(int fd
, const struct sockaddr
*, size_t sa_len
,
/* Declared early so that the functions below can refer to the class before
 * its initializer appears further down. */
272 static struct pvconn_class pstream_pvconn_class
;
/* Converts 'pvconn' into the struct pstream_pvconn that embeds it.
 *
 * pvconn_assert_class() is called first to check that 'pvconn' really belongs
 * to pstream_pvconn_class; CONTAINER_OF then recovers the enclosing structure
 * from its 'pvconn' member. */
274 static struct pstream_pvconn
*
275 pstream_pvconn_cast(struct pvconn
*pvconn
)
277 pvconn_assert_class(pvconn
, &pstream_pvconn_class
);
278 return CONTAINER_OF(pvconn
, struct pstream_pvconn
, pvconn
);
/* Creates a new pvconn named 'name' that will accept new socket connections on
 * 'fd' and stores a pointer to the pvconn in '*pvconnp'.
 *
 * When a connection has been accepted, 'accept_cb' will be called with the new
 * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
 * accept_cb must return 0 if the connection is successful, in which case it
 * must initialize '*vconnp' to the new vconn, or a positive errno value on
 * error.  In either case accept_cb takes ownership of the 'fd' passed in.
 *
 * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
 * fatal_signal_unlink_file_now() and freed with free().
 *
 * Returns 0 if successful, otherwise a positive errno value.  (The current
 * implementation never fails.) */
/* NOTE(review): only part of this function is visible here; embedded line
 * number 303 (presumably where 'fd' is stored -- confirm) and the return
 * statement are not visible in this view. */
296 new_pstream_pvconn(const char *name
, int fd
,
297 int (*accept_cb
)(int fd
, const struct sockaddr
*sa
,
298 size_t sa_len
, struct vconn
**vconnp
),
299 char *unlink_path
, struct pvconn
**pvconnp
)
/* Allocate the listener state and initialize the generic pvconn embedded in
 * it as an instance of pstream_pvconn_class. */
301 struct pstream_pvconn
*ps
= xmalloc(sizeof *ps
);
302 pvconn_init(&ps
->pvconn
, &pstream_pvconn_class
, name
);
/* Record the accept callback and the path to dispose of on close. */
304 ps
->accept_cb
= accept_cb
;
305 ps
->unlink_path
= unlink_path
;
/* Hand the generic pvconn back to the caller. */
306 *pvconnp
= &ps
->pvconn
;
/* Releases the resources of 'pvconn'; the visible part passes the stored
 * unlink path to maybe_unlink_and_free().
 *
 * NOTE(review): embedded line numbers 314 and 316 are not visible here, so
 * any closing of the fd or freeing of 'ps' itself cannot be confirmed from
 * this view. */
311 pstream_close(struct pvconn
*pvconn
)
313 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
315 maybe_unlink_and_free(ps
->unlink_path
);
/* Accepts one connection on the listening fd of 'pvconn' and passes the new
 * fd, together with the peer address, to the accept callback, whose result is
 * returned.
 *
 * NOTE(review): several lines are not visible here (the embedded line numbers
 * have gaps), including the declarations of 'new_fd' and 'retval', the error
 * paths, and the final argument of the accept_cb call (presumably
 * 'new_vconnp' -- confirm). */
320 pstream_accept(struct pvconn
*pvconn
, struct vconn
**new_vconnp
)
322 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
/* Storage for the peer address; 'ss_len' starts out as the capacity and is
 * passed by address so accept() can update it. */
323 struct sockaddr_storage ss
;
324 socklen_t ss_len
= sizeof ss
;
328 new_fd
= accept(ps
->fd
, (struct sockaddr
*) &ss
, &ss_len
);
/* EAGAIN is not logged; other errors are logged at debug level. */
331 if (retval
!= EAGAIN
) {
332 VLOG_DBG_RL(&rl
, "accept: %s", strerror(retval
));
/* Put the accepted fd into non-blocking mode. */
337 retval
= set_nonblocking(new_fd
);
343 return ps
->accept_cb(new_fd
, (const struct sockaddr
*) &ss
, ss_len
,
/* Asks the poll loop to wake up when the listening fd of 'pvconn' becomes
 * readable (POLLIN). */
348 pstream_wait(struct pvconn
*pvconn
)
350 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
351 poll_fd_wait(ps
->fd
, POLLIN
);
/* Class table for passive stream pvconns.
 * NOTE(review): the initializer's entries and closing brace are not visible
 * here. */
354 static struct pvconn_class pstream_pvconn_class
= {
/* Helper functions. */
/* Passes 'path' to fatal_signal_unlink_file_now().
 *
 * NOTE(review): the comments on new_stream_vconn() and new_pstream_pvconn()
 * say the path is used only if nonnull and is freed with free() afterwards;
 * the null check and the free() call are not visible here (embedded line
 * numbers 365-366 and 368 onward are missing) -- confirm. */
364 maybe_unlink_and_free(char *path
)
367 fatal_signal_unlink_file_now(path
);