ovs.jsonrpc.session.open(). The connection will maintain an in-memory
replica of the remote database.
+ 'remote' can be comma separated multiple remotes and each remote
+ should be in a form acceptable to ovs.jsonrpc.session.open().
+
'schema_helper' should be an instance of the SchemaHelper class which
generates schema for the remote database. The caller may have cut it
down by removing tables or columns that are not of interest. The IDL
self.tables = schema.tables
self.readonly = schema.readonly
self._db = schema
- self._session = ovs.jsonrpc.Session.open(remote,
+ remotes = self._parse_remotes(remote)
+ self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval)
self._monitor_request_id = None
self._last_seqno = None
table.condition = [True]
table.cond_changed = False
+ def _parse_remotes(self, remote):
+ # If remote is -
+ # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
+ # this function returns
+ # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
+ remotes = []
+ for r in remote.split(','):
+ if remotes and r.find(":") == -1:
+ remotes[-1] += "," + r
+ else:
+ remotes.append(r)
+ return remotes
+
def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
import codecs
import errno
import os
+import random
import sys
import ovs.json
class Session(object):
"""A JSON-RPC session with reconnection."""
- def __init__(self, reconnect, rpc):
+ def __init__(self, reconnect, rpc, remotes):
self.reconnect = reconnect
self.rpc = rpc
self.stream = None
self.pstream = None
self.seqno = 0
+ if type(remotes) != list:
+ remotes = [remotes]
+ self.remotes = remotes
+ random.shuffle(self.remotes)
+ self.next_remote = 0
@staticmethod
def open(name, probe_interval=None):
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
"""
+ return Session.open_multiple([name], probe_interval=probe_interval)
+
+ @staticmethod
+ def open_multiple(remotes, probe_interval=None):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
- reconnect.set_name(name)
+ session = Session(reconnect, None, remotes)
+ session.pick_remote()
reconnect.enable(ovs.timeval.msec())
-
- if ovs.stream.PassiveStream.is_valid_name(name):
+ reconnect.set_backoff_free_tries(len(remotes))
+ if ovs.stream.PassiveStream.is_valid_name(reconnect.get_name()):
reconnect.set_passive(True, ovs.timeval.msec())
- if not ovs.stream.stream_or_pstream_needs_probes(name):
+ if not ovs.stream.stream_or_pstream_needs_probes(reconnect.get_name()):
reconnect.set_probe_interval(0)
elif probe_interval is not None:
reconnect.set_probe_interval(probe_interval)
- return Session(reconnect, None)
+ return session
@staticmethod
def open_unreliably(jsonrpc):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
+ session = Session(reconnect, None, [jsonrpc.name])
reconnect.set_quiet(True)
- reconnect.set_name(jsonrpc.name)
+ session.pick_remote()
reconnect.set_max_tries(0)
reconnect.connected(ovs.timeval.msec())
- return Session(reconnect, jsonrpc)
+ return session
+
+ def pick_remote(self):
+ self.reconnect.set_name(self.remotes[self.next_remote])
+ self.next_remote = (self.next_remote + 1) % len(self.remotes)
def close(self):
if self.rpc is not None:
self.reconnect.connecting(ovs.timeval.msec())
else:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
+ self.stream = None
+ self.pick_remote()
elif self.pstream is None:
error, self.pstream = ovs.stream.PassiveStream.open(name)
if not error:
if error != 0:
self.reconnect.disconnected(ovs.timeval.msec(), error)
self.__disconnect()
+ self.pick_remote()
elif self.stream is not None:
self.stream.run()
error = self.stream.connect()
self.stream = None
elif error != errno.EAGAIN:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
+ self.pick_remote()
self.stream.close()
self.stream = None
def force_reconnect(self):
self.reconnect.force_reconnect(ovs.timeval.msec())
+
+ def get_num_of_remotes(self):
+ return len(self.remotes)
OVSDB_CHECK_IDL_TCP_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])])
+# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp
+# with multiple remotes with only one remote reachable
+m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN],
+ [AT_SETUP([$1 - tcp])
+ AT_SKIP_IF([test $7 = no])
+ AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
+ AT_CHECK([ovsdb_start_idltest "ptcp:0:127.0.0.1"])
+ PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+ WRONG_PORT_1=$((TCP_PORT + 1))
+ WRONG_PORT_2=$((TCP_PORT + 2))
+ remote=tcp:127.0.0.1:$WRONG_PORT_1,tcp:127.0.0.1:$TCP_PORT,tcp:127.0.0.1:$WRONG_PORT_2
+ m4_if([$2], [], [],
+ [AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT $2], [0], [ignore], [ignore])])
+ AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
+ [0], [stdout], [ignore])
+ AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
+ [0], [$4])
+ OVSDB_SERVER_SHUTDOWN
+ AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY],
+ [OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON], [$PYTHON])
+ OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON3], [$PYTHON3])])
+
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp6
m4_define([OVSDB_CHECK_IDL_TCP6_PYN],
[AT_SETUP([$1 - tcp6])
OVSDB_CHECK_IDL_TCP6_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])])
+m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN],
+ [AT_SETUP([$1 - tcp6])
+ AT_SKIP_IF([test $7 = no])
+ AT_SKIP_IF([test "$IS_WIN32" = "yes"])
+ AT_SKIP_IF([test $HAVE_IPV6 = no])
+ AT_KEYWORDS([ovsdb server idl positive Python with tcp6 socket $5])
+ AT_CHECK([ovsdb_start_idltest "ptcp:0:[[::1]]"])
+ PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+ WRONG_PORT_1=$((TCP_PORT + 1))
+ WRONG_PORT_2=$((TCP_PORT + 2))
+ remote="tcp:[[::1]]:$WRONG_PORT_1,tcp:[[::1]]:$TCP_PORT,tcp:[[::1]]:$WRONG_PORT_2"
+ m4_if([$2], [], [],
+ [AT_CHECK([ovsdb-client transact "tcp:[[::1]]:$TCP_PORT" $2], [0], [ignore], [ignore])])
+ AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
+ [0], [stdout], [ignore])
+ AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
+ [0], [$4])
+ OVSDB_SERVER_SHUTDOWN
+ AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY],
+ [OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON], [$PYTHON])
+ OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON3], [$PYTHON3])])
+
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with SSL
m4_define([OVSDB_CHECK_IDL_SSL_PYN],
[AT_SETUP([$1 - SSL])
OVSDB_CHECK_IDL_PY($@)
OVSDB_CHECK_IDL_REGISTER_COLUMNS_PY($@)
OVSDB_CHECK_IDL_TCP_PY($@)
+ OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY($@)
OVSDB_CHECK_IDL_TCP6_PY($@)
+ OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY($@)
OVSDB_CHECK_IDL_SSL_PY($@)])
# This test uses the Python IDL implementation with passive tcp
idl.index_create("simple3", "simple3_by_name")
if commands:
- error, stream = ovs.stream.Stream.open_block(
- ovs.stream.Stream.open(remote))
- if error:
+ remotes = remote.split(',')
+ stream = None
+ for r in remotes:
+ error, stream = ovs.stream.Stream.open_block(
+ ovs.stream.Stream.open(r))
+ if not error and stream:
+ break
+ stream = None
+
+ if not stream:
sys.stderr.write("failed to connect to \"%s\"" % remote)
sys.exit(1)
rpc = ovs.jsonrpc.Connection(stream)