]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/munet/linux.py
Merge pull request #13235 from Orange-OpenSource/link-state
[mirror_frr.git] / tests / topotests / munet / linux.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 # SPDX-License-Identifier: GPL-2.0-or-later
3 #
4 # June 10 2022, Christian Hopps <chopps@labn.net>
5 #
6 # Copyright (c) 2022, LabN Consulting, L.L.C.
7 #
8 """A module that gives access to linux unshare system call."""
9
10 import ctypes # pylint: disable=C0415
11 import ctypes.util # pylint: disable=C0415
12 import errno
13 import functools
14 import os
15
16
17 libc = None
18
19
20 def raise_oserror(enum):
21 s = errno.errorcode[enum] if enum in errno.errorcode else str(enum)
22 error = OSError(s)
23 error.errno = enum
24 error.strerror = s
25 raise error
26
27
28 def _load_libc():
29 global libc # pylint: disable=W0601,W0603
30 if libc:
31 return
32 lcpath = ctypes.util.find_library("c")
33 libc = ctypes.CDLL(lcpath, use_errno=True)
34
35
36 def pause():
37 if not libc:
38 _load_libc()
39 libc.pause()
40
41
42 MS_RDONLY = 1
43 MS_NOSUID = 1 << 1
44 MS_NODEV = 1 << 2
45 MS_NOEXEC = 1 << 3
46 MS_SYNCHRONOUS = 1 << 4
47 MS_REMOUNT = 1 << 5
48 MS_MANDLOCK = 1 << 6
49 MS_DIRSYNC = 1 << 7
50 MS_NOSYMFOLLOW = 1 << 8
51 MS_NOATIME = 1 << 10
52 MS_NODIRATIME = 1 << 11
53 MS_BIND = 1 << 12
54 MS_MOVE = 1 << 13
55 MS_REC = 1 << 14
56 MS_SILENT = 1 << 15
57 MS_POSIXACL = 1 << 16
58 MS_UNBINDABLE = 1 << 17
59 MS_PRIVATE = 1 << 18
60 MS_SLAVE = 1 << 19
61 MS_SHARED = 1 << 20
62 MS_RELATIME = 1 << 21
63 MS_KERNMOUNT = 1 << 22
64 MS_I_VERSION = 1 << 23
65 MS_STRICTATIME = 1 << 24
66 MS_LAZYTIME = 1 << 25
67
68
69 def mount(source, target, fs, flags=0, options=""):
70 if not libc:
71 _load_libc()
72 libc.mount.argtypes = (
73 ctypes.c_char_p,
74 ctypes.c_char_p,
75 ctypes.c_char_p,
76 ctypes.c_ulong,
77 ctypes.c_char_p,
78 )
79 fsenc = fs.encode() if fs else None
80 optenc = options.encode() if options else None
81 ret = libc.mount(source.encode(), target.encode(), fsenc, flags, optenc)
82 if ret < 0:
83 err = ctypes.get_errno()
84 raise OSError(
85 err,
86 f"Error mounting {source} ({fs}) on {target}"
87 f" with options '{options}': {os.strerror(err)}",
88 )
89
90
91 # unmout options
92 MNT_FORCE = 0x1
93 MNT_DETACH = 0x2
94 MNT_EXPIRE = 0x4
95 UMOUNT_NOFOLLOW = 0x8
96
97
98 def umount(target, options):
99 if not libc:
100 _load_libc()
101 libc.umount.argtypes = (ctypes.c_char_p, ctypes.c_uint)
102
103 ret = libc.umount(target.encode(), int(options))
104 if ret < 0:
105 err = ctypes.get_errno()
106 raise OSError(
107 err,
108 f"Error umounting {target} with options '{options}': {os.strerror(err)}",
109 )
110
111
112 def pidfd_open(pid, flags=0):
113 if hasattr(os, "pidfd_open") and os.pidfd_open is not pidfd_open:
114 return os.pidfd_open(pid, flags) # pylint: disable=no-member
115
116 if not libc:
117 _load_libc()
118
119 try:
120 pfof = libc.pidfd_open
121 except AttributeError:
122 __NR_pidfd_open = 434
123 _pidfd_open = libc.syscall
124 _pidfd_open.restype = ctypes.c_int
125 _pidfd_open.argtypes = ctypes.c_long, ctypes.c_uint, ctypes.c_uint
126 pfof = functools.partial(_pidfd_open, __NR_pidfd_open)
127
128 fd = pfof(int(pid), int(flags))
129 if fd == -1:
130 raise_oserror(ctypes.get_errno())
131
132 return fd
133
134
135 if not hasattr(os, "pidfd_open"):
136 os.pidfd_open = pidfd_open
137
138
139 def setns(fd, nstype): # noqa: D402
140 """See setns(2) manpage."""
141 if not libc:
142 _load_libc()
143
144 if libc.setns(int(fd), int(nstype)) == -1:
145 raise_oserror(ctypes.get_errno())
146
147
148 def unshare(flags): # noqa: D402
149 """See unshare(2) manpage."""
150 if not libc:
151 _load_libc()
152
153 if libc.unshare(int(flags)) == -1:
154 raise_oserror(ctypes.get_errno())
155
156
157 CLONE_NEWTIME = 0x00000080
158 CLONE_VM = 0x00000100
159 CLONE_FS = 0x00000200
160 CLONE_FILES = 0x00000400
161 CLONE_SIGHAND = 0x00000800
162 CLONE_PIDFD = 0x00001000
163 CLONE_PTRACE = 0x00002000
164 CLONE_VFORK = 0x00004000
165 CLONE_PARENT = 0x00008000
166 CLONE_THREAD = 0x00010000
167 CLONE_NEWNS = 0x00020000
168 CLONE_SYSVSEM = 0x00040000
169 CLONE_SETTLS = 0x00080000
170 CLONE_PARENT_SETTID = 0x00100000
171 CLONE_CHILD_CLEARTID = 0x00200000
172 CLONE_DETACHED = 0x00400000
173 CLONE_UNTRACED = 0x00800000
174 CLONE_CHILD_SETTID = 0x01000000
175 CLONE_NEWCGROUP = 0x02000000
176 CLONE_NEWUTS = 0x04000000
177 CLONE_NEWIPC = 0x08000000
178 CLONE_NEWUSER = 0x10000000
179 CLONE_NEWPID = 0x20000000
180 CLONE_NEWNET = 0x40000000
181 CLONE_IO = 0x80000000
182
183 clone_flag_names = {
184 CLONE_NEWTIME: "CLONE_NEWTIME",
185 CLONE_VM: "CLONE_VM",
186 CLONE_FS: "CLONE_FS",
187 CLONE_FILES: "CLONE_FILES",
188 CLONE_SIGHAND: "CLONE_SIGHAND",
189 CLONE_PIDFD: "CLONE_PIDFD",
190 CLONE_PTRACE: "CLONE_PTRACE",
191 CLONE_VFORK: "CLONE_VFORK",
192 CLONE_PARENT: "CLONE_PARENT",
193 CLONE_THREAD: "CLONE_THREAD",
194 CLONE_NEWNS: "CLONE_NEWNS",
195 CLONE_SYSVSEM: "CLONE_SYSVSEM",
196 CLONE_SETTLS: "CLONE_SETTLS",
197 CLONE_PARENT_SETTID: "CLONE_PARENT_SETTID",
198 CLONE_CHILD_CLEARTID: "CLONE_CHILD_CLEARTID",
199 CLONE_DETACHED: "CLONE_DETACHED",
200 CLONE_UNTRACED: "CLONE_UNTRACED",
201 CLONE_CHILD_SETTID: "CLONE_CHILD_SETTID",
202 CLONE_NEWCGROUP: "CLONE_NEWCGROUP",
203 CLONE_NEWUTS: "CLONE_NEWUTS",
204 CLONE_NEWIPC: "CLONE_NEWIPC",
205 CLONE_NEWUSER: "CLONE_NEWUSER",
206 CLONE_NEWPID: "CLONE_NEWPID",
207 CLONE_NEWNET: "CLONE_NEWNET",
208 CLONE_IO: "CLONE_IO",
209 }
210
211
212 def clone_flag_string(flags):
213 ns = [v for k, v in clone_flag_names.items() if k & flags]
214 if ns:
215 return "|".join(ns)
216 return "None"
217
218
219 namespace_files = {
220 CLONE_NEWUSER: "ns/user",
221 CLONE_NEWCGROUP: "ns/cgroup",
222 CLONE_NEWIPC: "ns/ipc",
223 CLONE_NEWUTS: "ns/uts",
224 CLONE_NEWNET: "ns/net",
225 CLONE_NEWPID: "ns/pid_for_children",
226 CLONE_NEWNS: "ns/mnt",
227 CLONE_NEWTIME: "ns/time_for_children",
228 }
229
230 PR_SET_PDEATHSIG = 1
231 PR_GET_PDEATHSIG = 2
232 PR_SET_NAME = 15
233 PR_GET_NAME = 16
234
235
236 def set_process_name(name):
237 if not libc:
238 _load_libc()
239
240 # Why does uncommenting this cause failure?
241 # libc.prctl.argtypes = (
242 # ctypes.c_int,
243 # ctypes.c_ulong,
244 # ctypes.c_ulong,
245 # ctypes.c_ulong,
246 # ctypes.c_ulong,
247 # )
248
249 s = ctypes.create_string_buffer(bytes(name, encoding="ascii"))
250 sr = ctypes.byref(s)
251 libc.prctl(PR_SET_NAME, sr, 0, 0, 0)
252
253
254 def set_parent_death_signal(signum):
255 if not libc:
256 _load_libc()
257
258 # Why does uncommenting this cause failure?
259 libc.prctl.argtypes = (
260 ctypes.c_int,
261 ctypes.c_ulong,
262 ctypes.c_ulong,
263 ctypes.c_ulong,
264 ctypes.c_ulong,
265 )
266
267 libc.prctl(PR_SET_PDEATHSIG, signum, 0, 0, 0)