and isinstance(json, real_types))
or (type_ == ovs.db.types.BooleanType and isinstance(json, bool))
or (type_ == ovs.db.types.StringType
- and isinstance(json, (str, unicode)))):
+ and isinstance(json, six.string_types))):
atom = Atom(type_, json)
elif type_ == ovs.db.types.UuidType:
atom = Atom(type_, ovs.ovsuuid.from_json(json, symtab))
@staticmethod
def from_python(base, value):
value = ovs.db.parser.float_to_int(value)
- if type(value) in base.type.python_types:
+ if isinstance(value, base.type.python_types):
atom = Atom(base.type, value)
else:
raise error.Error("expected %s, got %s" % (base.type, type(value)))
t = ovs.db.types.RealType
elif isinstance(x, bool):
t = ovs.db.types.BooleanType
- elif isinstance(x, (str, unicode)):
+ elif isinstance(x, six.string_types):
t = ovs.db.types.StringType
elif isinstance(x, uuid):
t = ovs.db.types.UuidType
member = float_to_int(self.json[name])
if is_identifier(member) and "id" in types:
return member
- if len(types) and type(member) not in types:
+ try:
+ if len(types) and not isinstance(member, tuple(types)):
+ self.__raise_error("Type mismatch for member '%s'." % name)
+ except TypeError:
self.__raise_error("Type mismatch for member '%s'." % name)
return member
else:
def is_identifier(s):
- return type(s) in [str, unicode] and id_re.match(s)
+ return isinstance(s, six.string_types) and id_re.match(s)
def json_type_to_string(type_):
return "array"
elif issubclass(type_, number_types):
return "number"
- elif issubclass(type_, (str, unicode)):
+ elif issubclass(type_, six.string_types):
return "string"
else:
return "<invalid>"
def unwrap_json(json, name, types, desc):
- if (type(json) not in (list, tuple) or len(json) != 2 or json[0] != name or
- type(json[1]) not in types):
+ if (not isinstance(json, (list, tuple))
+ or len(json) != 2 or json[0] != name
+ or not isinstance(json[1], tuple(types))):
raise error.Error('expected ["%s", <%s>]' % (name, desc), json)
return json[1]
def from_json(json):
parser = ovs.db.parser.Parser(json, "database schema")
name = parser.get("name", ['id'])
- version = parser.get_optional("version", [str, unicode])
- parser.get_optional("cksum", [str, unicode])
+ version = parser.get_optional("version", six.string_types)
+ parser.get_optional("cksum", six.string_types)
tablesJson = parser.get("tables", [dict])
parser.finish()
@staticmethod
def from_json(json):
parser = ovs.db.parser.Parser(json, "IDL schema")
- idlPrefix = parser.get("idlPrefix", [str, unicode])
- idlHeader = parser.get("idlHeader", [str, unicode])
+ idlPrefix = parser.get("idlPrefix", six.string_types)
+ idlHeader = parser.get("idlHeader", six.string_types)
subjson = dict(json)
del subjson["idlPrefix"]
raise error.Error("array of distinct column names expected", json)
else:
for column_name in json:
- if type(column_name) not in [str, unicode]:
+ if not isinstance(column_name, six.string_types):
raise error.Error("array of distinct column names expected",
json)
elif column_name not in columns:
parser = ovs.db.parser.Parser(json, "schema for column %s" % name)
mutable = parser.get_optional("mutable", [bool], True)
ephemeral = parser.get_optional("ephemeral", [bool], False)
- type_ = ovs.db.types.Type.from_json(parser.get("type",
- [dict, str, unicode]))
+ _types = list(six.string_types)
+ _types.extend([dict])
+ type_ = ovs.db.types.Type.from_json(parser.get("type", _types))
parser.finish()
return ColumnSchema(name, mutable, not ephemeral, type_)
@staticmethod
def from_json(json):
- if type(json) not in [str, unicode]:
+ if not isinstance(json, six.string_types):
raise error.Error("atomic-type expected", json)
else:
return AtomicType.from_string(json)
IntegerType = AtomicType("integer", 0, six.integer_types)
RealType = AtomicType("real", 0.0, REAL_PYTHON_TYPES)
BooleanType = AtomicType("boolean", False, (bool,))
-StringType = AtomicType("string", "", (str, unicode))
+StringType = AtomicType("string", "", six.string_types)
UuidType = AtomicType("uuid", ovs.ovsuuid.zero(), (uuid.UUID,))
ATOMIC_TYPES = [VoidType, IntegerType, RealType, BooleanType, StringType,
@staticmethod
def from_json(json):
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
return BaseType(AtomicType.from_json(json))
parser = ovs.db.parser.Parser(json, "ovsdb type")
- atomic_type = AtomicType.from_json(parser.get("type", [str, unicode]))
+ atomic_type = AtomicType.from_json(parser.get("type",
+ six.string_types))
base = BaseType(atomic_type)
elif base.type == UuidType:
base.ref_table_name = parser.get_optional("refTable", ['id'])
if base.ref_table_name:
- base.ref_type = parser.get_optional("refType", [str, unicode],
+ base.ref_type = parser.get_optional("refType",
+ six.string_types,
"strong")
if base.ref_type not in ['strong', 'weak']:
raise error.Error('refType must be "strong" or "weak" '
@staticmethod
def from_json(json):
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
return Type(BaseType.from_json(json))
parser = ovs.db.parser.Parser(json, "ovsdb type")
- key_json = parser.get("key", [dict, str, unicode])
- value_json = parser.get_optional("value", [dict, str, unicode])
+ _types = list(six.string_types)
+ _types.extend([dict])
+ key_json = parser.get("key", _types)
+ value_json = parser.get_optional("value", _types)
min_json = parser.get_optional("min", [int])
- max_json = parser.get_optional("max", [int, str, unicode])
+ _types = list(six.string_types)
+ _types.extend([int])
+ max_json = parser.get_optional("max", _types)
parser.finish()
key = BaseType.from_json(key_json)
self.stream.write(u"%d" % obj)
elif isinstance(obj, float):
self.stream.write("%.15g" % obj)
- elif isinstance(obj, unicode):
+ elif isinstance(obj, six.text_type):
+ # unicode() on Python 2, or str() in Python 3 (always unicode)
self.__serialize_string(obj)
elif isinstance(obj, str):
- self.__serialize_string(unicode(obj))
+ # This is for Python 2, where this comes out to unicode(str()).
+ # For Python 3, it's str(str()), but it's harmless.
+ self.__serialize_string(six.text_type(obj))
elif isinstance(obj, dict):
self.stream.write(u"{")
if i > 0:
self.stream.write(u",")
self.__indent_line()
- self.__serialize_string(unicode(key))
+ self.__serialize_string(six.text_type(key))
self.stream.write(u":")
if self.pretty:
self.stream.write(u' ')
def from_string(s):
- try:
- s = unicode(s, 'utf-8')
- except UnicodeDecodeError as e:
- seq = ' '.join(["0x%2x" % ord(c)
- for c in e.object[e.start:e.end] if ord(c) >= 0x80])
- return ("not a valid UTF-8 string: invalid UTF-8 sequence %s" % seq)
+ if not isinstance(s, six.text_type):
+ # We assume the input is a string. We will only hit this case for a
+ # str in Python 2 which is not unicode, so we need to go ahead and
+ # decode it.
+ try:
+ s = six.text_type(s, 'utf-8')
+ except UnicodeDecodeError as e:
+ seq = ' '.join(["0x%2x" % ord(c)
+ for c in e.object[e.start:e.end] if ord(c) >= 0x80])
+ return "not a valid UTF-8 string: invalid UTF-8 sequence %s" % seq
p = Parser(check_trailer=True)
p.feed(s)
return p.finish()
import errno
import os
+import six
+
import ovs.json
import ovs.poller
import ovs.reconnect
if "method" in json:
method = json.pop("method")
- if type(method) not in [str, unicode]:
+ if not isinstance(method, six.string_types):
return "method is not a JSON string"
else:
method = None
def __process_msg(self):
json = self.parser.finish()
self.parser = None
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
# XXX rate-limit
vlog.warn("%s: error parsing stream: %s" % (self.name, json))
self.error(errno.EPROTO)
import re
import uuid
+import six
from six.moves import range
from ovs.db import error
def from_json(json, symtab=None):
try:
- s = ovs.db.parser.unwrap_json(json, "uuid", [str, unicode], "string")
+ s = ovs.db.parser.unwrap_json(json, "uuid", six.string_types, "string")
if not uuidRE.match(s):
raise error.Error("\"%s\" is not a valid UUID" % s, json)
return uuid.UUID(s)
raise e
try:
name = ovs.db.parser.unwrap_json(json, "named-uuid",
- [str, unicode], "string")
+ six.string_types, "string")
except error.Error:
raise e
import os
import types
+import six
from six.moves import range
import ovs.dirs
break
if error is None:
- unicode_params = [unicode(p) for p in params]
+ unicode_params = [six.text_type(p) for p in params]
command.callback(self, unicode_params, command.aux)
if error:
import sys
import ovs.json
+import six
def print_json(json):
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
print("error: %s" % json)
return False
else:
def substitute_uuids(json, symtab):
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
symbol = symtab.get(json)
if symbol:
return str(symbol)
def parse_uuids(json, symtab):
- if type(json) in [str, unicode] and ovs.ovsuuid.is_valid_string(json):
+ if (isinstance(json, six.string_types)
+ and ovs.ovsuuid.is_valid_string(json)):
name = "#%d#" % len(symtab)
sys.stderr.write("%s = %s\n" % (name, json))
symtab[name] = json
step += 1
else:
json = ovs.json.from_string(command)
- if type(json) in [str, unicode]:
+ if isinstance(json, six.string_types):
sys.stderr.write("\"%s\": %s\n" % (command, json))
sys.exit(1)
json = substitute_uuids(json, symtab)