]> git.proxmox.com Git - ovs.git/commitdiff
python jsonrpc: Allow jsonrpc_session to have more than one remote.
authorNuman Siddique <nusiddiq@redhat.com>
Tue, 7 Aug 2018 11:38:11 +0000 (17:08 +0530)
committerBen Pfaff <blp@ovn.org>
Wed, 15 Aug 2018 17:19:01 +0000 (10:19 -0700)
Python IDL implementation doesn't have the support to connect to the
cluster dbs. This patch adds this support. We are still missing the
support in python idl class to connect to the cluster master. That
support will be added in an upcoming patch.

This patch is similar to the commit 8cf6bbb184 which added multiple remote
support in the C jsonrpc implementation.

Acked-by: Mark Michelson <mmichels@redhat.com>
Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
python/ovs/db/idl.py
python/ovs/jsonrpc.py
tests/ovsdb-idl.at
tests/test-ovsdb.py

index 64eb1a886a5c094678e3f94a5ccb2bec0b58ea09..03110a76f3184af7c4b58e627b25b55e0e240dc3 100644 (file)
@@ -101,6 +101,9 @@ class Idl(object):
         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
         replica of the remote database.
 
+        'remote' can be comma separated multiple remotes and each remote
+        should be in a form acceptable to ovs.jsonrpc.session.open().
+
         'schema_helper' should be an instance of the SchemaHelper class which
         generates schema for the remote database. The caller may have cut it
         down by removing tables or columns that are not of interest.  The IDL
@@ -127,7 +130,8 @@ class Idl(object):
         self.tables = schema.tables
         self.readonly = schema.readonly
         self._db = schema
-        self._session = ovs.jsonrpc.Session.open(remote,
+        remotes = self._parse_remotes(remote)
+        self._session = ovs.jsonrpc.Session.open_multiple(remotes,
             probe_interval=probe_interval)
         self._monitor_request_id = None
         self._last_seqno = None
@@ -155,6 +159,19 @@ class Idl(object):
             table.condition = [True]
             table.cond_changed = False
 
+    def _parse_remotes(self, remote):
+        # If remote is -
+        # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
+        # this function returns
+        # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
+        remotes = []
+        for r in remote.split(','):
+            if remotes and r.find(":") == -1:
+                remotes[-1] += "," + r
+            else:
+                remotes.append(r)
+        return remotes
+
     def index_create(self, table, name):
         """Create a named multi-column index on a table"""
         return self.tables[table].rows.index_create(name)
index 7c24e574a6a35514d077827dd9da2cfa08b74673..4873cff98dd1e413dda44f522476ddf329a8be9c 100644 (file)
@@ -14,6 +14,7 @@
 import codecs
 import errno
 import os
+import random
 import sys
 
 import ovs.json
@@ -368,12 +369,17 @@ class Connection(object):
 class Session(object):
     """A JSON-RPC session with reconnection."""
 
-    def __init__(self, reconnect, rpc):
+    def __init__(self, reconnect, rpc, remotes):
         self.reconnect = reconnect
         self.rpc = rpc
         self.stream = None
         self.pstream = None
         self.seqno = 0
+        if type(remotes) != list:
+            remotes = [remotes]
+        self.remotes = remotes
+        random.shuffle(self.remotes)
+        self.next_remote = 0
 
     @staticmethod
     def open(name, probe_interval=None):
@@ -393,28 +399,38 @@ class Session(object):
         feature. If non-zero the value will be forced to at least 1000
         milliseconds. If None it will just use the default value in OVS.
         """
+        return Session.open_multiple([name], probe_interval=probe_interval)
+
+    @staticmethod
+    def open_multiple(remotes, probe_interval=None):
         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
-        reconnect.set_name(name)
+        session = Session(reconnect, None, remotes)
+        session.pick_remote()
         reconnect.enable(ovs.timeval.msec())
-
-        if ovs.stream.PassiveStream.is_valid_name(name):
+        reconnect.set_backoff_free_tries(len(remotes))
+        if ovs.stream.PassiveStream.is_valid_name(reconnect.get_name()):
             reconnect.set_passive(True, ovs.timeval.msec())
 
-        if not ovs.stream.stream_or_pstream_needs_probes(name):
+        if not ovs.stream.stream_or_pstream_needs_probes(reconnect.get_name()):
             reconnect.set_probe_interval(0)
         elif probe_interval is not None:
             reconnect.set_probe_interval(probe_interval)
 
-        return Session(reconnect, None)
+        return session
 
     @staticmethod
     def open_unreliably(jsonrpc):
         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
+        session = Session(reconnect, None, [jsonrpc.name])
         reconnect.set_quiet(True)
-        reconnect.set_name(jsonrpc.name)
+        session.pick_remote()
         reconnect.set_max_tries(0)
         reconnect.connected(ovs.timeval.msec())
-        return Session(reconnect, jsonrpc)
+        return session
+
+    def pick_remote(self):
+        self.reconnect.set_name(self.remotes[self.next_remote])
+        self.next_remote = (self.next_remote + 1) % len(self.remotes)
 
     def close(self):
         if self.rpc is not None:
@@ -448,6 +464,8 @@ class Session(object):
                 self.reconnect.connecting(ovs.timeval.msec())
             else:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
+                self.stream = None
+                self.pick_remote()
         elif self.pstream is None:
             error, self.pstream = ovs.stream.PassiveStream.open(name)
             if not error:
@@ -490,6 +508,7 @@ class Session(object):
             if error != 0:
                 self.reconnect.disconnected(ovs.timeval.msec(), error)
                 self.__disconnect()
+                self.pick_remote()
         elif self.stream is not None:
             self.stream.run()
             error = self.stream.connect()
@@ -499,6 +518,7 @@ class Session(object):
                 self.stream = None
             elif error != errno.EAGAIN:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
+                self.pick_remote()
                 self.stream.close()
                 self.stream = None
 
@@ -583,3 +603,6 @@ class Session(object):
 
     def force_reconnect(self):
         self.reconnect.force_reconnect(ovs.timeval.msec())
+
+    def get_num_of_remotes(self):
+        return len(self.remotes)
index 8c5e9fe5075be85f4ade10df994aa84009ce6bbc..e9b4161f03965ab40162f362862fbb90cdee7c83 100644 (file)
@@ -106,6 +106,32 @@ m4_define([OVSDB_CHECK_IDL_TCP_PY],
     OVSDB_CHECK_IDL_TCP_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
                         [$HAVE_PYTHON3], [$PYTHON3])])
 
+# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp
+# with multiple remotes with only one remote reachable
+m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN],
+  [AT_SETUP([$1 - tcp])
+   AT_SKIP_IF([test $7 = no])
+   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
+   AT_CHECK([ovsdb_start_idltest "ptcp:0:127.0.0.1"])
+   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+   WRONG_PORT_1=$((TCP_PORT + 1))
+   WRONG_PORT_2=$((TCP_PORT + 2))
+   remote=tcp:127.0.0.1:$WRONG_PORT_1,tcp:127.0.0.1:$TCP_PORT,tcp:127.0.0.1:$WRONG_PORT_2
+   m4_if([$2], [], [],
+     [AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT $2], [0], [ignore], [ignore])])
+   AT_CHECK([$8 $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema $remote $3],
+            [0], [stdout], [ignore])
+   AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
+            [0], [$4])
+   OVSDB_SERVER_SHUTDOWN
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY],
+   [OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+                        [$HAVE_PYTHON], [$PYTHON])
+    OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+                        [$HAVE_PYTHON3], [$PYTHON3])])
+
 # same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp6
 m4_define([OVSDB_CHECK_IDL_TCP6_PYN],
   [AT_SETUP([$1 - tcp6])
@@ -132,6 +158,32 @@ m4_define([OVSDB_CHECK_IDL_TCP6_PY],
     OVSDB_CHECK_IDL_TCP6_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
                         [$HAVE_PYTHON3], [$PYTHON3])])
 
+m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN],
+  [AT_SETUP([$1 - tcp6])
+   AT_SKIP_IF([test $7 = no])
+   AT_SKIP_IF([test "$IS_WIN32" = "yes"])
+   AT_SKIP_IF([test $HAVE_IPV6 = no])
+   AT_KEYWORDS([ovsdb server idl positive Python with tcp6 socket $5])
+   AT_CHECK([ovsdb_start_idltest "ptcp:0:[[::1]]"])
+   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+   WRONG_PORT_1=$((TCP_PORT + 1))
+   WRONG_PORT_2=$((TCP_PORT + 2))
+   remote="tcp:[[::1]]:$WRONG_PORT_1,tcp:[[::1]]:$TCP_PORT,tcp:[[::1]]:$WRONG_PORT_2"
+   m4_if([$2], [], [],
+     [AT_CHECK([ovsdb-client transact "tcp:[[::1]]:$TCP_PORT" $2], [0], [ignore], [ignore])])
+   AT_CHECK([$8 $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema $remote $3],
+            [0], [stdout], [ignore])
+   AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
+            [0], [$4])
+   OVSDB_SERVER_SHUTDOWN
+   AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY],
+   [OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+                        [$HAVE_PYTHON], [$PYTHON])
+    OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+                        [$HAVE_PYTHON3], [$PYTHON3])])
+
 # same as OVSDB_CHECK_IDL but uses the Python IDL implementation with SSL
 m4_define([OVSDB_CHECK_IDL_SSL_PYN],
   [AT_SETUP([$1 - SSL])
@@ -178,7 +230,9 @@ m4_define([OVSDB_CHECK_IDL],
    OVSDB_CHECK_IDL_PY($@)
    OVSDB_CHECK_IDL_REGISTER_COLUMNS_PY($@)
    OVSDB_CHECK_IDL_TCP_PY($@)
+   OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY($@)
    OVSDB_CHECK_IDL_TCP6_PY($@)
+   OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY($@)
    OVSDB_CHECK_IDL_SSL_PY($@)])
 
 # This test uses the Python IDL implementation with passive tcp
index ec60354479f5461b9ede477af2d5d35e1ecbbabc..ed5d21b0c142d9c67d3a50d748e7769994412b39 100644 (file)
@@ -595,9 +595,16 @@ def do_idl(schema_file, remote, *commands):
         idl.index_create("simple3", "simple3_by_name")
 
     if commands:
-        error, stream = ovs.stream.Stream.open_block(
-            ovs.stream.Stream.open(remote))
-        if error:
+        remotes = remote.split(',')
+        stream = None
+        for r in remotes:
+            error, stream = ovs.stream.Stream.open_block(
+                ovs.stream.Stream.open(r))
+            if not error and stream:
+                break
+            stream = None
+
+        if not stream:
             sys.stderr.write("failed to connect to \"%s\"" % remote)
             sys.exit(1)
         rpc = ovs.jsonrpc.Connection(stream)