+++ /dev/null
-"""Implements (a subset of) Sun XDR -- eXternal Data Representation.\r
-\r
-See: RFC 1014\r
-\r
-"""\r
-\r
-import struct\r
-try:\r
- from cStringIO import StringIO as _StringIO\r
-except ImportError:\r
- from StringIO import StringIO as _StringIO\r
-\r
-__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]\r
-\r
-# exceptions\r
-class Error(Exception):\r
- """Exception class for this module. Use:\r
-\r
- except xdrlib.Error, var:\r
- # var has the Error instance for the exception\r
-\r
- Public ivars:\r
- msg -- contains the message\r
-\r
- """\r
- def __init__(self, msg):\r
- self.msg = msg\r
- def __repr__(self):\r
- return repr(self.msg)\r
- def __str__(self):\r
- return str(self.msg)\r
-\r
-\r
-class ConversionError(Error):\r
- pass\r
-\r
-\r
-\r
-class Packer:\r
- """Pack various data representations into a buffer."""\r
-\r
- def __init__(self):\r
- self.reset()\r
-\r
- def reset(self):\r
- self.__buf = _StringIO()\r
-\r
- def get_buffer(self):\r
- return self.__buf.getvalue()\r
- # backwards compatibility\r
- get_buf = get_buffer\r
-\r
- def pack_uint(self, x):\r
- self.__buf.write(struct.pack('>L', x))\r
-\r
- def pack_int(self, x):\r
- self.__buf.write(struct.pack('>l', x))\r
-\r
- pack_enum = pack_int\r
-\r
- def pack_bool(self, x):\r
- if x: self.__buf.write('\0\0\0\1')\r
- else: self.__buf.write('\0\0\0\0')\r
-\r
- def pack_uhyper(self, x):\r
- self.pack_uint(x>>32 & 0xffffffffL)\r
- self.pack_uint(x & 0xffffffffL)\r
-\r
- pack_hyper = pack_uhyper\r
-\r
- def pack_float(self, x):\r
- try: self.__buf.write(struct.pack('>f', x))\r
- except struct.error, msg:\r
- raise ConversionError, msg\r
-\r
- def pack_double(self, x):\r
- try: self.__buf.write(struct.pack('>d', x))\r
- except struct.error, msg:\r
- raise ConversionError, msg\r
-\r
- def pack_fstring(self, n, s):\r
- if n < 0:\r
- raise ValueError, 'fstring size must be nonnegative'\r
- data = s[:n]\r
- n = ((n+3)//4)*4\r
- data = data + (n - len(data)) * '\0'\r
- self.__buf.write(data)\r
-\r
- pack_fopaque = pack_fstring\r
-\r
- def pack_string(self, s):\r
- n = len(s)\r
- self.pack_uint(n)\r
- self.pack_fstring(n, s)\r
-\r
- pack_opaque = pack_string\r
- pack_bytes = pack_string\r
-\r
- def pack_list(self, list, pack_item):\r
- for item in list:\r
- self.pack_uint(1)\r
- pack_item(item)\r
- self.pack_uint(0)\r
-\r
- def pack_farray(self, n, list, pack_item):\r
- if len(list) != n:\r
- raise ValueError, 'wrong array size'\r
- for item in list:\r
- pack_item(item)\r
-\r
- def pack_array(self, list, pack_item):\r
- n = len(list)\r
- self.pack_uint(n)\r
- self.pack_farray(n, list, pack_item)\r
-\r
-\r
-\r
-class Unpacker:\r
- """Unpacks various data representations from the given buffer."""\r
-\r
- def __init__(self, data):\r
- self.reset(data)\r
-\r
- def reset(self, data):\r
- self.__buf = data\r
- self.__pos = 0\r
-\r
- def get_position(self):\r
- return self.__pos\r
-\r
- def set_position(self, position):\r
- self.__pos = position\r
-\r
- def get_buffer(self):\r
- return self.__buf\r
-\r
- def done(self):\r
- if self.__pos < len(self.__buf):\r
- raise Error('unextracted data remains')\r
-\r
- def unpack_uint(self):\r
- i = self.__pos\r
- self.__pos = j = i+4\r
- data = self.__buf[i:j]\r
- if len(data) < 4:\r
- raise EOFError\r
- x = struct.unpack('>L', data)[0]\r
- try:\r
- return int(x)\r
- except OverflowError:\r
- return x\r
-\r
- def unpack_int(self):\r
- i = self.__pos\r
- self.__pos = j = i+4\r
- data = self.__buf[i:j]\r
- if len(data) < 4:\r
- raise EOFError\r
- return struct.unpack('>l', data)[0]\r
-\r
- unpack_enum = unpack_int\r
-\r
- def unpack_bool(self):\r
- return bool(self.unpack_int())\r
-\r
- def unpack_uhyper(self):\r
- hi = self.unpack_uint()\r
- lo = self.unpack_uint()\r
- return long(hi)<<32 | lo\r
-\r
- def unpack_hyper(self):\r
- x = self.unpack_uhyper()\r
- if x >= 0x8000000000000000L:\r
- x = x - 0x10000000000000000L\r
- return x\r
-\r
- def unpack_float(self):\r
- i = self.__pos\r
- self.__pos = j = i+4\r
- data = self.__buf[i:j]\r
- if len(data) < 4:\r
- raise EOFError\r
- return struct.unpack('>f', data)[0]\r
-\r
- def unpack_double(self):\r
- i = self.__pos\r
- self.__pos = j = i+8\r
- data = self.__buf[i:j]\r
- if len(data) < 8:\r
- raise EOFError\r
- return struct.unpack('>d', data)[0]\r
-\r
- def unpack_fstring(self, n):\r
- if n < 0:\r
- raise ValueError, 'fstring size must be nonnegative'\r
- i = self.__pos\r
- j = i + (n+3)//4*4\r
- if j > len(self.__buf):\r
- raise EOFError\r
- self.__pos = j\r
- return self.__buf[i:i+n]\r
-\r
- unpack_fopaque = unpack_fstring\r
-\r
- def unpack_string(self):\r
- n = self.unpack_uint()\r
- return self.unpack_fstring(n)\r
-\r
- unpack_opaque = unpack_string\r
- unpack_bytes = unpack_string\r
-\r
- def unpack_list(self, unpack_item):\r
- list = []\r
- while 1:\r
- x = self.unpack_uint()\r
- if x == 0: break\r
- if x != 1:\r
- raise ConversionError, '0 or 1 expected, got %r' % (x,)\r
- item = unpack_item()\r
- list.append(item)\r
- return list\r
-\r
- def unpack_farray(self, n, unpack_item):\r
- list = []\r
- for i in range(n):\r
- list.append(unpack_item())\r
- return list\r
-\r
- def unpack_array(self, unpack_item):\r
- n = self.unpack_uint()\r
- return self.unpack_farray(n, unpack_item)\r