]> git.proxmox.com Git - mirror_ovs.git/commitdiff
stream: Allow timeout configuration for open_block.
authorIlya Maximets <i.maximets@samsung.com>
Wed, 9 Jan 2019 17:30:16 +0000 (20:30 +0300)
committerBen Pfaff <blp@ovn.org>
Thu, 10 Jan 2019 23:39:48 +0000 (15:39 -0800)
On some systems in case where remote is not responding, socket could
remain in SYN_SENT state for a really long time without errors waiting
for connection. This leads to situations where open_blok() hangs for
a few minutes waiting for connection to the DOWN remote.

For example, our "multiple remotes" idl tests hangs waiting for
connection to the WRONG_PORT on FreeBSD in CirrusCI environment.
This leads to test failures because Alarm signal arrives much faster
than ETIMEDOUT from the socket.

This patch allowes to specify timeout value for 'open_block' function.
If the connection takes more time, socket will be closed with
ETIMEDOUT error code. Negative value or None in python could be
used to wait infinitely.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
lib/stream.c
lib/stream.h
lib/unixctl.c
ovsdb/ovsdb-client.c
python/ovs/stream.py
tests/test-jsonrpc.c
tests/test-ovsdb.c
tests/test-ovsdb.py
tests/test-stream.c
tests/test-stream.py

index c4dabda396284bc82267d75cab51599c486275dc..e246b3773566b4bc3ffc77f25880286cb9398dab 100644 (file)
@@ -241,27 +241,39 @@ error:
 }
 
 /* Blocks until a previously started stream connection attempt succeeds or
- * fails.  'error' should be the value returned by stream_open() and 'streamp'
- * should point to the stream pointer set by stream_open().  Returns 0 if
- * successful, otherwise a positive errno value other than EAGAIN or
- * EINPROGRESS.  If successful, leaves '*streamp' untouched; on error, closes
- * '*streamp' and sets '*streamp' to null.
+ * fails, but no more than 'timeout' milliseconds.  'error' should be the
+ * value returned by stream_open() and 'streamp' should point to the stream
+ * pointer set by stream_open().  Returns 0 if successful, otherwise a
+ * positive errno value other than EAGAIN or EINPROGRESS.  If successful,
+ * leaves '*streamp' untouched; on error, closes '*streamp' and sets
+ * '*streamp' to null. Negative value of 'timeout' means infinite waiting.
  *
  * Typical usage:
- *   error = stream_open_block(stream_open("tcp:1.2.3.4:5", &stream), &stream);
+ *   error = stream_open_block(stream_open("tcp:1.2.3.4:5", &stream), -1,
+ *                             &stream);
  */
 int
-stream_open_block(int error, struct stream **streamp)
+stream_open_block(int error, long long int timeout, struct stream **streamp)
 {
     struct stream *stream = *streamp;
 
     fatal_signal_run();
 
     if (!error) {
+        long long int deadline = (timeout >= 0
+                                  ? time_msec() + timeout
+                                  : LLONG_MAX);
         while ((error = stream_connect(stream)) == EAGAIN) {
+            if (deadline != LLONG_MAX && time_msec() > deadline) {
+                error = ETIMEDOUT;
+                break;
+            }
             stream_run(stream);
             stream_run_wait(stream);
             stream_connect_wait(stream);
+            if (deadline != LLONG_MAX) {
+                poll_timer_wait_until(deadline);
+            }
             poll_block();
         }
         ovs_assert(error != EINPROGRESS);
index 88f576155108d1287b887a9eed4bfaa7603cabab..77bffa498c13b404fbcc8a959f367576c55afeca 100644 (file)
@@ -34,7 +34,7 @@ void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
 /* Bidirectional byte streams. */
 int stream_verify_name(const char *name);
 int stream_open(const char *name, struct stream **, uint8_t dscp);
-int stream_open_block(int error, struct stream **);
+int stream_open_block(int error, long long int timeout, struct stream **);
 void stream_close(struct stream *);
 const char *stream_get_name(const struct stream *);
 int stream_connect(struct stream *);
index 0bcfada9187d0435a01525ccac44e3a4e7ff07de..c216de3d05a7e9751ceefdb820f5ae8bb390e388 100644 (file)
@@ -460,7 +460,7 @@ unixctl_client_create(const char *path, struct jsonrpc **client)
     *client = NULL;
 
     error = stream_open_block(stream_open(unix_path, &stream, DSCP_DEFAULT),
-                              &stream);
+                              -1, &stream);
     free(unix_path);
     free(abs_path);
 
index 7c8a59d0e749671f915a1f3aba7a610cca63b1f3..83c3c12cc360369e63ea425ae20b6b48c9de1b12 100644 (file)
@@ -498,7 +498,7 @@ open_jsonrpc(const char *server)
     int error;
 
     error = stream_open_block(jsonrpc_stream_open(server, &stream,
-                              DSCP_DEFAULT), &stream);
+                              DSCP_DEFAULT), -1, &stream);
     if (error == EAFNOSUPPORT) {
         struct pstream *pstream;
 
index cdfcc399e51240308703df665f964a899cee932c..da683afd8133f0ece2e861b275582d1637770e93 100644 (file)
@@ -206,10 +206,12 @@ class Stream(object):
         raise NotImplementedError("This method must be overrided by subclass")
 
     @staticmethod
-    def open_block(error_stream):
+    def open_block(error_stream, timeout=None):
         """Blocks until a Stream completes its connection attempt, either
-        succeeding or failing.  (error, stream) should be the tuple returned by
-        Stream.open().  Returns a tuple of the same form.
+        succeeding or failing, but no more than 'timeout' milliseconds.
+        (error, stream) should be the tuple returned by Stream.open().
+        Negative value of 'timeout' means infinite waiting.
+        Returns a tuple of the same form.
 
         Typical usage:
         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
@@ -217,6 +219,9 @@ class Stream(object):
         # Py3 doesn't support tuple parameter unpacking - PEP 3113
         error, stream = error_stream
         if not error:
+            deadline = None
+            if timeout is not None and timeout >= 0:
+                deadline = ovs.timeval.msec() + timeout
             while True:
                 error = stream.connect()
                 if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
@@ -225,10 +230,15 @@ class Stream(object):
                     error = errno.EAGAIN
                 if error != errno.EAGAIN:
                     break
+                if deadline is not None and ovs.timeval.msec() > deadline:
+                    error = errno.ETIMEDOUT
+                    break
                 stream.run()
                 poller = ovs.poller.Poller()
                 stream.run_wait(poller)
                 stream.connect_wait(poller)
+                if deadline is not None:
+                    poller.timer_wait_until(deadline)
                 poller.block()
             if stream.socket is not None:
                 assert error != errno.EINPROGRESS
index 49d2b91bd0fbf2f84f59ab9c3b3f72496fed806a..04e941b1414ea7863a2c9341510a0041976ca190 100644 (file)
@@ -272,7 +272,7 @@ do_request(struct ovs_cmdl_context *ctx)
     }
 
     error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                              DSCP_DEFAULT), &stream);
+                              DSCP_DEFAULT), -1, &stream);
     if (error) {
         ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
     }
@@ -312,7 +312,7 @@ do_notify(struct ovs_cmdl_context *ctx)
     }
 
     error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                              DSCP_DEFAULT), &stream);
+                              DSCP_DEFAULT), -1, &stream);
     if (error) {
         ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
     }
index 453d88eabc8c3282f5a89a575a5c1bd5fdc2347b..187eb28671e607b1931983699bf2e9ce287e3635 100644 (file)
@@ -2416,7 +2416,7 @@ do_idl(struct ovs_cmdl_context *ctx)
         struct stream *stream;
 
         error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                                  DSCP_DEFAULT), &stream);
+                                  DSCP_DEFAULT), -1, &stream);
         if (error) {
             ovs_fatal(error, "failed to connect to \"%s\"", ctx->argv[1]);
         }
index c03476c7ffaf43bebe1cc50438449d55a7234184..2d1112dddd2b80d541bc1918f4841bf72e03e49a 100644 (file)
@@ -604,7 +604,7 @@ def do_idl(schema_file, remote, *commands):
         stream = None
         for r in remotes:
             error, stream = ovs.stream.Stream.open_block(
-                ovs.stream.Stream.open(r))
+                ovs.stream.Stream.open(r), 2000)
             if not error and stream:
                 break
             stream = None
index 4816de02d1bbbb73c3e2e27cb856162e683c8827..4af44200e801374f8872f0ac93d11925cc10f861 100644 (file)
@@ -37,7 +37,7 @@ main(int argc, char *argv[])
     }
 
     error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT),
-                              &stream);
+                              10000, &stream);
     if (error) {
         VLOG_ERR("stream_open_block(%s) failure: %s",
                  argv[1], ovs_strerror(error));
index 4a5117501ca09799e68180d1047e537abf719693..93d63c019b3e849c95458c7a3fdb428b4204aa34 100644 (file)
@@ -20,7 +20,7 @@ import ovs.stream
 def main(argv):
     remote = argv[1]
     err, stream = ovs.stream.Stream.open_block(
-            ovs.stream.Stream.open(remote))
+            ovs.stream.Stream.open(remote), 10000)
 
     if err or stream is None:
         sys.exit(1)