2 * Copyright (c) 2008, 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "vconn-stream.h"
24 #include <sys/types.h>
26 #include "leak-checker.h"
28 #include "openflow/openflow.h"
29 #include "poll-loop.h"
30 #include "socket-util.h"
32 #include "vconn-provider.h"
36 #define THIS_MODULE VLM_vconn_stream
38 /* Active stream socket vconn. */
44 void (*connect_success_cb
)(struct vconn
*, int);
47 struct poll_waiter
*tx_waiter
;
50 static struct vconn_class stream_vconn_class
;
52 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 25);
54 static void stream_clear_txbuf(struct stream_vconn
*);
57 new_stream_vconn(const char *name
, int fd
, int connect_status
,
58 uint32_t remote_ip
, uint16_t remote_port
,
60 connect_success_cb_func
*connect_success_cb
,
61 struct vconn
**vconnp
)
63 struct stream_vconn
*s
;
65 s
= xmalloc(sizeof *s
);
66 vconn_init(&s
->vconn
, &stream_vconn_class
, connect_status
, remote_ip
,
67 remote_port
, name
, reconnectable
);
72 s
->connect_success_cb
= connect_success_cb
;
77 static struct stream_vconn
*
78 stream_vconn_cast(struct vconn
*vconn
)
80 vconn_assert_class(vconn
, &stream_vconn_class
);
81 return CONTAINER_OF(vconn
, struct stream_vconn
, vconn
);
85 stream_close(struct vconn
*vconn
)
87 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
88 poll_cancel(s
->tx_waiter
);
89 stream_clear_txbuf(s
);
90 ofpbuf_delete(s
->rxbuf
);
96 stream_connect(struct vconn
*vconn
)
98 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
99 int retval
= check_connection_completion(s
->fd
);
103 if (s
->connect_success_cb
) {
104 s
->connect_success_cb(vconn
, s
->fd
);
110 stream_recv(struct vconn
*vconn
, struct ofpbuf
**bufferp
)
112 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
117 if (s
->rxbuf
== NULL
) {
118 s
->rxbuf
= ofpbuf_new(1564);
123 if (sizeof(struct ofp_header
) > rx
->size
) {
124 want_bytes
= sizeof(struct ofp_header
) - rx
->size
;
126 struct ofp_header
*oh
= rx
->data
;
127 size_t length
= ntohs(oh
->length
);
128 if (length
< sizeof(struct ofp_header
)) {
129 VLOG_ERR_RL(&rl
, "received too-short ofp_header (%zu bytes)",
133 want_bytes
= length
- rx
->size
;
140 ofpbuf_prealloc_tailroom(rx
, want_bytes
);
142 retval
= read(s
->fd
, ofpbuf_tail(rx
), want_bytes
);
145 if (retval
== want_bytes
) {
146 if (rx
->size
> sizeof(struct ofp_header
)) {
155 } else if (retval
== 0) {
157 VLOG_ERR_RL(&rl
, "connection dropped mid-packet");
168 stream_clear_txbuf(struct stream_vconn
*s
)
170 ofpbuf_delete(s
->txbuf
);
176 stream_do_tx(int fd UNUSED
, short int revents UNUSED
, void *vconn_
)
178 struct vconn
*vconn
= vconn_
;
179 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
180 ssize_t n
= write(s
->fd
, s
->txbuf
->data
, s
->txbuf
->size
);
182 if (errno
!= EAGAIN
) {
183 VLOG_ERR_RL(&rl
, "send: %s", strerror(errno
));
184 stream_clear_txbuf(s
);
188 ofpbuf_pull(s
->txbuf
, n
);
189 if (!s
->txbuf
->size
) {
190 stream_clear_txbuf(s
);
194 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
198 stream_send(struct vconn
*vconn
, struct ofpbuf
*buffer
)
200 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
207 retval
= write(s
->fd
, buffer
->data
, buffer
->size
);
208 if (retval
== buffer
->size
) {
209 ofpbuf_delete(buffer
);
211 } else if (retval
>= 0 || errno
== EAGAIN
) {
212 leak_checker_claim(buffer
);
215 ofpbuf_pull(buffer
, retval
);
217 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
225 stream_wait(struct vconn
*vconn
, enum vconn_wait_type wait
)
227 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
230 poll_fd_wait(s
->fd
, POLLOUT
);
235 poll_fd_wait(s
->fd
, POLLOUT
);
237 /* Nothing to do: need to drain txbuf first. */
242 poll_fd_wait(s
->fd
, POLLIN
);
250 static struct vconn_class stream_vconn_class
= {
253 stream_close
, /* close */
254 stream_connect
, /* connect */
255 stream_recv
, /* recv */
256 stream_send
, /* send */
257 stream_wait
, /* wait */
260 /* Passive stream socket vconn. */
262 struct pstream_pvconn
264 struct pvconn pvconn
;
266 int (*accept_cb
)(int fd
, const struct sockaddr
*, size_t sa_len
,
270 static struct pvconn_class pstream_pvconn_class
;
272 static struct pstream_pvconn
*
273 pstream_pvconn_cast(struct pvconn
*pvconn
)
275 pvconn_assert_class(pvconn
, &pstream_pvconn_class
);
276 return CONTAINER_OF(pvconn
, struct pstream_pvconn
, pvconn
);
280 new_pstream_pvconn(const char *name
, int fd
,
281 int (*accept_cb
)(int fd
, const struct sockaddr
*,
282 size_t sa_len
, struct vconn
**),
283 struct pvconn
**pvconnp
)
285 struct pstream_pvconn
*ps
;
288 retval
= set_nonblocking(fd
);
294 if (listen(fd
, 10) < 0) {
296 VLOG_ERR("%s: listen: %s", name
, strerror(error
));
301 ps
= xmalloc(sizeof *ps
);
302 pvconn_init(&ps
->pvconn
, &pstream_pvconn_class
, name
);
304 ps
->accept_cb
= accept_cb
;
305 *pvconnp
= &ps
->pvconn
;
310 pstream_close(struct pvconn
*pvconn
)
312 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
318 pstream_accept(struct pvconn
*pvconn
, struct vconn
**new_vconnp
)
320 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
321 struct sockaddr_storage ss
;
322 socklen_t ss_len
= sizeof ss
;
326 new_fd
= accept(ps
->fd
, (struct sockaddr
*) &ss
, &ss_len
);
329 if (retval
!= EAGAIN
) {
330 VLOG_DBG_RL(&rl
, "accept: %s", strerror(retval
));
335 retval
= set_nonblocking(new_fd
);
341 return ps
->accept_cb(new_fd
, (const struct sockaddr
*) &ss
, ss_len
,
346 pstream_wait(struct pvconn
*pvconn
)
348 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
349 poll_fd_wait(ps
->fd
, POLLIN
);
352 static struct pvconn_class pstream_pvconn_class
= {