]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / dpdk / examples / ipsec-secgw / test / pkttest.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: BSD-3-Clause
3
4 import fcntl
5 import pkg_resources
6 import socket
7 import struct
8 import sys
9 import unittest
10
11
# Refuse to run under Python 2: only the major version number matters here.
if sys.version_info[0] < 3:
    print("Python3 is required to run this script")
    sys.exit(1)
15
16
17 try:
18 from scapy.all import Ether
19 except ImportError:
20 print("Scapy module is required")
21 sys.exit(1)
22
23
# Requirement specifiers handed to assert_requirements() by the "check_reqs"
# command line action at the bottom of this file.
PKTTEST_REQ = ["scapy>=2.4.3"]
27
28
def assert_requirements(req):
    """
    Terminate the script unless the given requirement(s) are satisfied.

    req is handed unchanged to pkg_resources.require(), so it can be a
    single requirement string or a list of such strings.  When a
    distribution is missing or has a conflicting version, the reason is
    printed and the process exits with status 1.
    """
    unmet = (
        pkg_resources.DistributionNotFound,
        pkg_resources.VersionConflict,
    )
    try:
        pkg_resources.require(req)
    except unmet as err:
        reason = str(err)
        print("Requirement assertion: " + reason)
        sys.exit(1)
39
40
# Default interface names used by PacketXfer for its protected and
# unprotected ports respectively.
TAP_PROTECTED = "dtap0"
TAP_UNPROTECTED = "dtap1"
43
44
class Interface(object):
    """
    Raw packet socket bound to one network interface.

    The constructor opens an AF_PACKET/SOCK_RAW socket, binds it to the
    interface called ifname and reads the interface MAC address through an
    ioctl.  Frames are then sent and received as bytes or as scapy Ether
    objects.  The socket is closed when the object is finalized.
    """
    # protocol number passed (through htons) to socket(); 3 is the value of
    # ETH_P_ALL in the Linux headers, i.e. every protocol
    ETH_P_ALL = 3
    # maximum number of bytes read by a single recv_bytes() call
    MAX_PACKET_SIZE = 1280
    # ioctl request used to fetch the MAC address (SIOCGIFHWADDR on Linux)
    IOCTL_GET_INFO = 0x8927
    # receive/send timeout applied to the socket, in seconds
    SOCKET_TIMEOUT = 0.5

    def __init__(self, ifname):
        """
        Open the socket on interface ifname and cache its MAC address.

        Any exception raised while configuring the socket is propagated to
        the caller after the socket has been closed.
        """
        self.name = ifname

        # create and bind socket to specified interface
        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(Interface.ETH_P_ALL))
        try:
            self.s.settimeout(Interface.SOCKET_TIMEOUT)
            self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))

            # get interface MAC address; only the first 15 characters of the
            # name are sent, padded with NUL bytes to a 256 byte buffer
            request = struct.pack('256s', bytes(ifname[:15], encoding='ascii'))
            info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, request)
        except Exception:
            # do not keep the descriptor open until garbage collection when
            # the interface cannot be set up
            self.s.close()
            raise

        # bytes 18..23 of the reply carry the hardware address (they follow
        # the 16 byte name and 2 byte address family of struct ifreq)
        self.mac = ':'.join('%02x' % octet for octet in info[18:24])

    def __del__(self):
        """Close the socket, if the constructor got far enough to open one."""
        # __init__ may have failed before self.s was assigned (e.g. missing
        # privileges for raw sockets); the finalizer still runs in that case
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def send_l3packet(self, pkt, mac):
        """Prepend an Ethernet header (own MAC -> mac) to pkt and send it."""
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """Serialize pkt with bytes() and send it on the socket."""
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """Send raw bytedata on the socket."""
        self.s.send(bytedata)

    def recv_packet(self):
        """Receive one frame and return it parsed as a scapy Ether object."""
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """Receive up to MAX_PACKET_SIZE bytes from the socket."""
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """Return the interface MAC address as a colon separated hex string."""
        return self.mac
83
84
class PacketXfer(object):
    """
    Pair of Interface objects, one for the protected and one for the
    unprotected side, with helpers to send a packet into one of them and
    read a packet from the other.
    """

    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
        """Open both ports; the protected one is opened first."""
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """
        Send pkt out of the protected port.

        The destination MAC is remote_mac, or the MAC of the unprotected
        port when remote_mac is None.
        """
        dst_mac = self.unprotected_port.get_mac() if remote_mac is None else remote_mac
        self.protected_port.send_l3packet(pkt, dst_mac)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """
        Send pkt out of the unprotected port.

        The destination MAC is remote_mac, or the MAC of the protected
        port when remote_mac is None.
        """
        dst_mac = self.protected_port.get_mac() if remote_mac is None else remote_mac
        self.unprotected_port.send_l3packet(pkt, dst_mac)

    def xfer_unprotected(self, pkt):
        """Send pkt on the unprotected port, return a packet read from the protected one."""
        self.send_to_unprotected_port(pkt)
        received = self.protected_port.recv_packet()
        return received

    def xfer_protected(self, pkt):
        """Send pkt on the protected port, return a packet read from the unprotected one."""
        self.send_to_protected_port(pkt)
        received = self.unprotected_port.recv_packet()
        return received
107
108
def pkttest():
    """
    Command line entry point for test scripts that use this module.

    Behaviour depends on sys.argv:
      - no argument: hand control to unittest.main(verbosity=2);
      - the single argument "config": print the value returned by the
        config() function of the __main__ module; when __main__ has no
        such attribute a message is written to stderr and the process
        exits with status 1;
      - any other single argument: exit with status 1;
      - more than one argument: return without doing anything.

    Exceptions raised by config() itself are propagated, they are not
    reported as a missing config().
    """
    if len(sys.argv) == 1:
        sys.exit(unittest.main(verbosity=2))
    elif len(sys.argv) == 2:
        if sys.argv[1] != "config":
            sys.exit(1)

        module = __import__('__main__')
        # Only the attribute lookup is guarded: an AttributeError coming from
        # inside config() is a bug in the test and must not be masked.
        try:
            config_fn = module.config
        except AttributeError:
            sys.stderr.write("Cannot find \"config()\" in a test")
            sys.exit(1)
        print(config_fn())
122
123
# Run directly, the module only supports the "check_reqs" action, which
# verifies PKTTEST_REQ; anything else prints a usage line.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if cli_args != ["check_reqs"]:
        print("Usage: " + sys.argv[0] + " check_reqs")
    else:
        assert_requirements(PKTTEST_REQ)