]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/munet/linux.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 # SPDX-License-Identifier: GPL-2.0-or-later
4 # June 10 2022, Christian Hopps <chopps@labn.net>
6 # Copyright (c) 2022, LabN Consulting, L.L.C.
8 """A module that gives access to linux unshare system call."""
10 import ctypes
# pylint: disable=C0415
11 import ctypes
.util
# pylint: disable=C0415
20 def raise_oserror(enum
):
21 s
= errno
.errorcode
[enum
] if enum
in errno
.errorcode
else str(enum
)
29 global libc
# pylint: disable=W0601,W0603
32 lcpath
= ctypes
.util
.find_library("c")
33 libc
= ctypes
.CDLL(lcpath
, use_errno
=True)
46 MS_SYNCHRONOUS
= 1 << 4
50 MS_NOSYMFOLLOW
= 1 << 8
52 MS_NODIRATIME
= 1 << 11
58 MS_UNBINDABLE
= 1 << 17
63 MS_KERNMOUNT
= 1 << 22
64 MS_I_VERSION
= 1 << 23
65 MS_STRICTATIME
= 1 << 24
69 def mount(source
, target
, fs
, flags
=0, options
=""):
72 libc
.mount
.argtypes
= (
79 fsenc
= fs
.encode() if fs
else None
80 optenc
= options
.encode() if options
else None
81 ret
= libc
.mount(source
.encode(), target
.encode(), fsenc
, flags
, optenc
)
83 err
= ctypes
.get_errno()
86 f
"Error mounting {source} ({fs}) on {target}"
87 f
" with options '{options}': {os.strerror(err)}",
98 def umount(target
, options
):
101 libc
.umount
.argtypes
= (ctypes
.c_char_p
, ctypes
.c_uint
)
103 ret
= libc
.umount(target
.encode(), int(options
))
105 err
= ctypes
.get_errno()
108 f
"Error umounting {target} with options '{options}': {os.strerror(err)}",
112 def pidfd_open(pid
, flags
=0):
113 if hasattr(os
, "pidfd_open") and os
.pidfd_open
is not pidfd_open
:
114 return os
.pidfd_open(pid
, flags
) # pylint: disable=no-member
120 pfof
= libc
.pidfd_open
121 except AttributeError:
122 __NR_pidfd_open
= 434
123 _pidfd_open
= libc
.syscall
124 _pidfd_open
.restype
= ctypes
.c_int
125 _pidfd_open
.argtypes
= ctypes
.c_long
, ctypes
.c_uint
, ctypes
.c_uint
126 pfof
= functools
.partial(_pidfd_open
, __NR_pidfd_open
)
128 fd
= pfof(int(pid
), int(flags
))
130 raise_oserror(ctypes
.get_errno())
135 if not hasattr(os
, "pidfd_open"):
136 os
.pidfd_open
= pidfd_open
139 def setns(fd
, nstype
): # noqa: D402
140 """See setns(2) manpage."""
144 if libc
.setns(int(fd
), int(nstype
)) == -1:
145 raise_oserror(ctypes
.get_errno())
148 def unshare(flags
): # noqa: D402
149 """See unshare(2) manpage."""
153 if libc
.unshare(int(flags
)) == -1:
154 raise_oserror(ctypes
.get_errno())
157 CLONE_NEWTIME
= 0x00000080
158 CLONE_VM
= 0x00000100
159 CLONE_FS
= 0x00000200
160 CLONE_FILES
= 0x00000400
161 CLONE_SIGHAND
= 0x00000800
162 CLONE_PIDFD
= 0x00001000
163 CLONE_PTRACE
= 0x00002000
164 CLONE_VFORK
= 0x00004000
165 CLONE_PARENT
= 0x00008000
166 CLONE_THREAD
= 0x00010000
167 CLONE_NEWNS
= 0x00020000
168 CLONE_SYSVSEM
= 0x00040000
169 CLONE_SETTLS
= 0x00080000
170 CLONE_PARENT_SETTID
= 0x00100000
171 CLONE_CHILD_CLEARTID
= 0x00200000
172 CLONE_DETACHED
= 0x00400000
173 CLONE_UNTRACED
= 0x00800000
174 CLONE_CHILD_SETTID
= 0x01000000
175 CLONE_NEWCGROUP
= 0x02000000
176 CLONE_NEWUTS
= 0x04000000
177 CLONE_NEWIPC
= 0x08000000
178 CLONE_NEWUSER
= 0x10000000
179 CLONE_NEWPID
= 0x20000000
180 CLONE_NEWNET
= 0x40000000
181 CLONE_IO
= 0x80000000
184 CLONE_NEWTIME
: "CLONE_NEWTIME",
185 CLONE_VM
: "CLONE_VM",
186 CLONE_FS
: "CLONE_FS",
187 CLONE_FILES
: "CLONE_FILES",
188 CLONE_SIGHAND
: "CLONE_SIGHAND",
189 CLONE_PIDFD
: "CLONE_PIDFD",
190 CLONE_PTRACE
: "CLONE_PTRACE",
191 CLONE_VFORK
: "CLONE_VFORK",
192 CLONE_PARENT
: "CLONE_PARENT",
193 CLONE_THREAD
: "CLONE_THREAD",
194 CLONE_NEWNS
: "CLONE_NEWNS",
195 CLONE_SYSVSEM
: "CLONE_SYSVSEM",
196 CLONE_SETTLS
: "CLONE_SETTLS",
197 CLONE_PARENT_SETTID
: "CLONE_PARENT_SETTID",
198 CLONE_CHILD_CLEARTID
: "CLONE_CHILD_CLEARTID",
199 CLONE_DETACHED
: "CLONE_DETACHED",
200 CLONE_UNTRACED
: "CLONE_UNTRACED",
201 CLONE_CHILD_SETTID
: "CLONE_CHILD_SETTID",
202 CLONE_NEWCGROUP
: "CLONE_NEWCGROUP",
203 CLONE_NEWUTS
: "CLONE_NEWUTS",
204 CLONE_NEWIPC
: "CLONE_NEWIPC",
205 CLONE_NEWUSER
: "CLONE_NEWUSER",
206 CLONE_NEWPID
: "CLONE_NEWPID",
207 CLONE_NEWNET
: "CLONE_NEWNET",
208 CLONE_IO
: "CLONE_IO",
212 def clone_flag_string(flags
):
213 ns
= [v
for k
, v
in clone_flag_names
.items() if k
& flags
]
220 CLONE_NEWUSER
: "ns/user",
221 CLONE_NEWCGROUP
: "ns/cgroup",
222 CLONE_NEWIPC
: "ns/ipc",
223 CLONE_NEWUTS
: "ns/uts",
224 CLONE_NEWNET
: "ns/net",
225 CLONE_NEWPID
: "ns/pid_for_children",
226 CLONE_NEWNS
: "ns/mnt",
227 CLONE_NEWTIME
: "ns/time_for_children",
236 def set_process_name(name
):
240 # Why does uncommenting this cause failure?
241 # libc.prctl.argtypes = (
249 s
= ctypes
.create_string_buffer(bytes(name
, encoding
="ascii"))
251 libc
.prctl(PR_SET_NAME
, sr
, 0, 0, 0)
254 def set_parent_death_signal(signum
):
258 # Why does uncommenting this cause failure?
259 libc
.prctl
.argtypes
= (
267 libc
.prctl(PR_SET_PDEATHSIG
, signum
, 0, 0, 0)