2 * Copyright (c) 2008, 2009 Nicira Networks.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at:
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "vconn-stream.h"
24 #include <sys/types.h>
26 #include "leak-checker.h"
28 #include "openflow/openflow.h"
29 #include "poll-loop.h"
30 #include "socket-util.h"
32 #include "vconn-provider.h"
36 #define THIS_MODULE VLM_vconn_stream
38 /* Active stream socket vconn. */
46 struct poll_waiter
*tx_waiter
;
49 static struct vconn_class stream_vconn_class
;
51 static struct vlog_rate_limit rl
= VLOG_RATE_LIMIT_INIT(10, 25);
53 static void stream_clear_txbuf(struct stream_vconn
*);
56 new_stream_vconn(const char *name
, int fd
, int connect_status
,
57 bool reconnectable
, struct vconn
**vconnp
)
59 struct stream_vconn
*s
;
61 s
= xmalloc(sizeof *s
);
62 vconn_init(&s
->vconn
, &stream_vconn_class
, connect_status
,
72 static struct stream_vconn
*
73 stream_vconn_cast(struct vconn
*vconn
)
75 vconn_assert_class(vconn
, &stream_vconn_class
);
76 return CONTAINER_OF(vconn
, struct stream_vconn
, vconn
);
80 stream_close(struct vconn
*vconn
)
82 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
83 poll_cancel(s
->tx_waiter
);
84 stream_clear_txbuf(s
);
85 ofpbuf_delete(s
->rxbuf
);
91 stream_connect(struct vconn
*vconn
)
93 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
94 return check_connection_completion(s
->fd
);
98 stream_recv(struct vconn
*vconn
, struct ofpbuf
**bufferp
)
100 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
105 if (s
->rxbuf
== NULL
) {
106 s
->rxbuf
= ofpbuf_new(1564);
111 if (sizeof(struct ofp_header
) > rx
->size
) {
112 want_bytes
= sizeof(struct ofp_header
) - rx
->size
;
114 struct ofp_header
*oh
= rx
->data
;
115 size_t length
= ntohs(oh
->length
);
116 if (length
< sizeof(struct ofp_header
)) {
117 VLOG_ERR_RL(&rl
, "received too-short ofp_header (%zu bytes)",
121 want_bytes
= length
- rx
->size
;
128 ofpbuf_prealloc_tailroom(rx
, want_bytes
);
130 retval
= read(s
->fd
, ofpbuf_tail(rx
), want_bytes
);
133 if (retval
== want_bytes
) {
134 if (rx
->size
> sizeof(struct ofp_header
)) {
143 } else if (retval
== 0) {
145 VLOG_ERR_RL(&rl
, "connection dropped mid-packet");
156 stream_clear_txbuf(struct stream_vconn
*s
)
158 ofpbuf_delete(s
->txbuf
);
164 stream_do_tx(int fd UNUSED
, short int revents UNUSED
, void *vconn_
)
166 struct vconn
*vconn
= vconn_
;
167 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
168 ssize_t n
= write(s
->fd
, s
->txbuf
->data
, s
->txbuf
->size
);
170 if (errno
!= EAGAIN
) {
171 VLOG_ERR_RL(&rl
, "send: %s", strerror(errno
));
172 stream_clear_txbuf(s
);
176 ofpbuf_pull(s
->txbuf
, n
);
177 if (!s
->txbuf
->size
) {
178 stream_clear_txbuf(s
);
182 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
186 stream_send(struct vconn
*vconn
, struct ofpbuf
*buffer
)
188 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
195 retval
= write(s
->fd
, buffer
->data
, buffer
->size
);
196 if (retval
== buffer
->size
) {
197 ofpbuf_delete(buffer
);
199 } else if (retval
>= 0 || errno
== EAGAIN
) {
200 leak_checker_claim(buffer
);
203 ofpbuf_pull(buffer
, retval
);
205 s
->tx_waiter
= poll_fd_callback(s
->fd
, POLLOUT
, stream_do_tx
, vconn
);
213 stream_wait(struct vconn
*vconn
, enum vconn_wait_type wait
)
215 struct stream_vconn
*s
= stream_vconn_cast(vconn
);
218 poll_fd_wait(s
->fd
, POLLOUT
);
223 poll_fd_wait(s
->fd
, POLLOUT
);
225 /* Nothing to do: need to drain txbuf first. */
230 poll_fd_wait(s
->fd
, POLLIN
);
238 static struct vconn_class stream_vconn_class
= {
241 stream_close
, /* close */
242 stream_connect
, /* connect */
243 stream_recv
, /* recv */
244 stream_send
, /* send */
245 stream_wait
, /* wait */
248 /* Passive stream socket vconn. */
250 struct pstream_pvconn
252 struct pvconn pvconn
;
254 int (*accept_cb
)(int fd
, const struct sockaddr
*, size_t sa_len
,
258 static struct pvconn_class pstream_pvconn_class
;
260 static struct pstream_pvconn
*
261 pstream_pvconn_cast(struct pvconn
*pvconn
)
263 pvconn_assert_class(pvconn
, &pstream_pvconn_class
);
264 return CONTAINER_OF(pvconn
, struct pstream_pvconn
, pvconn
);
268 new_pstream_pvconn(const char *name
, int fd
,
269 int (*accept_cb
)(int fd
, const struct sockaddr
*,
270 size_t sa_len
, struct vconn
**),
271 struct pvconn
**pvconnp
)
273 struct pstream_pvconn
*ps
= xmalloc(sizeof *ps
);
274 pvconn_init(&ps
->pvconn
, &pstream_pvconn_class
, name
);
276 ps
->accept_cb
= accept_cb
;
277 *pvconnp
= &ps
->pvconn
;
282 pstream_close(struct pvconn
*pvconn
)
284 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
290 pstream_accept(struct pvconn
*pvconn
, struct vconn
**new_vconnp
)
292 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
293 struct sockaddr_storage ss
;
294 socklen_t ss_len
= sizeof ss
;
298 new_fd
= accept(ps
->fd
, (struct sockaddr
*) &ss
, &ss_len
);
301 if (retval
!= EAGAIN
) {
302 VLOG_DBG_RL(&rl
, "accept: %s", strerror(retval
));
307 retval
= set_nonblocking(new_fd
);
313 return ps
->accept_cb(new_fd
, (const struct sockaddr
*) &ss
, ss_len
,
318 pstream_wait(struct pvconn
*pvconn
)
320 struct pstream_pvconn
*ps
= pstream_pvconn_cast(pvconn
);
321 poll_fd_wait(ps
->fd
, POLLIN
);
324 static struct pvconn_class pstream_pvconn_class
= {