]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/pim_basic/mcast-tx.py
*: auto-convert to SPDX License IDs
[mirror_frr.git] / tests / topotests / pim_basic / mcast-tx.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: ISC
3 #
4 # mcast-tx.py
5 #
6 # Copyright (c) 2018 Cumulus Networks, Inc.
7 #
8
9 import argparse
10 import logging
11 import socket
12 import struct
13 import time
14 import sys
15
# Send DEBUG and above to the root handler with a timestamped prefix.
logging.basicConfig(
    format="%(asctime)s %(levelname)5s: %(message)s",
    level=logging.DEBUG,
)

# Color the errors and warnings in red by wrapping their level names in
# ANSI escape sequences (each level keeps its own template).
for _level, _template in (
    (logging.ERROR, "\033[91m %s\033[0m"),
    (logging.WARNING, "\033[91m%s\033[0m"),
):
    logging.addLevelName(_level, _template % logging.getLevelName(_level))

# Module-level logger used by the rest of the script.
log = logging.getLogger(__name__)
28
# Command line: two required positionals (multicast group and egress
# interface) followed by optional integer tuning knobs.
parser = argparse.ArgumentParser(description="Multicast packet generator")

# Positional arguments, registered in command-line order.
for _name, _help in (
    ("group", "Multicast IP"),
    ("ifname", "Interface name"),
):
    parser.add_argument(_name, help=_help)

# Integer options as (flag, help text, default value).
for _flag, _help, _default in (
    ("--port", "UDP port number", 1000),
    ("--ttl", "time-to-live", 20),
    ("--count", "Packets to send", 1),
    ("--interval", "ms between packets", 100),
):
    parser.add_argument(_flag, default=_default, help=_help, type=int)

args = parser.parse_args()
37
# Create the datagram socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Everything below runs under try/finally so the socket is closed even when
# binding to the interface or sending a packet raises.
try:
    # socket.SO_BINDTODEVICE is not defined in some releases of python, but
    # its value on Linux is 25, so fall back to that when it is missing.
    # https://github.com/sivel/bonding/issues/10
    so_bindtodevice = getattr(socket, "SO_BINDTODEVICE", 25)

    # setsockopt() wants the interface name as bytes. Encode text once and
    # pass the full encoded value, so a multi-byte name is not truncated to
    # its character count; a value that is already bytes is used unchanged.
    ifname_bytes = args.ifname
    if not isinstance(ifname_bytes, bytes):
        ifname_bytes = ifname_bytes.encode("utf-8")

    # Bind our socket to ifname
    sock.setsockopt(socket.SOL_SOCKET, so_bindtodevice, ifname_bytes)

    # We need to make sure our sendto() finishes before we close the socket
    sock.setblocking(1)

    # Set the time-to-live. Pack it as an unsigned byte ("B") so the whole
    # 0-255 range is accepted; a signed byte ("b") rejects 128 and above.
    sock.setsockopt(
        socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack("B", args.ttl)
    )

    # --interval is given in milliseconds; time.sleep() takes seconds.
    interval_sec = args.interval / 1000.0

    # Send data to the multicast group
    for x in range(args.count):
        log.info(
            "TX multicast UDP packet to %s:%d on %s",
            args.group,
            args.port,
            args.ifname,
        )

        # A bytes payload built with %-formatting needs no per-version branch.
        sock.sendto(b"foobar %d" % x, (args.group, args.port))

        # Pace the packets only when more than one is sent and the interval
        # is non-zero.
        if args.count > 1 and interval_sec:
            time.sleep(interval_sec)
finally:
    sock.close()