}
/* Blocks until a previously started stream connection attempt succeeds or
- * fails. 'error' should be the value returned by stream_open() and 'streamp'
- * should point to the stream pointer set by stream_open(). Returns 0 if
- * successful, otherwise a positive errno value other than EAGAIN or
- * EINPROGRESS. If successful, leaves '*streamp' untouched; on error, closes
- * '*streamp' and sets '*streamp' to null.
+ * fails, but no more than 'timeout' milliseconds. 'error' should be the
+ * value returned by stream_open() and 'streamp' should point to the stream
+ * pointer set by stream_open(). Returns 0 if successful, otherwise a
+ * positive errno value other than EAGAIN or EINPROGRESS. If successful,
+ * leaves '*streamp' untouched; on error, closes '*streamp' and sets
+ * '*streamp' to null. Negative value of 'timeout' means infinite waiting.
*
* Typical usage:
- * error = stream_open_block(stream_open("tcp:1.2.3.4:5", &stream), &stream);
+ * error = stream_open_block(stream_open("tcp:1.2.3.4:5", &stream), -1,
+ * &stream);
*/
int
-stream_open_block(int error, struct stream **streamp)
+stream_open_block(int error, long long int timeout, struct stream **streamp)
{
struct stream *stream = *streamp;
fatal_signal_run();
if (!error) {
+ long long int deadline = (timeout >= 0
+ ? time_msec() + timeout
+ : LLONG_MAX);
while ((error = stream_connect(stream)) == EAGAIN) {
+ if (deadline != LLONG_MAX && time_msec() > deadline) {
+ error = ETIMEDOUT;
+ break;
+ }
stream_run(stream);
stream_run_wait(stream);
stream_connect_wait(stream);
+ if (deadline != LLONG_MAX) {
+ poll_timer_wait_until(deadline);
+ }
poll_block();
}
ovs_assert(error != EINPROGRESS);
/* Bidirectional byte streams. */
int stream_verify_name(const char *name);
int stream_open(const char *name, struct stream **, uint8_t dscp);
-int stream_open_block(int error, struct stream **);
+int stream_open_block(int error, long long int timeout, struct stream **);
void stream_close(struct stream *);
const char *stream_get_name(const struct stream *);
int stream_connect(struct stream *);
*client = NULL;
error = stream_open_block(stream_open(unix_path, &stream, DSCP_DEFAULT),
- &stream);
+ -1, &stream);
free(unix_path);
free(abs_path);
int error;
error = stream_open_block(jsonrpc_stream_open(server, &stream,
- DSCP_DEFAULT), &stream);
+ DSCP_DEFAULT), -1, &stream);
if (error == EAFNOSUPPORT) {
struct pstream *pstream;
raise NotImplementedError("This method must be overrided by subclass")
@staticmethod
- def open_block(error_stream):
+ def open_block(error_stream, timeout=None):
"""Blocks until a Stream completes its connection attempt, either
- succeeding or failing. (error, stream) should be the tuple returned by
- Stream.open(). Returns a tuple of the same form.
+ succeeding or failing, but no more than 'timeout' milliseconds.
+ (error, stream) should be the tuple returned by Stream.open().
+ Negative value of 'timeout' means infinite waiting.
+ Returns a tuple of the same form.
Typical usage:
error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
# Py3 doesn't support tuple parameter unpacking - PEP 3113
error, stream = error_stream
if not error:
+ deadline = None
+ if timeout is not None and timeout >= 0:
+ deadline = ovs.timeval.msec() + timeout
while True:
error = stream.connect()
if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
error = errno.EAGAIN
if error != errno.EAGAIN:
break
+ if deadline is not None and ovs.timeval.msec() > deadline:
+ error = errno.ETIMEDOUT
+ break
stream.run()
poller = ovs.poller.Poller()
stream.run_wait(poller)
stream.connect_wait(poller)
+ if deadline is not None:
+ poller.timer_wait_until(deadline)
poller.block()
if stream.socket is not None:
assert error != errno.EINPROGRESS
}
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), &stream);
+ DSCP_DEFAULT), -1, &stream);
if (error) {
ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
}
}
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), &stream);
+ DSCP_DEFAULT), -1, &stream);
if (error) {
ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
}
struct stream *stream;
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), &stream);
+ DSCP_DEFAULT), -1, &stream);
if (error) {
ovs_fatal(error, "failed to connect to \"%s\"", ctx->argv[1]);
}
stream = None
for r in remotes:
error, stream = ovs.stream.Stream.open_block(
- ovs.stream.Stream.open(r))
+ ovs.stream.Stream.open(r), 2000)
if not error and stream:
break
stream = None
}
error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT),
- &stream);
+ 10000, &stream);
if (error) {
VLOG_ERR("stream_open_block(%s) failure: %s",
argv[1], ovs_strerror(error));
def main(argv):
remote = argv[1]
err, stream = ovs.stream.Stream.open_block(
- ovs.stream.Stream.open(remote))
+ ovs.stream.Stream.open(remote), 10000)
if err or stream is None:
sys.exit(1)