# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import socket
import logging
import struct
-from ipaddr import IPNetwork, IPv4Address, IPv6Address, IPAddress
from binascii import hexlify
from pprint import pformat
from socket import AF_UNSPEC, AF_INET, AF_INET6, AF_BRIDGE, htons
from string import printable
from struct import pack, unpack, calcsize
+
+from . import ipnetwork
+
+
log = logging.getLogger(__name__)
SYSLOG_EXTRA_DEBUG = 5
AF_FAMILY = dict()
-import socket
-
for family in [attr for attr in dir(socket) if attr.startswith('AF_')]:
AF_FAMILY[getattr(socket, family)] = family
Remove the last character if it is a NULL...having that NULL
causes python to print a garbage character
"""
-
- if ord(line[-1]) == 0:
+ if line[-1] == 0:
line = line[:-1]
return line
-def mac_int_to_str(mac_int):
- """
- Return an integer in MAC string format
- """
-
- # [2:] to remove the leading 0x, then fill out to 12 zeroes, then uppercase
- all_caps = hex(int(mac_int))[2:].zfill(12).upper()
-
- if all_caps[-1] == 'L':
- all_caps = all_caps[:-1]
- all_caps = all_caps.zfill(12).upper()
-
- return "%s.%s.%s" % (all_caps[0:4], all_caps[4:8], all_caps[8:12])
+mac_int_to_str = lambda mac_int: ":".join(("%012x" % mac_int)[i:i + 2] for i in range(0, 12, 2))
def data_to_color_text(line_number, color, data, extra=''):
def padded_length(length):
- return int((length + 3) / 4) * 4
+ return int((length + 3) // 4) * 4
class NetlinkPacket_IFLA_LINKINFO_Attributes:
pad = self.pad_bytes_needed(length)
if pad:
- raw += '\0' * pad
+ raw += ("\0" * pad).encode("utf-8")
return raw
def dump_lines(self, dump_buffer, line_number, color):
line_number = self.dump_first_line(dump_buffer, line_number, color)
- for x in xrange(1, self.attr_end/4):
+ for x in range(1, self.attr_end//4):
start = x * 4
end = start + 4
dump_buffer.append(data_to_color_text(line_number, color, self.data[start:end], ''))
@staticmethod
def decode_one_byte_attribute(data, _=None):
- return unpack("=B", data[4])[0]
+ # we don't need to use the unpack function because bytes are a list of ints
+ return data[4]
@staticmethod
def decode_two_bytes_attribute(data, _=None):
@staticmethod
def decode_ipv4_address_attribute(data, _=None):
- return IPv4Address(unpack(">L", data[4:8])[0])
+ return ipnetwork.IPNetwork(unpack(">L", data[4:8])[0])
@staticmethod
def decode_ipv6_address_attribute(data, _=None):
(data1, data2) = unpack(">QQ", data[4:20])
- return IPv6Address(data1 << 64 | data2)
+ return ipnetwork.IPNetwork(data1 << 64 | data2)
@staticmethod
def decode_bond_ad_info_attribute(data, info_data_end):
@staticmethod
def decode_vlan_protocol_attribute(data, _=None):
- return Link.ifla_vlan_protocol_dict.get(int("0x%s" % data[4:6].encode("hex"), base=16))
+ return Link.ifla_vlan_protocol_dict.get(unpack(">H", data[4:6])[0])
############################################################################
# encode methods
sub_attr_payload.append(5) # length
sub_attr_payload.append(info_data_type)
- sub_attr_pack_layout.append("B")
+ sub_attr_pack_layout.append("Bxxx")
sub_attr_payload.append(info_data_value)
- # pad 3 bytes
- sub_attr_pack_layout.extend("xxx")
-
- @staticmethod
- def encode_one_byte_attribute_from_string_boolean(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
- return Attribute.encode_one_byte_attribute(
- sub_attr_pack_layout, sub_attr_payload, info_data_type, value_to_bool_dict.get(info_data_value)
- )
-
@staticmethod
def encode_bond_xmit_hash_policy_attribute(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
return Attribute.encode_one_byte_attribute(
sub_attr_payload.append(6) # length
sub_attr_payload.append(info_data_type)
- sub_attr_pack_layout.append("H")
+ sub_attr_pack_layout.append("Hxx")
sub_attr_payload.append(info_data_value)
- # pad 2 bytes
- sub_attr_pack_layout.extend("xx")
-
@staticmethod
def encode_four_bytes_attribute(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
sub_attr_pack_layout.append("HH")
sub_attr_payload.append(8) # length
sub_attr_payload.append(info_data_type)
- sub_attr_pack_layout.append("L")
+ sub_attr_pack_layout.append("BBBB")
if info_data_value:
- reorder = unpack("<L", IPv4Address(info_data_value).packed)[0]
- sub_attr_payload.append(IPv4Address(reorder))
+ sub_attr_payload.extend(info_data_value.packed)
else:
- sub_attr_payload.append(0)
+ sub_attr_payload.extend([0, 0, 0, 0])
@staticmethod
def encode_ipv6_attribute(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
if not vlan_protocol:
raise NotImplementedError('vlan protocol %s not implemented' % info_data_value)
- sub_attr_pack_layout.append("H")
+ sub_attr_pack_layout.append("Hxx")
sub_attr_payload.append(htons(vlan_protocol))
- # pad 2 bytes
- sub_attr_pack_layout.extend("xx")
-
@staticmethod
def encode_vxlan_port_attribute(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
sub_attr_pack_layout.append("HH")
sub_attr_payload.append(6)
sub_attr_payload.append(info_data_type)
- sub_attr_pack_layout.append("H")
+ sub_attr_pack_layout.append("Hxx")
# byte swap
swaped = pack(">H", info_data_value)
- sub_attr_payload.append(unpack("<H", swaped)[0])
- # pad 2 bytes
- sub_attr_pack_layout.extend("xx")
+ sub_attr_payload.append(unpack("<H", swaped)[0])
@staticmethod
def encode_mac_address_attribute(sub_attr_pack_layout, sub_attr_payload, info_data_type, info_data_value):
sub_attr_payload.append(10) # length
sub_attr_payload.append(info_data_type)
- sub_attr_pack_layout.append("6B")
+ sub_attr_pack_layout.append("6Bxx")
for mbyte in info_data_value.replace(".", " ").replace(":", " ").split():
sub_attr_payload.append(int(mbyte, 16))
- # pad 2 bytes
- sub_attr_pack_layout.extend('xx')
-
class AttributeCACHEINFO(Attribute):
"""
def decode(self, parent_msg, data):
self.decode_length_type(data)
- wordcount = (self.attr_end - 4)/4
+ wordcount = (self.attr_end - 4)//4
self.PACK = '=%dL' % wordcount
self.LEN = calcsize(self.PACK)
def encode(self):
# some interface names come from JSON as unicode strings
# and cannot be packed as is so we must convert them to strings
- if isinstance(self.value, unicode):
+ if isinstance(self.value, str):
self.value = str(self.value)
self.PACK = '%ds' % len(self.value)
self.LEN = calcsize(self.PACK)
length = self.HEADER_LEN + self.LEN
- raw = pack(self.HEADER_PACK, length, self.atype) + pack(self.PACK, self.value)
+ raw = pack(self.HEADER_PACK, length, self.atype) + pack(self.PACK, self.value.encode())
raw = self.pad(length, raw)
return raw
self.LEN = calcsize(self.PACK)
try:
- self.value = str(remove_trailing_null(unpack(self.PACK, self.data[4:self.length])[0]))
+ self.value = remove_trailing_null(unpack(self.PACK, self.data[4:self.length])[0]).decode("utf-8")
except struct.error:
self.log.error("%s unpack of %s failed, data 0x%s" % (self, self.PACK, hexlify(self.data[4:self.length])))
raise
def __init__(self, atype, string, family, logger):
Attribute.__init__(self, atype, string, logger)
- self.value_int = None
- self.value_int_str = None
self.family = family
if self.family == AF_INET:
self.LEN = calcsize(self.PACK)
- def set_value(self, value):
- if value is None:
- self.value = None
- else:
- self.value = IPNetwork(value)
-
def decode(self, parent_msg, data):
self.decode_length_type(data)
try:
+ try:
+ prefixlen = parent_msg.prefixlen
+ except AttributeError:
+ prefixlen = None
+ try:
+ scope = parent_msg.scope
+ except AttributeError:
+ scope = 0
+
+ if isinstance(parent_msg, Route):
+ if self.atype == Route.RTA_SRC:
+ prefixlen = parent_msg.src_len
+ elif self.atype == Route.RTA_DST:
+ prefixlen = parent_msg.dst_len
+
if self.family in (AF_INET, AF_BRIDGE):
- self.value = IPv4Address(unpack(self.PACK, self.data[4:])[0])
+ self.value = ipnetwork.IPNetwork(unpack(self.PACK, self.data[4:])[0], prefixlen, scope)
elif self.family == AF_INET6:
(data1, data2) = unpack(self.PACK, self.data[4:])
- self.value = IPv6Address(data1 << 64 | data2)
+ self.value = ipnetwork.IPNetwork(data1 << 64 | data2, prefixlen, scope)
- if isinstance(parent_msg, Route):
- if self.atype == Route.RTA_SRC:
- self.value = IPNetwork('%s/%s' % (self.value, parent_msg.src_len))
- elif self.atype == Route.RTA_DST:
- self.value = IPNetwork('%s/%s' % (self.value, parent_msg.dst_len))
else:
- try:
- self.value = IPNetwork('%s/%s' % (self.value, parent_msg.prefixlen))
- except AttributeError:
- self.value = IPNetwork('%s' % self.value)
-
- self.value_int = int(self.value)
- self.value_int_str = str(self.value_int)
+ self.log.debug("AttributeIPAddress: decode: unsupported address family ({})".format(self.family))
except struct.error:
self.value = None
- self.value_int = None
- self.value_int_str = None
self.log.error("%s unpack of %s failed, data 0x%s" % (self, self.PACK, hexlify(self.data[4:])))
raise
elif self.family == AF_INET6:
- for x in xrange(1, self.attr_end/4):
+ for x in range(1, self.attr_end//4):
start = x * 4
end = start + 4
dump_buffer.append(data_to_color_text(line_number, color, self.data[start:end], self.value))
try:
# GRE interface uses a 4-byte IP address for this attribute
if self.length == 8:
- self.value = IPv4Address(unpack('>L', self.data[4:])[0])
- self.value_int = int(self.value)
- self.value_int_str = str(self.value_int)
+ self.value = ipnetwork.IPNetwork(unpack('>L', self.data[4:])[0])
+
# MAC Address
elif self.length == 10:
(data1, data2) = unpack(self.PACK, self.data[4:])
self.value = mac_int_to_str(self.raw)
# GREv6 interface uses a 16-byte IP address for this attribute
elif self.length == 20:
- self.value = IPv6Address(unpack('>L', self.data[16:])[0])
- self.value_int = int(self.value)
- self.value_int_str = str(self.value_int)
+ self.value = ipnetwork.IPNetwork(unpack('>L', self.data[16:])[0])
+
else:
self.log.info("Length of MACAddress attribute not supported: %d" % self.length)
self.value = None
def __init__(self, atype, string, family, logger):
Attribute.__init__(self, atype, string, logger)
- self.value_int = None
- self.value_int_str = None
self.family = family
self.PACK = '>HBB'
self.traffic_class = ((label_low_tc_s & 0xf) >> 1)
self.label = (label_high << 4) | (label_low_tc_s >> 4)
self.value = self.label
- self.value_int = self.value
- self.value_int_str = str(self.value_int)
except struct.error:
self.value = None
- self.value_int = None
- self.value_int_str = None
self.log.error("%s unpack of %s failed, data 0x%s" % (self, self.PACK, hexlify(self.data[4:])))
raise
def decode(self, parent_msg, data):
self.decode_length_type(data)
- wordcount = (self.attr_end - 4)/4
+ wordcount = (self.attr_end - 4)//4
self.PACK = '=%dL' % wordcount
self.LEN = calcsize(self.PACK)
# pack everything via a single pack() call.
sub_attr_to_add = []
- for (sub_attr_type, sub_attr_value) in self.value.iteritems():
+ for (sub_attr_type, sub_attr_value) in self.value.items():
if sub_attr_type == Link.IFLA_BRIDGE_FLAGS:
sub_attr_to_add.append((sub_attr_type, sub_attr_value))
sub_attr_payload[sub_attr_length_index] = sub_attr_length
# add padding
- for x in xrange(self.pad_bytes_needed(sub_attr_length)):
+ for x in range(self.pad_bytes_needed(sub_attr_length)):
sub_attr_pack_layout.append('x')
# The [1:] is to remove the leading = so that when we do the ''.join() later
# 1 byte attr
if inet6_attr_type == Link.IFLA_INET6_ADDR_GEN_MODE:
- inet6_attr[inet6_attr_type] = unpack('=B', sub_attr_data[4])[0]
+ inet6_attr[inet6_attr_type] = self.decode_one_byte_attribute(sub_attr_data)
+
# nlmanager doesn't support multiple kernel version
# all the other attributes like IFLA_INET6_CONF are
# based on DEVCONF_MAX from _UAPI_IPV6_H.
next_sub_attr_line = 0
sub_attr_line = True
- for x in xrange(1, self.attr_end/4):
+ for x in range(1, self.attr_end//4):
start = x * 4
end = start + 4
(sub_attr_length, sub_attr_type) = unpack('=HH', self.data[start:start+4])
sub_attr_end = padded_length(sub_attr_length)
- next_sub_attr_line = line_number + (sub_attr_end/4)
+ next_sub_attr_line = line_number + (sub_attr_end//4)
if sub_attr_end == sub_attr_length:
padded_to = ','
value_pretty = {}
if self.family == AF_BRIDGE:
- for (sub_key, sub_value) in self.value.iteritems():
+ for (sub_key, sub_value) in self.value.items():
sub_key_pretty = "(%2d) %s" % (sub_key, Link.ifla_bridge_af_spec_to_string.get(sub_key))
value_pretty[sub_key_pretty] = sub_value
elif self.family == AF_UNSPEC:
- for (family, family_attr) in self.value.iteritems():
+ for (family, family_attr) in self.value.items():
family_value_pretty = {}
if family == AF_INET6:
else:
continue # log error?
- for (sub_key, sub_value) in family_attr.iteritems():
+ for (sub_key, sub_value) in family_attr.items():
sub_key_pretty = "(%2d) %s" % (sub_key, family_af_spec_to_string.get(sub_key))
family_value_pretty[sub_key_pretty] = sub_value
value_pretty = family_value_pretty
if self.family == AF_INET:
if len(data) < self.IPV4_LEN:
break
- nexthop = IPv4Address(unpack('>L', data[:self.IPV4_LEN])[0])
+ nexthop = ipnetwork.IPNetwork(unpack('>L', data[:self.IPV4_LEN])[0])
self.value.append((nexthop, rtnh_ifindex, rtnh_flags, rtnh_hops))
elif self.family == AF_INET6:
if len(data) < self.IPV6_LEN:
break
(data1, data2) = unpack('>QQ', data[:self.IPV6_LEN])
- nexthop = IPv6Address(data1 << 64 | data2)
+ nexthop = ipnetwork.IPNetwork(data1 << 64 | data2)
self.value.append((nexthop, rtnh_ifindex, rtnh_flags, rtnh_hops))
data = data[(rtnh_len-self.RTNH_LEN-self.HEADER_LEN):]
"bond": {
# 1 byte attributes ############################################
NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_NUM_PEER_NOTIF: Attribute.encode_one_byte_attribute,
-
- NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_USE_CARRIER: Attribute.encode_one_byte_attribute_from_string_boolean,
- NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_AD_LACP_BYPASS: Attribute.encode_one_byte_attribute_from_string_boolean,
- NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_AD_LACP_RATE: Attribute.encode_one_byte_attribute_from_string_boolean,
+ NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_USE_CARRIER: Attribute.encode_one_byte_attribute,
+ NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_AD_LACP_BYPASS: Attribute.encode_one_byte_attribute,
+ NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_AD_LACP_RATE: Attribute.encode_one_byte_attribute,
# bond-mode attribute ##########################################
NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BOND_MODE: Attribute.encode_bond_mode_attribute,
},
"bridge": {
# 1 byte attributes ############################################
- NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BR_VLAN_FILTERING: Attribute.encode_one_byte_attribute_from_string_boolean,
-
+ NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BR_VLAN_FILTERING: Attribute.encode_one_byte_attribute,
NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BR_TOPOLOGY_CHANGE: Attribute.encode_one_byte_attribute,
NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BR_TOPOLOGY_CHANGE_DETECTED: Attribute.encode_one_byte_attribute,
NetlinkPacket_IFLA_LINKINFO_Attributes.IFLA_BR_MCAST_ROUTER: Attribute.encode_one_byte_attribute,
"Add support for encoding %s for %s link kind" % (sub_attr_type_string, kind)
)
else:
- for (info_data_type, info_data_value) in sub_attr_value.iteritems():
+ for (info_data_type, info_data_value) in sub_attr_value.items():
encode_handler = encode_handlers.get(info_data_type)
if encode_handler:
# Until we cross that bridge though we will keep things nice and simple and
# pack everything via a single pack() call.
- for (sub_attr_type, sub_attr_value) in self.value.iteritems():
+ for (sub_attr_type, sub_attr_value) in self.value.items():
sub_attr_pack_layout = ['=', 'HH']
sub_attr_payload = [0, sub_attr_type]
sub_attr_length_index = 0
if sub_attr_type in (Link.IFLA_INFO_KIND, Link.IFLA_INFO_SLAVE_KIND):
sub_attr_pack_layout.append('%ds' % len(sub_attr_value))
- sub_attr_payload.append(sub_attr_value)
+ sub_attr_payload.append(sub_attr_value.encode("utf-8"))
elif sub_attr_type == Link.IFLA_INFO_DATA:
sub_attr_payload = self.encode_ifla_info_nested_data(
sub_attr_payload[sub_attr_length_index] = sub_attr_length
# add padding
- for x in xrange(self.pad_bytes_needed(sub_attr_length)):
+ for x in range(self.pad_bytes_needed(sub_attr_length)):
sub_attr_pack_layout.append('x')
# The [1:] is to remove the leading = so that when we do the ''.join() later
ifla_info_nested_attr_to_str_dict = self.ifla_info_nested_data_attributes_to_string_dict.get(sub_attr_type, {}).get(kind)
if not ifla_info_nested_data_handlers or not ifla_info_nested_attr_to_str_dict:
- self.log.log(SYSLOG_EXTRA_DEBUG, "%s: decode: unsupported %s %s" % (sub_attr_type_str, ifla_info_nested_kind_str, kind))
+ self.log.log(
+ SYSLOG_EXTRA_DEBUG,
+ "%s: decode: unsupported %s %s"
+ % (sub_attr_type_str, ifla_info_nested_kind_str, kind)
+ )
else:
while sub_attr_data:
(info_nested_data_length, info_nested_data_type) = unpack("=HH", sub_attr_data[:4])
return
if sub_attr_type in (Link.IFLA_INFO_KIND, Link.IFLA_INFO_SLAVE_KIND):
- self.value[sub_attr_type] = remove_trailing_null(unpack('%ds' % (sub_attr_length - 4), data[4:sub_attr_length])[0])
+ self.value[sub_attr_type] = remove_trailing_null(unpack("%ds" % (sub_attr_length - 4), data[4:sub_attr_length])[0]).decode("utf-8")
elif sub_attr_type == Link.IFLA_INFO_DATA:
self.value[Link.IFLA_INFO_DATA] = self.decode_ifla_info_nested_data(
next_sub_attr_line = 0
sub_attr_line = True
- for x in xrange(1, self.attr_end/4):
+ for x in range(1, self.attr_end//4):
start = x * 4
end = start + 4
(sub_attr_length, sub_attr_type) = unpack('=HH', self.data[start:start+4])
sub_attr_end = padded_length(sub_attr_length)
- next_sub_attr_line = line_number + (sub_attr_end/4)
+ next_sub_attr_line = line_number + (sub_attr_end//4)
if sub_attr_end == sub_attr_length:
padded_to = ', '
if ifla_info_kind or ifla_info_slave_kind:
value_pretty = {}
- for (sub_key, sub_value) in self.value.iteritems():
+ for (sub_key, sub_value) in self.value.items():
sub_key_pretty = "(%2d) %s" % (sub_key, Link.ifla_info_to_string.get(sub_key, 'UNKNOWN'))
sub_value_pretty = sub_value
kind_to_string_dict = kind_dict.get(sub_key, {})
sub_value_pretty = {}
- for (sub_sub_key, sub_sub_value) in sub_value.iteritems():
+ for (sub_sub_key, sub_sub_value) in sub_value.items():
sub_sub_key_pretty = "(%2d) %s" % (sub_sub_key, kind_to_string_dict.get(sub_sub_key, 'UNKNOWN'))
sub_value_pretty[sub_sub_key_pretty] = sub_sub_value
#
# Until we cross that bridge though we will keep things nice and simple and
# pack everything via a single pack() call.
- for (sub_attr_type, sub_attr_value) in self.value.iteritems():
+
+ for (sub_attr_type, sub_attr_value) in self.value.items():
sub_attr_pack_layout = ['=', 'HH']
sub_attr_payload = [0, sub_attr_type]
sub_attr_length_index = 0
sub_attr_payload[sub_attr_length_index] = sub_attr_length
# add padding
- for x in xrange(self.pad_bytes_needed(sub_attr_length)):
- sub_attr_pack_layout.append('x')
+ sub_attr_pack_layout[-1] = "%s%s" % (sub_attr_pack_layout[-1], "x" * self.pad_bytes_needed(sub_attr_length))
# The [1:] is to remove the leading = so that when we do the ''.join() later
# we do not end up with an = in the middle of the pack layout string. There
Link.IFLA_BRPORT_PEER_LINK,
Link.IFLA_BRPORT_DUAL_LINK,
Link.IFLA_BRPORT_NEIGH_SUPPRESS):
- self.value[sub_attr_type] = unpack('=B', data[4])[0]
+ self.value[sub_attr_type] = self.decode_one_byte_attribute(data)
# 2 Byte attributes
elif sub_attr_type in (Link.IFLA_BRPORT_PRIORITY,
next_sub_attr_line = 0
sub_attr_line = True
- for x in xrange(1, self.attr_end/4):
+ for x in range(1, self.attr_end//4):
start = x * 4
end = start + 4
(sub_attr_length, sub_attr_type) = unpack('=HH', self.data[start:start+4])
sub_attr_end = padded_length(sub_attr_length)
- next_sub_attr_line = line_number + (sub_attr_end/4)
+ next_sub_attr_line = line_number + (sub_attr_end//4)
if sub_attr_end == sub_attr_length:
padded_to = ', '
value_pretty = {}
- for (sub_key, sub_value) in self.value.iteritems():
+ for (sub_key, sub_value) in self.value.items():
sub_key_pretty = "(%2d) %s" % (sub_key, Link.ifla_brport_to_string.get(sub_key, 'UNKNOWN'))
sub_value_pretty = sub_value
value_pretty[sub_key_pretty] = sub_value_pretty
def get_flags_string(self):
foo = []
- for (flag, flag_string) in self.flag_to_string.iteritems():
+ for (flag, flag_string) in self.flag_to_string.items():
if self.flags & flag:
foo.append(flag_string)
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sNetlink Header%s" % (color_start, color_end))
- for x in range(0, netlink_header_length/4):
+ for x in range(0, netlink_header_length // 4):
start = x * 4
end = start + 4
def build_message(self, seq, pid):
self.seq = seq
self.pid = pid
- attrs = ''
+ attrs = bytes()
- for attr in self.attributes.itervalues():
+ for attr in self.attributes.values():
attrs += attr.encode()
self.length = self.header_LEN + len(self.body) + len(attrs)
self.header_data = pack(self.header_PACK, self.length, self.msgtype, self.flags, self.seq, self.pid)
- self.msg_data = self.body + attrs
+
+ if not attrs:
+ self.msg_data = self.body
+ else:
+ self.msg_data = self.body + attrs
+
self.message = self.header_data + self.msg_data
if self.debug:
self.get_netlink_header_flags_string(self.msgtype, self.flags)))
def pretty_display_dict(self, dic, level):
- for k,v in dic.iteritems():
+ for k,v in dic.items():
if isinstance(v, dict):
self.log.debug(' '*level + str(k) + ':')
self.pretty_display_dict(v, level+5)
if desc is None:
desc = "RXed %s, length %d, seq %d, pid %d, flags 0x%x" % (self, self.length, self.seq, self.pid, self.flags)
- for (attr_type, attr_obj) in self.attributes.iteritems():
+ for (attr_type, attr_obj) in self.attributes.items():
key_string = "(%2d) %s" % (attr_type, self.get_attr_string(attr_type))
attr_string[key_string] = attr_obj.get_pretty_value()
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
if self.line_number == 5:
extra = "Family %s (%s:%d), Length %s (%d), Flags %s, Scope %s (%d)" % \
(zfilled_hex(self.family, 2), get_family_str(self.family), self.family,
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
if self.line_number == 5:
error_number = abs(self.negative_errno)
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
if self.line_number == 5:
extra = "Family %s (%s:%d), Device Type %s (%d - %s)" % \
(zfilled_hex(self.family, 2), get_family_str(self.family), self.family,
info = [ifindex,state,flags,vid]
proto = unpack('=H',sub_attr_data[28:30])[0]
if proto == htons(ETH_P_IP):
- ip_addr = IPv4Address(unpack('>L', sub_attr_data[12:16])[0])
+ ip_addr = ipnetwork.IPNetwork(unpack('>L', sub_attr_data[12:16])[0])
else:
(data1, data2) = unpack('>QQ',sub_attr_data[12:28])
- ip_addr = IPv6Address(data1 << 64 | data2)
+ ip_addr = ipnetwork.IPNetwork(data1 << 64 | data2)
info.append(ip_addr)
info = list(info)
proto = unpack('=H',sub_attr_data[28:30])[0]
if proto == 8:
- ip_addr = IPv4Address(unpack('>L', sub_attr_data[12:16])[0])
+ ip_addr = ipnetwork.IPNetwork(unpack('>L', sub_attr_data[12:16])[0])
else:
(data1, data2) = unpack('>QQ',sub_attr_data[12:28])
- ip_addr = IPv6Address(data1 << 64 | data2)
+ ip_addr = ipnetwork.IPNetwork(data1 << 64 | data2)
info.append(ip_addr)
self.value[MDB.MDBA_MDB_ENTRY][MDB.MDBA_MDB_ENTRY_INFO] = info
if proto == htons(ETH_P_IP):
self.PACK = '=IBBHLxxxxxxxxxxxxHxx'
reorder = unpack('<L', ip.packed)[0]
- ip = IPv4Address(reorder)
+ ip = ipnetwork.IPv4Address(reorder)
self.LEN = calcsize(self.PACK)
length = self.HEADER_LEN + self.LEN
elif proto == htons(ETH_P_IPV6):
self.PACK = '=IBBHQQHxx'
(ifindex, flags, state,vid, data1,data2, proto) = unpack(self.PACK, self.data[4:])
- ip = IPv6Address(data1 << 64 | data2)
+ ip = ipnetwork.IPNetwork(data1 << 64 | data2)
else:
raise Exception("%d Invalid Proto" % proto)
self.LEN = calcsize(self.PACK)
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
if self.line_number == 5:
extra = "Family %s (%s:%d)" % (zfilled_hex(self.family, 2), get_family_str(self.family), self.family)
elif self.line_number == 6:
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
if self.line_number == 5:
extra = "Family %s (%s:%d), Source Length %s (%d), Destination Length %s (%d), TOS %s (%d)" % \
(zfilled_hex(self.family, 2), get_family_str(self.family), self.family,
color_end = "\033[0m" if color else ""
self.dump_buffer.append(" %sService Header%s" % (color_start, color_end))
- for x in range(0, self.LEN/4):
+ for x in range(0, self.LEN//4):
extra = ''
start = x * 4
end = start + 4