]>
git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
2 # SPDX-License-Identifier: BSD-3-Clause
12 if sys
.version_info
< (3, 0):
13 print("Python3 is required to run this script")
18 from scapy
.all
import Ether
20 print("Scapy module is required")
29 def assert_requirements(req
):
31 assert requirement is met
32 req can hold a string or a list of strings
35 pkg_resources
.require(req
)
36 except (pkg_resources
.DistributionNotFound
, pkg_resources
.VersionConflict
) as e
:
37 print("Requirement assertion: " + str(e
))
41 TAP_UNPROTECTED
= "dtap1"
42 TAP_PROTECTED
= "dtap0"
45 class Interface(object):
47 MAX_PACKET_SIZE
= 1280
48 IOCTL_GET_INFO
= 0x8927
50 def __init__(self
, ifname
):
53 # create and bind socket to specified interface
54 self
.s
= socket
.socket(socket
.AF_PACKET
, socket
.SOCK_RAW
, socket
.htons(Interface
.ETH_P_ALL
))
55 self
.s
.settimeout(Interface
.SOCKET_TIMEOUT
)
56 self
.s
.bind((self
.name
, 0, socket
.PACKET_OTHERHOST
))
58 # get interface MAC address
59 info
= fcntl
.ioctl(self
.s
.fileno(), Interface
.IOCTL_GET_INFO
, struct
.pack('256s', bytes(ifname
[:15], encoding
='ascii')))
60 self
.mac
= ':'.join(['%02x' % i
for i
in info
[18:24]])
65 def send_l3packet(self
, pkt
, mac
):
66 e
= Ether(src
=self
.mac
, dst
=mac
)
67 self
.send_packet(e
/pkt
)
69 def send_packet(self
, pkt
):
70 self
.send_bytes(bytes(pkt
))
72 def send_bytes(self
, bytedata
):
75 def recv_packet(self
):
76 return Ether(self
.recv_bytes())
79 return self
.s
.recv(Interface
.MAX_PACKET_SIZE
)
85 class PacketXfer(object):
86 def __init__(self
, protected_iface
=TAP_PROTECTED
, unprotected_iface
=TAP_UNPROTECTED
):
87 self
.protected_port
= Interface(protected_iface
)
88 self
.unprotected_port
= Interface(unprotected_iface
)
90 def send_to_protected_port(self
, pkt
, remote_mac
=None):
91 if remote_mac
is None:
92 remote_mac
= self
.unprotected_port
.get_mac()
93 self
.protected_port
.send_l3packet(pkt
, remote_mac
)
95 def send_to_unprotected_port(self
, pkt
, remote_mac
=None):
96 if remote_mac
is None:
97 remote_mac
= self
.protected_port
.get_mac()
98 self
.unprotected_port
.send_l3packet(pkt
, remote_mac
)
100 def xfer_unprotected(self
, pkt
):
101 self
.send_to_unprotected_port(pkt
)
102 return self
.protected_port
.recv_packet()
104 def xfer_protected(self
, pkt
):
105 self
.send_to_protected_port(pkt
)
106 return self
.unprotected_port
.recv_packet()
110 if len(sys
.argv
) == 1:
111 sys
.exit(unittest
.main(verbosity
=2))
112 elif len(sys
.argv
) == 2:
113 if sys
.argv
[1] == "config":
114 module
= __import__('__main__')
116 print(module
.config())
117 except AttributeError:
118 sys
.stderr
.write("Cannot find \"config()\" in a test")
124 if __name__
== "__main__":
125 if len(sys
.argv
) == 2 and sys
.argv
[1] == "check_reqs":
126 assert_requirements(PKTTEST_REQ
)
128 print("Usage: " + sys
.argv
[0] + " check_reqs")