]>
Commit | Line | Data |
---|---|---|
f67539c2 | 1 | import hashlib |
7c673cae FG |
2 | import json |
3 | import logging | |
4 | import datetime | |
f67539c2 TL |
5 | import os |
6 | import re | |
7c673cae | 7 | import time |
f67539c2 TL |
8 | |
9 | from io import StringIO | |
10 | from contextlib import contextmanager | |
7c673cae | 11 | from textwrap import dedent |
f67539c2 | 12 | from IPy import IP |
cd265ab1 | 13 | |
f67539c2 | 14 | from teuthology.contextutil import safe_while |
522d829b | 15 | from teuthology.misc import get_file, write_file |
7c673cae | 16 | from teuthology.orchestra import run |
20effc67 TL |
17 | from teuthology.orchestra.run import Raw |
18 | from teuthology.exceptions import CommandFailedError, ConnectionLostError | |
f67539c2 | 19 | |
11fdf7f2 | 20 | from tasks.cephfs.filesystem import Filesystem |
7c673cae FG |
21 | |
22 | log = logging.getLogger(__name__) | |
23 | ||
1e59de90 TL |
24 | |
25 | UMOUNT_TIMEOUT = 300 | |
26 | ||
27 | ||
7c673cae | 28 | class CephFSMount(object): |
f67539c2 TL |
29 | def __init__(self, ctx, test_dir, client_id, client_remote, |
30 | client_keyring_path=None, hostfs_mntpt=None, | |
1e59de90 TL |
31 | cephfs_name=None, cephfs_mntpt=None, brxnet=None, |
32 | client_config=None): | |
7c673cae FG |
33 | """ |
34 | :param test_dir: Global teuthology test dir | |
35 | :param client_id: Client ID, the 'foo' in client.foo | |
f67539c2 TL |
36 | :param client_keyring_path: path to keyring for given client_id |
37 | :param client_remote: Remote instance for the host where client will | |
38 | run | |
39 | :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will | |
40 | be mounted | |
41 | :param cephfs_name: Name of Ceph FS to be mounted | |
42 | :param cephfs_mntpt: Path to directory inside Ceph FS that will be | |
43 | mounted as root | |
7c673cae | 44 | """ |
11fdf7f2 | 45 | self.ctx = ctx |
7c673cae | 46 | self.test_dir = test_dir |
f67539c2 TL |
47 | |
48 | self._verify_attrs(client_id=client_id, | |
49 | client_keyring_path=client_keyring_path, | |
50 | hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name, | |
51 | cephfs_mntpt=cephfs_mntpt) | |
52 | ||
1e59de90 TL |
53 | if client_config is None: |
54 | client_config = {} | |
55 | self.client_config = client_config | |
56 | ||
57 | self.cephfs_name = cephfs_name | |
7c673cae | 58 | self.client_id = client_id |
f67539c2 | 59 | self.client_keyring_path = client_keyring_path |
7c673cae | 60 | self.client_remote = client_remote |
1e59de90 TL |
61 | self.cluster_name = 'ceph' # TODO: use config['cluster'] |
62 | self.fs = None | |
63 | ||
64 | if cephfs_mntpt is None and client_config.get("mount_path"): | |
65 | self.cephfs_mntpt = client_config.get("mount_path") | |
66 | log.info(f"using client_config[\"cephfs_mntpt\"] = {self.cephfs_mntpt}") | |
67 | else: | |
68 | self.cephfs_mntpt = cephfs_mntpt | |
69 | log.info(f"cephfs_mntpt = {self.cephfs_mntpt}") | |
70 | ||
71 | if hostfs_mntpt is None and client_config.get("mountpoint"): | |
72 | self.hostfs_mntpt = client_config.get("mountpoint") | |
73 | log.info(f"using client_config[\"hostfs_mntpt\"] = {self.hostfs_mntpt}") | |
74 | elif hostfs_mntpt is not None: | |
f67539c2 | 75 | self.hostfs_mntpt = hostfs_mntpt |
f67539c2 TL |
76 | else: |
77 | self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}') | |
1e59de90 TL |
78 | self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt) |
79 | log.info(f"hostfs_mntpt = {self.hostfs_mntpt}") | |
7c673cae | 80 | |
f67539c2 TL |
81 | self._netns_name = None |
82 | self.nsid = -1 | |
83 | if brxnet is None: | |
84 | self.ceph_brx_net = '192.168.0.0/16' | |
85 | else: | |
86 | self.ceph_brx_net = brxnet | |
87 | ||
7c673cae FG |
88 | self.test_files = ['a', 'b', 'c'] |
89 | ||
90 | self.background_procs = [] | |
91 | ||
f67539c2 TL |
92 | # This will cleanup the stale netnses, which are from the |
93 | # last failed test cases. | |
94 | @staticmethod | |
95 | def cleanup_stale_netnses_and_bridge(remote): | |
96 | p = remote.run(args=['ip', 'netns', 'list'], | |
97 | stdout=StringIO(), timeout=(5*60)) | |
98 | p = p.stdout.getvalue().strip() | |
99 | ||
100 | # Get the netns name list | |
101 | netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p) | |
102 | ||
103 | # Remove the stale netnses | |
104 | for ns in netns_list: | |
105 | ns_name = ns.split()[0] | |
106 | args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)] | |
107 | try: | |
108 | remote.run(args=args, timeout=(5*60), omit_sudo=False) | |
109 | except Exception: | |
110 | pass | |
111 | ||
112 | # Remove the stale 'ceph-brx' | |
113 | try: | |
114 | args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx'] | |
115 | remote.run(args=args, timeout=(5*60), omit_sudo=False) | |
116 | except Exception: | |
117 | pass | |
118 | ||
119 | def _parse_netns_name(self): | |
120 | self._netns_name = '-'.join(["ceph-ns", | |
121 | re.sub(r'/+', "-", self.mountpoint)]) | |
122 | ||
7c673cae FG |
123 | @property |
124 | def mountpoint(self): | |
1e59de90 | 125 | if self.hostfs_mntpt is None: |
f67539c2 TL |
126 | self.hostfs_mntpt = os.path.join(self.test_dir, |
127 | self.hostfs_mntpt_dirname) | |
128 | return self.hostfs_mntpt | |
9f95a23c TL |
129 | |
130 | @mountpoint.setter | |
131 | def mountpoint(self, path): | |
132 | if not isinstance(path, str): | |
133 | raise RuntimeError('path should be of str type.') | |
f67539c2 TL |
134 | self._mountpoint = self.hostfs_mntpt = path |
135 | ||
136 | @property | |
137 | def netns_name(self): | |
138 | if self._netns_name == None: | |
139 | self._parse_netns_name() | |
140 | return self._netns_name | |
141 | ||
142 | @netns_name.setter | |
143 | def netns_name(self, name): | |
144 | self._netns_name = name | |
145 | ||
20effc67 TL |
146 | def assert_that_ceph_fs_exists(self): |
147 | output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls") | |
148 | if self.cephfs_name: | |
149 | assert self.cephfs_name in output, \ | |
150 | 'expected ceph fs is not present on the cluster' | |
151 | log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster') | |
152 | else: | |
153 | assert 'No filesystems enabled' not in output, \ | |
154 | 'ceph cluster has no ceph fs, not even the default ceph fs' | |
155 | log.info('Mounting default Ceph FS; just confirmed its presence on cluster') | |
156 | ||
f67539c2 TL |
157 | def assert_and_log_minimum_mount_details(self): |
158 | """ | |
159 | Make sure we have minimum details required for mounting. Ideally, this | |
160 | method should be called at the beginning of the mount method. | |
161 | """ | |
162 | if not self.client_id or not self.client_remote or \ | |
163 | not self.hostfs_mntpt: | |
1e59de90 TL |
164 | log.error(f"self.client_id = {self.client_id}") |
165 | log.error(f"self.client_remote = {self.client_remote}") | |
166 | log.error(f"self.hostfs_mntpt = {self.hostfs_mntpt}") | |
f67539c2 TL |
167 | errmsg = ('Mounting CephFS requires that at least following ' |
168 | 'details to be provided -\n' | |
169 | '1. the client ID,\n2. the mountpoint and\n' | |
170 | '3. the remote machine where CephFS will be mounted.\n') | |
171 | raise RuntimeError(errmsg) | |
172 | ||
20effc67 TL |
173 | self.assert_that_ceph_fs_exists() |
174 | ||
f67539c2 TL |
175 | log.info('Mounting Ceph FS. Following are details of mount; remember ' |
176 | '"None" represents Python type None -') | |
177 | log.info(f'self.client_remote.hostname = {self.client_remote.hostname}') | |
178 | log.info(f'self.client.name = client.{self.client_id}') | |
179 | log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}') | |
180 | log.info(f'self.cephfs_name = {self.cephfs_name}') | |
181 | log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}') | |
182 | log.info(f'self.client_keyring_path = {self.client_keyring_path}') | |
183 | if self.client_keyring_path: | |
184 | log.info('keyring content -\n' + | |
185 | get_file(self.client_remote, self.client_keyring_path, | |
186 | sudo=True).decode()) | |
7c673cae | 187 | |
1e59de90 | 188 | def is_blocked(self): |
aee94f69 TL |
189 | if not self.addr: |
190 | # can't infer if our addr is blocklisted - let the caller try to | |
191 | # umount without lazy/force. If the client was blocklisted, then | |
192 | # the umount would be stuck and the test would fail on timeout. | |
193 | # happens only with Ubuntu 20.04 (missing kclient patches :/). | |
194 | return False | |
1e59de90 TL |
195 | self.fs = Filesystem(self.ctx, name=self.cephfs_name) |
196 | ||
197 | try: | |
198 | output = self.fs.mon_manager.raw_cluster_cmd(args='osd blocklist ls') | |
199 | except CommandFailedError: | |
200 | # Fallback for older Ceph cluster | |
201 | output = self.fs.mon_manager.raw_cluster_cmd(args='osd blacklist ls') | |
202 | ||
203 | return self.addr in output | |
204 | ||
205 | def is_stuck(self): | |
206 | """ | |
207 | Check if mount is stuck/in a hanged state. | |
208 | """ | |
209 | if not self.is_mounted(): | |
210 | return False | |
211 | ||
212 | retval = self.client_remote.run(args=f'sudo stat {self.hostfs_mntpt}', | |
213 | omit_sudo=False, wait=False).returncode | |
214 | if retval == 0: | |
215 | return False | |
216 | ||
217 | time.sleep(10) | |
218 | proc = self.client_remote.run(args='ps -ef', stdout=StringIO()) | |
219 | # if proc was running even after 10 seconds, it has to be stuck. | |
220 | if f'stat {self.hostfs_mntpt}' in proc.stdout.getvalue(): | |
221 | log.critical('client mounted at self.hostfs_mntpt is stuck!') | |
222 | return True | |
223 | return False | |
224 | ||
7c673cae | 225 | def is_mounted(self): |
1e59de90 TL |
226 | file = self.client_remote.read_file('/proc/self/mounts',stdout=StringIO()) |
227 | if self.hostfs_mntpt in file: | |
228 | return True | |
229 | else: | |
230 | log.debug(f"not mounted; /proc/self/mounts is:\n{file}") | |
231 | return False | |
7c673cae | 232 | |
11fdf7f2 TL |
233 | def setupfs(self, name=None): |
234 | if name is None and self.fs is not None: | |
235 | # Previous mount existed, reuse the old name | |
236 | name = self.fs.name | |
237 | self.fs = Filesystem(self.ctx, name=name) | |
238 | log.info('Wait for MDS to reach steady state...') | |
239 | self.fs.wait_for_daemons() | |
240 | log.info('Ready to start {}...'.format(type(self).__name__)) | |
241 | ||
20effc67 TL |
242 | def _create_mntpt(self): |
243 | self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}', | |
244 | timeout=60) | |
245 | # Use 0000 mode to prevent undesired modifications to the mountpoint on | |
246 | # the local file system. | |
247 | self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}', | |
248 | timeout=60) | |
249 | ||
250 | @property | |
251 | def _nsenter_args(self): | |
252 | return ['nsenter', f'--net=/var/run/netns/{self.netns_name}'] | |
253 | ||
254 | def _set_filemode_on_mntpt(self): | |
255 | stderr = StringIO() | |
256 | try: | |
257 | self.client_remote.run( | |
258 | args=['sudo', 'chmod', '1777', self.hostfs_mntpt], | |
259 | stderr=stderr, timeout=(5*60)) | |
260 | except CommandFailedError: | |
261 | # the client does not have write permissions in the caps it holds | |
262 | # for the Ceph FS that was just mounted. | |
263 | if 'permission denied' in stderr.getvalue().lower(): | |
264 | pass | |
265 | ||
f67539c2 TL |
266 | def _setup_brx_and_nat(self): |
267 | # The ip for ceph-brx should be | |
268 | ip = IP(self.ceph_brx_net)[-2] | |
269 | mask = self.ceph_brx_net.split('/')[1] | |
270 | brd = IP(self.ceph_brx_net).broadcast() | |
271 | ||
272 | brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(), | |
273 | stdout=StringIO(), timeout=(5*60)) | |
274 | brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue()) | |
275 | if brx: | |
276 | # If the 'ceph-brx' already exists, then check whether | |
277 | # the new net is conflicting with it | |
278 | _ip, _mask = brx[0].split()[1].split('/', 1) | |
279 | if _ip != "{}".format(ip) or _mask != mask: | |
280 | raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask)) | |
281 | ||
282 | # Setup the ceph-brx and always use the last valid IP | |
283 | if not brx: | |
284 | log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask)) | |
285 | ||
286 | self.run_shell_payload(f""" | |
287 | set -e | |
288 | sudo ip link add name ceph-brx type bridge | |
289 | sudo ip addr flush dev ceph-brx | |
290 | sudo ip link set ceph-brx up | |
291 | sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx | |
292 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
293 | ||
294 | args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward" | |
295 | self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False) | |
296 | ||
297 | # Setup the NAT | |
298 | p = self.client_remote.run(args=['route'], stderr=StringIO(), | |
299 | stdout=StringIO(), timeout=(5*60)) | |
300 | p = re.findall(r'default .*', p.stdout.getvalue()) | |
301 | if p == False: | |
302 | raise RuntimeError("No default gw found") | |
303 | gw = p[0].split()[7] | |
304 | ||
305 | self.run_shell_payload(f""" | |
306 | set -e | |
307 | sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT | |
308 | sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT | |
309 | sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE | |
310 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
311 | ||
312 | def _setup_netns(self): | |
313 | p = self.client_remote.run(args=['ip', 'netns', 'list'], | |
314 | stderr=StringIO(), stdout=StringIO(), | |
315 | timeout=(5*60)).stdout.getvalue().strip() | |
316 | ||
317 | # Get the netns name list | |
318 | netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p) | |
319 | ||
320 | out = re.search(r"{0}".format(self.netns_name), p) | |
321 | if out is None: | |
322 | # Get an uniq nsid for the new netns | |
323 | nsid = 0 | |
324 | p = self.client_remote.run(args=['ip', 'netns', 'list-id'], | |
325 | stderr=StringIO(), stdout=StringIO(), | |
326 | timeout=(5*60)).stdout.getvalue() | |
327 | while True: | |
328 | out = re.search(r"nsid {} ".format(nsid), p) | |
329 | if out is None: | |
330 | break | |
331 | ||
332 | nsid += 1 | |
333 | ||
334 | # Add one new netns and set it id | |
335 | self.run_shell_payload(f""" | |
336 | set -e | |
337 | sudo ip netns add {self.netns_name} | |
338 | sudo ip netns set {self.netns_name} {nsid} | |
339 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
340 | self.nsid = nsid; | |
341 | else: | |
342 | # The netns already exists and maybe suspended by self.kill() | |
343 | self.resume_netns(); | |
344 | ||
345 | nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1)) | |
346 | self.nsid = nsid; | |
347 | return | |
348 | ||
349 | # Get one ip address for netns | |
350 | ips = IP(self.ceph_brx_net) | |
351 | for ip in ips: | |
352 | found = False | |
353 | if ip == ips[0]: | |
354 | continue | |
355 | if ip == ips[-2]: | |
356 | raise RuntimeError("we have ran out of the ip addresses") | |
357 | ||
358 | for ns in netns_list: | |
359 | ns_name = ns.split()[0] | |
360 | args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr'] | |
361 | try: | |
362 | p = self.client_remote.run(args=args, stderr=StringIO(), | |
363 | stdout=StringIO(), timeout=(5*60), | |
364 | omit_sudo=False) | |
365 | q = re.search("{0}".format(ip), p.stdout.getvalue()) | |
366 | if q is not None: | |
367 | found = True | |
368 | break | |
369 | except CommandFailedError: | |
370 | if "No such file or directory" in p.stderr.getvalue(): | |
371 | pass | |
372 | if "Invalid argument" in p.stderr.getvalue(): | |
373 | pass | |
374 | ||
375 | if found == False: | |
376 | break | |
377 | ||
378 | mask = self.ceph_brx_net.split('/')[1] | |
379 | brd = IP(self.ceph_brx_net).broadcast() | |
380 | ||
381 | log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask)) | |
382 | ||
383 | # Setup the veth interfaces | |
384 | brxip = IP(self.ceph_brx_net)[-2] | |
385 | self.run_shell_payload(f""" | |
386 | set -e | |
387 | sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid} | |
388 | sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0 | |
389 | sudo ip netns exec {self.netns_name} ip link set veth0 up | |
390 | sudo ip netns exec {self.netns_name} ip link set lo up | |
391 | sudo ip netns exec {self.netns_name} ip route add default via {brxip} | |
392 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
393 | ||
394 | # Bring up the brx interface and join it to 'ceph-brx' | |
395 | self.run_shell_payload(f""" | |
396 | set -e | |
397 | sudo ip link set brx.{nsid} up | |
398 | sudo ip link set dev brx.{nsid} master ceph-brx | |
399 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
400 | ||
401 | def _cleanup_netns(self): | |
402 | if self.nsid == -1: | |
403 | return | |
404 | log.info("Removing the netns '{0}'".format(self.netns_name)) | |
405 | ||
406 | # Delete the netns and the peer veth interface | |
407 | self.run_shell_payload(f""" | |
408 | set -e | |
409 | sudo ip link set brx.{self.nsid} down | |
410 | sudo ip link delete dev brx.{self.nsid} | |
411 | sudo ip netns delete {self.netns_name} | |
412 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
413 | ||
414 | self.nsid = -1 | |
415 | ||
416 | def _cleanup_brx_and_nat(self): | |
417 | brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(), | |
418 | stdout=StringIO(), timeout=(5*60)) | |
419 | brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue()) | |
420 | if not brx: | |
421 | return | |
422 | ||
423 | # If we are the last netns, will delete the ceph-brx | |
424 | args = ['sudo', 'ip', 'link', 'show'] | |
425 | p = self.client_remote.run(args=args, stdout=StringIO(), | |
426 | timeout=(5*60), omit_sudo=False) | |
427 | _list = re.findall(r'brx\.', p.stdout.getvalue().strip()) | |
428 | if len(_list) != 0: | |
429 | return | |
430 | ||
431 | log.info("Removing the 'ceph-brx'") | |
432 | ||
433 | self.run_shell_payload(""" | |
434 | set -e | |
435 | sudo ip link set ceph-brx down | |
436 | sudo ip link delete ceph-brx | |
437 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
438 | ||
439 | # Drop the iptables NAT rules | |
440 | ip = IP(self.ceph_brx_net)[-2] | |
441 | mask = self.ceph_brx_net.split('/')[1] | |
442 | ||
443 | p = self.client_remote.run(args=['route'], stderr=StringIO(), | |
444 | stdout=StringIO(), timeout=(5*60)) | |
445 | p = re.findall(r'default .*', p.stdout.getvalue()) | |
446 | if p == False: | |
447 | raise RuntimeError("No default gw found") | |
448 | gw = p[0].split()[7] | |
449 | self.run_shell_payload(f""" | |
450 | set -e | |
451 | sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT | |
452 | sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT | |
453 | sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE | |
454 | """, timeout=(5*60), omit_sudo=False, cwd='/') | |
455 | ||
456 | def setup_netns(self): | |
457 | """ | |
458 | Setup the netns for the mountpoint. | |
459 | """ | |
460 | log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint)) | |
461 | self._setup_brx_and_nat() | |
462 | self._setup_netns() | |
463 | ||
464 | def cleanup_netns(self): | |
465 | """ | |
466 | Cleanup the netns for the mountpoint. | |
467 | """ | |
468 | # We will defer cleaning the netnses and bridge until the last | |
469 | # mountpoint is unmounted, this will be a temporary work around | |
470 | # for issue#46282. | |
471 | ||
472 | # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint)) | |
473 | # self._cleanup_netns() | |
474 | # self._cleanup_brx_and_nat() | |
475 | ||
476 | def suspend_netns(self): | |
477 | """ | |
478 | Suspend the netns veth interface. | |
479 | """ | |
480 | if self.nsid == -1: | |
481 | return | |
482 | ||
483 | log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint)) | |
484 | ||
485 | args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down'] | |
486 | self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False) | |
487 | ||
488 | def resume_netns(self): | |
489 | """ | |
490 | Resume the netns veth interface. | |
491 | """ | |
492 | if self.nsid == -1: | |
493 | return | |
494 | ||
495 | log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint)) | |
496 | ||
497 | args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up'] | |
498 | self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False) | |
499 | ||
20effc67 | 500 | def mount(self, mntopts=[], check_status=True, **kwargs): |
f67539c2 TL |
501 | """ |
502 | kwargs expects its members to be same as the arguments accepted by | |
503 | self.update_attrs(). | |
504 | """ | |
7c673cae FG |
505 | raise NotImplementedError() |
506 | ||
f67539c2 TL |
507 | def mount_wait(self, **kwargs): |
508 | """ | |
509 | Accepts arguments same as self.mount(). | |
510 | """ | |
511 | self.mount(**kwargs) | |
e306af50 TL |
512 | self.wait_until_mounted() |
513 | ||
1e59de90 TL |
514 | def _run_umount_lf(self): |
515 | log.debug(f'Force/lazy unmounting on client.{self.client_id}') | |
516 | ||
517 | try: | |
518 | proc = self.client_remote.run( | |
519 | args=f'sudo umount --lazy --force {self.hostfs_mntpt}', | |
520 | timeout=UMOUNT_TIMEOUT, omit_sudo=False) | |
521 | except CommandFailedError: | |
522 | if self.is_mounted(): | |
523 | raise | |
524 | ||
525 | return proc | |
526 | ||
7c673cae FG |
527 | def umount(self): |
528 | raise NotImplementedError() | |
529 | ||
1e59de90 TL |
530 | def umount_wait(self, force=False, require_clean=False, |
531 | timeout=UMOUNT_TIMEOUT): | |
7c673cae FG |
532 | """ |
533 | ||
534 | :param force: Expect that the mount will not shutdown cleanly: kill | |
535 | it hard. | |
536 | :param require_clean: Wait for the Ceph client associated with the | |
537 | mount (e.g. ceph-fuse) to terminate, and | |
538 | raise if it doesn't do so cleanly. | |
f67539c2 | 539 | :param timeout: amount of time to be waited for umount command to finish |
7c673cae FG |
540 | :return: |
541 | """ | |
542 | raise NotImplementedError() | |
543 | ||
f67539c2 TL |
544 | def _verify_attrs(self, **kwargs): |
545 | """ | |
546 | Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt, | |
547 | cephfs_name, cephfs_mntpt are either type str or None. | |
548 | """ | |
549 | for k, v in kwargs.items(): | |
550 | if v is not None and not isinstance(v, str): | |
551 | raise RuntimeError('value of attributes should be either str ' | |
552 | f'or None. {k} - {v}') | |
553 | ||
554 | def update_attrs(self, client_id=None, client_keyring_path=None, | |
555 | client_remote=None, hostfs_mntpt=None, cephfs_name=None, | |
556 | cephfs_mntpt=None): | |
557 | if not (client_id or client_keyring_path or client_remote or | |
558 | cephfs_name or cephfs_mntpt or hostfs_mntpt): | |
559 | return | |
560 | ||
561 | self._verify_attrs(client_id=client_id, | |
562 | client_keyring_path=client_keyring_path, | |
563 | hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name, | |
564 | cephfs_mntpt=cephfs_mntpt) | |
565 | ||
566 | if client_id: | |
567 | self.client_id = client_id | |
568 | if client_keyring_path: | |
569 | self.client_keyring_path = client_keyring_path | |
570 | if client_remote: | |
571 | self.client_remote = client_remote | |
572 | if hostfs_mntpt: | |
573 | self.hostfs_mntpt = hostfs_mntpt | |
574 | if cephfs_name: | |
575 | self.cephfs_name = cephfs_name | |
576 | if cephfs_mntpt: | |
577 | self.cephfs_mntpt = cephfs_mntpt | |
578 | ||
579 | def remount(self, **kwargs): | |
580 | """ | |
581 | Update mount object's attributes and attempt remount with these | |
582 | new values for these attrbiutes. | |
583 | ||
584 | 1. Run umount_wait(). | |
585 | 2. Run update_attrs(). | |
586 | 3. Run mount(). | |
587 | ||
20effc67 TL |
588 | Accepts arguments of self.mount() and self.update_attrs() with 1 |
589 | exception: wait accepted too which can be True or False. | |
f67539c2 TL |
590 | """ |
591 | self.umount_wait() | |
1e59de90 | 592 | assert not self.is_mounted() |
f67539c2 TL |
593 | |
594 | mntopts = kwargs.pop('mntopts', []) | |
f67539c2 TL |
595 | check_status = kwargs.pop('check_status', True) |
596 | wait = kwargs.pop('wait', True) | |
597 | ||
598 | self.update_attrs(**kwargs) | |
599 | ||
20effc67 | 600 | retval = self.mount(mntopts=mntopts, check_status=check_status) |
f67539c2 TL |
601 | # avoid this scenario (again): mount command might've failed and |
602 | # check_status might have silenced the exception, yet we attempt to | |
603 | # wait which might lead to an error. | |
604 | if retval is None and wait: | |
605 | self.wait_until_mounted() | |
606 | ||
607 | return retval | |
7c673cae FG |
608 | |
609 | def kill(self): | |
f67539c2 TL |
610 | """ |
611 | Suspend the netns veth interface to make the client disconnected | |
612 | from the ceph cluster | |
613 | """ | |
614 | log.info('Killing connection on {0}...'.format(self.client_remote.name)) | |
615 | self.suspend_netns() | |
616 | ||
617 | def kill_cleanup(self): | |
618 | """ | |
619 | Follow up ``kill`` to get to a clean unmounted state. | |
620 | """ | |
621 | log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name)) | |
622 | self.umount_wait(force=True) | |
7c673cae FG |
623 | |
624 | def cleanup(self): | |
f67539c2 TL |
625 | """ |
626 | Remove the mount point. | |
627 | ||
628 | Prerequisite: the client is not mounted. | |
629 | """ | |
630 | log.info('Cleaning up mount {0}'.format(self.client_remote.name)) | |
631 | stderr = StringIO() | |
632 | try: | |
633 | self.client_remote.run(args=['rmdir', '--', self.mountpoint], | |
634 | cwd=self.test_dir, stderr=stderr, | |
635 | timeout=(60*5), check_status=False) | |
636 | except CommandFailedError: | |
637 | if "no such file or directory" not in stderr.getvalue().lower(): | |
638 | raise | |
639 | ||
640 | self.cleanup_netns() | |
7c673cae FG |
641 | |
642 | def wait_until_mounted(self): | |
643 | raise NotImplementedError() | |
644 | ||
645 | def get_keyring_path(self): | |
a4b75251 | 646 | # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps |
7c673cae FG |
647 | return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id) |
648 | ||
f67539c2 TL |
649 | def get_key_from_keyfile(self): |
650 | # XXX: don't call run_shell(), since CephFS might be unmounted. | |
1e59de90 TL |
651 | keyring = self.client_remote.read_file(self.client_keyring_path).\ |
652 | decode() | |
653 | ||
f67539c2 TL |
654 | for line in keyring.split('\n'): |
655 | if line.find('key') != -1: | |
656 | return line[line.find('=') + 1 : ].strip() | |
657 | ||
1e59de90 TL |
658 | raise RuntimeError('Key not found in keyring file ' |
659 | f'{self.client_keyring_path}. Its contents are -\n' | |
660 | f'{keyring}') | |
661 | ||
7c673cae FG |
662 | @property |
663 | def config_path(self): | |
664 | """ | |
665 | Path to ceph.conf: override this if you're not a normal systemwide ceph install | |
666 | :return: stringv | |
667 | """ | |
668 | return "/etc/ceph/ceph.conf" | |
669 | ||
670 | @contextmanager | |
f67539c2 | 671 | def mounted_wait(self): |
7c673cae FG |
672 | """ |
673 | A context manager, from an initially unmounted state, to mount | |
674 | this, yield, and then unmount and clean up. | |
675 | """ | |
676 | self.mount() | |
677 | self.wait_until_mounted() | |
678 | try: | |
679 | yield | |
680 | finally: | |
681 | self.umount_wait() | |
682 | ||
9f95a23c TL |
683 | def create_file(self, filename='testfile', dirname=None, user=None, |
684 | check_status=True): | |
685 | assert(self.is_mounted()) | |
686 | ||
687 | if not os.path.isabs(filename): | |
688 | if dirname: | |
689 | if os.path.isabs(dirname): | |
690 | path = os.path.join(dirname, filename) | |
691 | else: | |
f67539c2 | 692 | path = os.path.join(self.hostfs_mntpt, dirname, filename) |
9f95a23c | 693 | else: |
f67539c2 | 694 | path = os.path.join(self.hostfs_mntpt, filename) |
9f95a23c TL |
695 | else: |
696 | path = filename | |
697 | ||
698 | if user: | |
699 | args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', 'touch ' + path] | |
700 | else: | |
701 | args = 'touch ' + path | |
702 | ||
703 | return self.client_remote.run(args=args, check_status=check_status) | |
704 | ||
7c673cae FG |
705 | def create_files(self): |
706 | assert(self.is_mounted()) | |
707 | ||
708 | for suffix in self.test_files: | |
709 | log.info("Creating file {0}".format(suffix)) | |
710 | self.client_remote.run(args=[ | |
522d829b | 711 | 'touch', os.path.join(self.hostfs_mntpt, suffix) |
7c673cae FG |
712 | ]) |
713 | ||
9f95a23c TL |
714 | def test_create_file(self, filename='testfile', dirname=None, user=None, |
715 | check_status=True): | |
716 | return self.create_file(filename=filename, dirname=dirname, user=user, | |
717 | check_status=False) | |
718 | ||
7c673cae FG |
719 | def check_files(self): |
720 | assert(self.is_mounted()) | |
721 | ||
722 | for suffix in self.test_files: | |
723 | log.info("Checking file {0}".format(suffix)) | |
724 | r = self.client_remote.run(args=[ | |
522d829b | 725 | 'ls', os.path.join(self.hostfs_mntpt, suffix) |
7c673cae FG |
726 | ], check_status=False) |
727 | if r.exitstatus != 0: | |
728 | raise RuntimeError("Expected file {0} not found".format(suffix)) | |
729 | ||
cd265ab1 TL |
730 | def write_file(self, path, data, perms=None): |
731 | """ | |
732 | Write the given data at the given path and set the given perms to the | |
733 | file on the path. | |
734 | """ | |
f67539c2 TL |
735 | if path.find(self.hostfs_mntpt) == -1: |
736 | path = os.path.join(self.hostfs_mntpt, path) | |
cd265ab1 | 737 | |
522d829b | 738 | write_file(self.client_remote, path, data) |
cd265ab1 TL |
739 | |
740 | if perms: | |
741 | self.run_shell(args=f'chmod {perms} {path}') | |
742 | ||
743 | def read_file(self, path): | |
744 | """ | |
745 | Return the data from the file on given path. | |
746 | """ | |
f67539c2 TL |
747 | if path.find(self.hostfs_mntpt) == -1: |
748 | path = os.path.join(self.hostfs_mntpt, path) | |
cd265ab1 | 749 | |
522d829b | 750 | return self.run_shell(args=['cat', path]).\ |
cd265ab1 TL |
751 | stdout.getvalue().strip() |
752 | ||
7c673cae FG |
753 | def create_destroy(self): |
754 | assert(self.is_mounted()) | |
755 | ||
756 | filename = "{0} {1}".format(datetime.datetime.now(), self.client_id) | |
757 | log.debug("Creating test file {0}".format(filename)) | |
758 | self.client_remote.run(args=[ | |
522d829b | 759 | 'touch', os.path.join(self.hostfs_mntpt, filename) |
7c673cae FG |
760 | ]) |
761 | log.debug("Deleting test file {0}".format(filename)) | |
762 | self.client_remote.run(args=[ | |
522d829b | 763 | 'rm', '-f', os.path.join(self.hostfs_mntpt, filename) |
7c673cae FG |
764 | ]) |
765 | ||
522d829b | 766 | def _run_python(self, pyscript, py_version='python3', sudo=False): |
1e59de90 | 767 | args, omit_sudo = [], True |
522d829b TL |
768 | if sudo: |
769 | args.append('sudo') | |
1e59de90 | 770 | omit_sudo = False |
522d829b | 771 | args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript] |
1e59de90 TL |
772 | return self.client_remote.run(args=args, wait=False, stdin=run.PIPE, |
773 | stdout=StringIO(), omit_sudo=omit_sudo) | |
91327a77 | 774 | |
522d829b TL |
775 | def run_python(self, pyscript, py_version='python3', sudo=False): |
776 | p = self._run_python(pyscript, py_version, sudo=sudo) | |
7c673cae | 777 | p.wait() |
f67539c2 TL |
778 | return p.stdout.getvalue().strip() |
779 | ||
1e59de90 | 780 | def run_shell(self, args, timeout=300, **kwargs): |
2a845540 | 781 | omit_sudo = kwargs.pop('omit_sudo', False) |
f67539c2 TL |
782 | cwd = kwargs.pop('cwd', self.mountpoint) |
783 | stdout = kwargs.pop('stdout', StringIO()) | |
784 | stderr = kwargs.pop('stderr', StringIO()) | |
785 | ||
2a845540 TL |
786 | return self.client_remote.run(args=args, cwd=cwd, timeout=timeout, |
787 | stdout=stdout, stderr=stderr, | |
788 | omit_sudo=omit_sudo, **kwargs) | |
7c673cae | 789 | |
f6b5b4d7 | 790 | def run_shell_payload(self, payload, **kwargs): |
1e59de90 TL |
791 | kwargs['args'] = ["bash", "-c", Raw(f"'{payload}'")] |
792 | if kwargs.pop('sudo', False): | |
793 | kwargs['args'].insert(0, 'sudo') | |
794 | kwargs['omit_sudo'] = False | |
795 | return self.run_shell(**kwargs) | |
f6b5b4d7 | 796 | |
f67539c2 TL |
797 | def run_as_user(self, **kwargs): |
798 | """ | |
799 | Besides the arguments defined for run_shell() this method also | |
800 | accepts argument 'user'. | |
801 | """ | |
802 | args = kwargs.pop('args') | |
803 | user = kwargs.pop('user') | |
9f95a23c | 804 | if isinstance(args, str): |
f67539c2 TL |
805 | args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args] |
806 | elif isinstance(args, list): | |
807 | cmdlist = args | |
808 | cmd = '' | |
809 | for i in cmdlist: | |
810 | cmd = cmd + i + ' ' | |
811 | # get rid of extra space at the end. | |
812 | cmd = cmd[:-1] | |
813 | ||
814 | args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd] | |
815 | ||
816 | kwargs['args'] = args | |
1e59de90 | 817 | kwargs['omit_sudo'] = False |
f67539c2 | 818 | return self.run_shell(**kwargs) |
9f95a23c | 819 | |
f67539c2 TL |
820 | def run_as_root(self, **kwargs): |
821 | """ | |
822 | Accepts same arguments as run_shell(). | |
823 | """ | |
824 | kwargs['user'] = 'root' | |
825 | return self.run_as_user(**kwargs) | |
826 | ||
1e59de90 TL |
827 | def assert_retval(self, proc_retval, exp_retval): |
828 | msg = (f'expected return value: {exp_retval}\n' | |
829 | f'received return value: {proc_retval}\n') | |
830 | assert proc_retval == exp_retval, msg | |
f67539c2 | 831 | |
1e59de90 TL |
832 | def _verify(self, proc, exp_retval=None, exp_errmsgs=None): |
833 | if exp_retval is None and exp_errmsgs is None: | |
834 | raise RuntimeError('Method didn\'t get enough parameters. Pass ' | |
835 | 'return value or error message expected from ' | |
836 | 'the command/process.') | |
837 | ||
838 | if exp_retval is not None: | |
839 | self.assert_retval(proc.returncode, exp_retval) | |
840 | if exp_errmsgs is None: | |
841 | return | |
842 | ||
843 | if isinstance(exp_errmsgs, str): | |
844 | exp_errmsgs = (exp_errmsgs, ) | |
845 | ||
846 | proc_stderr = proc.stderr.getvalue().lower() | |
847 | msg = ('didn\'t find any of the expected string in stderr.\n' | |
848 | f'expected string: {exp_errmsgs}\n' | |
849 | f'received error message: {proc_stderr}\n' | |
850 | 'note: received error message is converted to lowercase') | |
851 | for e in exp_errmsgs: | |
852 | if e in proc_stderr: | |
853 | break | |
854 | # this else is meant for for loop. | |
855 | else: | |
856 | assert False, msg | |
f67539c2 | 857 | |
1e59de90 | 858 | def negtestcmd(self, args, retval=None, errmsgs=None, stdin=None, |
f67539c2 TL |
859 | cwd=None, wait=True): |
860 | """ | |
861 | Conduct a negative test for the given command. | |
862 | ||
1e59de90 | 863 | retval and errmsgs are parameters to confirm the cause of command |
f67539c2 | 864 | failure. |
1e59de90 TL |
865 | |
866 | Note: errmsgs is expected to be a tuple, but in case there's only | |
867 | error message, it can also be a string. This method will handle | |
868 | that internally. | |
f67539c2 TL |
869 | """ |
870 | proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd, | |
871 | check_status=False) | |
1e59de90 | 872 | self._verify(proc, retval, errmsgs) |
f67539c2 TL |
873 | return proc |
874 | ||
1e59de90 | 875 | def negtestcmd_as_user(self, args, user, retval=None, errmsgs=None, |
f67539c2 TL |
876 | stdin=None, cwd=None, wait=True): |
877 | proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin, | |
878 | cwd=cwd, check_status=False) | |
1e59de90 | 879 | self._verify(proc, retval, errmsgs) |
f67539c2 TL |
880 | return proc |
881 | ||
1e59de90 | 882 | def negtestcmd_as_root(self, args, retval=None, errmsgs=None, stdin=None, |
f67539c2 TL |
883 | cwd=None, wait=True): |
884 | proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd, | |
885 | check_status=False) | |
1e59de90 | 886 | self._verify(proc, retval, errmsgs) |
f67539c2 | 887 | return proc |
7c673cae | 888 | |
1e59de90 TL |
889 | def open_for_reading(self, basename): |
890 | """ | |
891 | Open a file for reading only. | |
892 | """ | |
893 | assert(self.is_mounted()) | |
894 | ||
895 | path = os.path.join(self.hostfs_mntpt, basename) | |
896 | ||
897 | return self._run_python(dedent( | |
898 | """ | |
899 | import os | |
900 | mode = os.O_RDONLY | |
901 | fd = os.open("{path}", mode) | |
902 | os.close(fd) | |
903 | """.format(path=path) | |
904 | )) | |
905 | ||
906 | def open_for_writing(self, basename, creat=True, trunc=True, excl=False): | |
907 | """ | |
908 | Open a file for writing only. | |
909 | """ | |
910 | assert(self.is_mounted()) | |
911 | ||
912 | path = os.path.join(self.hostfs_mntpt, basename) | |
913 | ||
914 | return self._run_python(dedent( | |
915 | """ | |
916 | import os | |
917 | mode = os.O_WRONLY | |
918 | if {creat}: | |
919 | mode |= os.O_CREAT | |
920 | if {trunc}: | |
921 | mode |= os.O_TRUNC | |
922 | if {excl}: | |
923 | mode |= os.O_EXCL | |
924 | fd = os.open("{path}", mode) | |
925 | os.close(fd) | |
926 | """.format(path=path, creat=creat, trunc=trunc, excl=excl) | |
927 | )) | |
928 | ||
7c673cae FG |
929 | def open_no_data(self, basename): |
930 | """ | |
931 | A pure metadata operation | |
932 | """ | |
933 | assert(self.is_mounted()) | |
934 | ||
f67539c2 | 935 | path = os.path.join(self.hostfs_mntpt, basename) |
7c673cae FG |
936 | |
937 | p = self._run_python(dedent( | |
938 | """ | |
939 | f = open("{path}", 'w') | |
940 | """.format(path=path) | |
941 | )) | |
942 | p.wait() | |
943 | ||
aee94f69 | 944 | def open_background(self, basename="background_file", write=True, content="content"): |
7c673cae FG |
945 | """ |
946 | Open a file for writing, then block such that the client | |
947 | will hold a capability. | |
948 | ||
949 | Don't return until the remote process has got as far as opening | |
950 | the file, then return the RemoteProcess instance. | |
951 | """ | |
952 | assert(self.is_mounted()) | |
953 | ||
f67539c2 | 954 | path = os.path.join(self.hostfs_mntpt, basename) |
7c673cae | 955 | |
494da23a TL |
956 | if write: |
957 | pyscript = dedent(""" | |
958 | import time | |
959 | ||
9f95a23c | 960 | with open("{path}", 'w') as f: |
aee94f69 | 961 | f.write("{content}") |
9f95a23c | 962 | f.flush() |
9f95a23c TL |
963 | while True: |
964 | time.sleep(1) | |
aee94f69 | 965 | """).format(path=path, content=content) |
494da23a TL |
966 | else: |
967 | pyscript = dedent(""" | |
968 | import time | |
969 | ||
9f95a23c TL |
970 | with open("{path}", 'r') as f: |
971 | while True: | |
972 | time.sleep(1) | |
494da23a | 973 | """).format(path=path) |
7c673cae FG |
974 | |
975 | rproc = self._run_python(pyscript) | |
976 | self.background_procs.append(rproc) | |
977 | ||
978 | # This wait would not be sufficient if the file had already | |
979 | # existed, but it's simple and in practice users of open_background | |
980 | # are not using it on existing files. | |
aee94f69 TL |
981 | if write: |
982 | self.wait_for_visible(basename, size=len(content)) | |
983 | else: | |
984 | self.wait_for_visible(basename) | |
7c673cae FG |
985 | |
986 | return rproc | |
987 | ||
2a845540 TL |
988 | def open_dir_background(self, basename): |
989 | """ | |
990 | Create and hold a capability to a directory. | |
991 | """ | |
992 | assert(self.is_mounted()) | |
993 | ||
994 | path = os.path.join(self.hostfs_mntpt, basename) | |
995 | ||
996 | pyscript = dedent(""" | |
997 | import time | |
998 | import os | |
999 | ||
1000 | os.mkdir("{path}") | |
1001 | fd = os.open("{path}", os.O_RDONLY) | |
1002 | while True: | |
1003 | time.sleep(1) | |
1004 | """).format(path=path) | |
1005 | ||
1006 | rproc = self._run_python(pyscript) | |
1007 | self.background_procs.append(rproc) | |
1008 | ||
1009 | self.wait_for_visible(basename) | |
1010 | ||
1011 | return rproc | |
1012 | ||
494da23a | 1013 | def wait_for_dir_empty(self, dirname, timeout=30): |
f67539c2 TL |
1014 | dirpath = os.path.join(self.hostfs_mntpt, dirname) |
1015 | with safe_while(sleep=5, tries=(timeout//5)) as proceed: | |
1016 | while proceed(): | |
1017 | p = self.run_shell_payload(f"stat -c %h {dirpath}") | |
1018 | nr_links = int(p.stdout.getvalue().strip()) | |
1019 | if nr_links == 2: | |
1020 | return | |
494da23a | 1021 | |
aee94f69 | 1022 | def wait_for_visible(self, basename="background_file", size=None, timeout=30): |
7c673cae | 1023 | i = 0 |
aee94f69 TL |
1024 | args = ['stat'] |
1025 | if size is not None: | |
1026 | args += ['--printf=%s'] | |
1027 | args += [os.path.join(self.hostfs_mntpt, basename)] | |
7c673cae | 1028 | while i < timeout: |
aee94f69 TL |
1029 | p = self.client_remote.run(args=args, stdout=StringIO(), check_status=False) |
1030 | if p.exitstatus == 0: | |
1031 | if size is not None: | |
1032 | s = p.stdout.getvalue().strip() | |
1033 | if int(s) == size: | |
1034 | log.info(f"File {basename} became visible with size {size} from {self.client_id} after {i}s") | |
1035 | return | |
1036 | else: | |
1037 | log.error(f"File {basename} became visible but with size {int(s)} not {size}") | |
1038 | else: | |
1039 | log.info(f"File {basename} became visible from {self.client_id} after {i}s") | |
1040 | return | |
1041 | time.sleep(1) | |
1042 | i += 1 | |
7c673cae FG |
1043 | |
1044 | raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format( | |
1045 | i, basename, self.client_id)) | |
1046 | ||
1047 | def lock_background(self, basename="background_file", do_flock=True): | |
1048 | """ | |
1049 | Open and lock a files for writing, hold the lock in a background process | |
1050 | """ | |
1051 | assert(self.is_mounted()) | |
1052 | ||
f67539c2 | 1053 | path = os.path.join(self.hostfs_mntpt, basename) |
7c673cae FG |
1054 | |
1055 | script_builder = """ | |
1056 | import time | |
1057 | import fcntl | |
1058 | import struct""" | |
1059 | if do_flock: | |
1060 | script_builder += """ | |
1061 | f1 = open("{path}-1", 'w') | |
1062 | fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)""" | |
1063 | script_builder += """ | |
1064 | f2 = open("{path}-2", 'w') | |
1065 | lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0) | |
1066 | fcntl.fcntl(f2, fcntl.F_SETLK, lockdata) | |
1067 | while True: | |
1068 | time.sleep(1) | |
1069 | """ | |
1070 | ||
1071 | pyscript = dedent(script_builder).format(path=path) | |
1072 | ||
31f18b77 | 1073 | log.info("lock_background file {0}".format(basename)) |
7c673cae FG |
1074 | rproc = self._run_python(pyscript) |
1075 | self.background_procs.append(rproc) | |
1076 | return rproc | |
1077 | ||
31f18b77 FG |
1078 | def lock_and_release(self, basename="background_file"): |
1079 | assert(self.is_mounted()) | |
1080 | ||
f67539c2 | 1081 | path = os.path.join(self.hostfs_mntpt, basename) |
31f18b77 FG |
1082 | |
1083 | script = """ | |
1084 | import time | |
1085 | import fcntl | |
1086 | import struct | |
1087 | f1 = open("{path}-1", 'w') | |
1088 | fcntl.flock(f1, fcntl.LOCK_EX) | |
1089 | f2 = open("{path}-2", 'w') | |
1090 | lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0) | |
1091 | fcntl.fcntl(f2, fcntl.F_SETLK, lockdata) | |
1092 | """ | |
1093 | pyscript = dedent(script).format(path=path) | |
1094 | ||
1095 | log.info("lock_and_release file {0}".format(basename)) | |
1096 | return self._run_python(pyscript) | |
1097 | ||
7c673cae FG |
1098 | def check_filelock(self, basename="background_file", do_flock=True): |
1099 | assert(self.is_mounted()) | |
1100 | ||
f67539c2 | 1101 | path = os.path.join(self.hostfs_mntpt, basename) |
7c673cae FG |
1102 | |
1103 | script_builder = """ | |
1104 | import fcntl | |
1105 | import errno | |
1106 | import struct""" | |
1107 | if do_flock: | |
1108 | script_builder += """ | |
1109 | f1 = open("{path}-1", 'r') | |
1110 | try: | |
1111 | fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB) | |
9f95a23c | 1112 | except IOError as e: |
7c673cae FG |
1113 | if e.errno == errno.EAGAIN: |
1114 | pass | |
1115 | else: | |
1116 | raise RuntimeError("flock on file {path}-1 not found")""" | |
1117 | script_builder += """ | |
1118 | f2 = open("{path}-2", 'r') | |
1119 | try: | |
1120 | lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0) | |
1121 | fcntl.fcntl(f2, fcntl.F_SETLK, lockdata) | |
9f95a23c | 1122 | except IOError as e: |
7c673cae FG |
1123 | if e.errno == errno.EAGAIN: |
1124 | pass | |
1125 | else: | |
1126 | raise RuntimeError("posix lock on file {path}-2 not found") | |
1127 | """ | |
1128 | pyscript = dedent(script_builder).format(path=path) | |
1129 | ||
1130 | log.info("check lock on file {0}".format(basename)) | |
1131 | self.client_remote.run(args=[ | |
522d829b | 1132 | 'python3', '-c', pyscript |
7c673cae FG |
1133 | ]) |
1134 | ||
1135 | def write_background(self, basename="background_file", loop=False): | |
1136 | """ | |
1137 | Open a file for writing, complete as soon as you can | |
1138 | :param basename: | |
1139 | :return: | |
1140 | """ | |
1141 | assert(self.is_mounted()) | |
1142 | ||
f67539c2 | 1143 | path = os.path.join(self.hostfs_mntpt, basename) |
7c673cae FG |
1144 | |
1145 | pyscript = dedent(""" | |
1146 | import os | |
1147 | import time | |
1148 | ||
9f95a23c | 1149 | fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644) |
7c673cae FG |
1150 | try: |
1151 | while True: | |
9f95a23c | 1152 | os.write(fd, b'content') |
7c673cae FG |
1153 | time.sleep(1) |
1154 | if not {loop}: | |
1155 | break | |
9f95a23c | 1156 | except IOError as e: |
7c673cae FG |
1157 | pass |
1158 | os.close(fd) | |
1159 | """).format(path=path, loop=str(loop)) | |
1160 | ||
1161 | rproc = self._run_python(pyscript) | |
1162 | self.background_procs.append(rproc) | |
1163 | return rproc | |
1164 | ||
1165 | def write_n_mb(self, filename, n_mb, seek=0, wait=True): | |
1166 | """ | |
1167 | Write the requested number of megabytes to a file | |
1168 | """ | |
1169 | assert(self.is_mounted()) | |
1170 | ||
1171 | return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename), | |
1172 | "bs=1M", "conv=fdatasync", | |
e306af50 TL |
1173 | "count={0}".format(int(n_mb)), |
1174 | "seek={0}".format(int(seek)) | |
7c673cae FG |
1175 | ], wait=wait) |
1176 | ||
1177 | def write_test_pattern(self, filename, size): | |
1178 | log.info("Writing {0} bytes to {1}".format(size, filename)) | |
1179 | return self.run_python(dedent(""" | |
1180 | import zlib | |
1181 | path = "{path}" | |
9f95a23c TL |
1182 | with open(path, 'w') as f: |
1183 | for i in range(0, {size}): | |
1184 | val = zlib.crc32(str(i).encode('utf-8')) & 7 | |
1185 | f.write(chr(val)) | |
7c673cae | 1186 | """.format( |
f67539c2 | 1187 | path=os.path.join(self.hostfs_mntpt, filename), |
7c673cae FG |
1188 | size=size |
1189 | ))) | |
1190 | ||
1191 | def validate_test_pattern(self, filename, size): | |
1192 | log.info("Validating {0} bytes from {1}".format(size, filename)) | |
522d829b | 1193 | # Use sudo because cephfs-data-scan may recreate the file with owner==root |
7c673cae FG |
1194 | return self.run_python(dedent(""" |
1195 | import zlib | |
1196 | path = "{path}" | |
9f95a23c TL |
1197 | with open(path, 'r') as f: |
1198 | bytes = f.read() | |
7c673cae FG |
1199 | if len(bytes) != {size}: |
1200 | raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format( | |
1201 | len(bytes), {size} | |
1202 | )) | |
1203 | for i, b in enumerate(bytes): | |
9f95a23c | 1204 | val = zlib.crc32(str(i).encode('utf-8')) & 7 |
7c673cae FG |
1205 | if b != chr(val): |
1206 | raise RuntimeError("Bad data at offset {{0}}".format(i)) | |
1207 | """.format( | |
f67539c2 | 1208 | path=os.path.join(self.hostfs_mntpt, filename), |
7c673cae | 1209 | size=size |
522d829b | 1210 | )), sudo=True) |
7c673cae FG |
1211 | |
1212 | def open_n_background(self, fs_path, count): | |
1213 | """ | |
1214 | Open N files for writing, hold them open in a background process | |
1215 | ||
1216 | :param fs_path: Path relative to CephFS root, e.g. "foo/bar" | |
1217 | :return: a RemoteProcess | |
1218 | """ | |
1219 | assert(self.is_mounted()) | |
1220 | ||
f67539c2 | 1221 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
7c673cae FG |
1222 | |
1223 | pyscript = dedent(""" | |
1224 | import sys | |
1225 | import time | |
1226 | import os | |
1227 | ||
1228 | n = {count} | |
1229 | abs_path = "{abs_path}" | |
1230 | ||
f91f0fd5 TL |
1231 | if not os.path.exists(abs_path): |
1232 | os.makedirs(abs_path) | |
7c673cae FG |
1233 | |
1234 | handles = [] | |
1235 | for i in range(0, n): | |
f91f0fd5 TL |
1236 | fname = "file_"+str(i) |
1237 | path = os.path.join(abs_path, fname) | |
1238 | handles.append(open(path, 'w')) | |
7c673cae FG |
1239 | |
1240 | while True: | |
1241 | time.sleep(1) | |
1242 | """).format(abs_path=abs_path, count=count) | |
1243 | ||
1244 | rproc = self._run_python(pyscript) | |
1245 | self.background_procs.append(rproc) | |
1246 | return rproc | |
1247 | ||
aee94f69 TL |
1248 | def create_n_files(self, fs_path, count, sync=False, dirsync=False, |
1249 | unlink=False, finaldirsync=False, hard_links=0): | |
20effc67 TL |
1250 | """ |
1251 | Create n files. | |
1252 | ||
1253 | :param sync: sync the file after writing | |
1254 | :param dirsync: sync the containing directory after closing the file | |
1255 | :param unlink: unlink the file after closing | |
1256 | :param finaldirsync: sync the containing directory after closing the last file | |
aee94f69 | 1257 | :param hard_links: create given number of hard link(s) for each file |
20effc67 TL |
1258 | """ |
1259 | ||
7c673cae FG |
1260 | assert(self.is_mounted()) |
1261 | ||
f67539c2 | 1262 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
7c673cae | 1263 | |
20effc67 | 1264 | pyscript = dedent(f""" |
7c673cae | 1265 | import os |
aee94f69 | 1266 | import uuid |
7c673cae FG |
1267 | |
1268 | n = {count} | |
aee94f69 TL |
1269 | create_hard_links = False |
1270 | if {hard_links} > 0: | |
1271 | create_hard_links = True | |
20effc67 | 1272 | path = "{abs_path}" |
7c673cae | 1273 | |
20effc67 TL |
1274 | dpath = os.path.dirname(path) |
1275 | fnameprefix = os.path.basename(path) | |
1276 | os.makedirs(dpath, exist_ok=True) | |
7c673cae | 1277 | |
20effc67 TL |
1278 | try: |
1279 | dirfd = os.open(dpath, os.O_DIRECTORY) | |
1280 | ||
1281 | for i in range(n): | |
1282 | fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}") | |
1283 | with open(fpath, 'w') as f: | |
1284 | f.write(f"{{i}}") | |
1285 | if {sync}: | |
1286 | f.flush() | |
1287 | os.fsync(f.fileno()) | |
1288 | if {unlink}: | |
1289 | os.unlink(fpath) | |
1290 | if {dirsync}: | |
1291 | os.fsync(dirfd) | |
aee94f69 TL |
1292 | if create_hard_links: |
1293 | for j in range({hard_links}): | |
1294 | os.system(f"ln {{fpath}} {{dpath}}/{{fnameprefix}}_{{i}}_{{uuid.uuid4()}}") | |
20effc67 TL |
1295 | if {finaldirsync}: |
1296 | os.fsync(dirfd) | |
1297 | finally: | |
1298 | os.close(dirfd) | |
1299 | """) | |
7c673cae FG |
1300 | |
1301 | self.run_python(pyscript) | |
1302 | ||
1303 | def teardown(self): | |
1304 | for p in self.background_procs: | |
1305 | log.info("Terminating background process") | |
1306 | self._kill_background(p) | |
1307 | ||
1308 | self.background_procs = [] | |
1309 | ||
1310 | def _kill_background(self, p): | |
1311 | if p.stdin: | |
1312 | p.stdin.close() | |
1313 | try: | |
1314 | p.wait() | |
1315 | except (CommandFailedError, ConnectionLostError): | |
1316 | pass | |
1317 | ||
1318 | def kill_background(self, p): | |
1319 | """ | |
1320 | For a process that was returned by one of the _background member functions, | |
1321 | kill it hard. | |
1322 | """ | |
1323 | self._kill_background(p) | |
1324 | self.background_procs.remove(p) | |
1325 | ||
eafe8130 TL |
1326 | def send_signal(self, signal): |
1327 | signal = signal.lower() | |
1328 | if signal.lower() not in ['sigstop', 'sigcont', 'sigterm', 'sigkill']: | |
1329 | raise NotImplementedError | |
1330 | ||
1331 | self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal), | |
1332 | self.client_pid], omit_sudo=False) | |
1333 | ||
7c673cae FG |
1334 | def get_global_id(self): |
1335 | raise NotImplementedError() | |
1336 | ||
11fdf7f2 TL |
1337 | def get_global_inst(self): |
1338 | raise NotImplementedError() | |
1339 | ||
1340 | def get_global_addr(self): | |
1341 | raise NotImplementedError() | |
1342 | ||
7c673cae FG |
1343 | def get_osd_epoch(self): |
1344 | raise NotImplementedError() | |
1345 | ||
f67539c2 TL |
1346 | def get_op_read_count(self): |
1347 | raise NotImplementedError() | |
1348 | ||
20effc67 TL |
1349 | def readlink(self, fs_path): |
1350 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) | |
1351 | ||
1352 | pyscript = dedent(""" | |
1353 | import os | |
1354 | ||
1355 | print(os.readlink("{path}")) | |
1356 | """).format(path=abs_path) | |
1357 | ||
1358 | proc = self._run_python(pyscript) | |
1359 | proc.wait() | |
1360 | return str(proc.stdout.getvalue().strip()) | |
1361 | ||
1362 | ||
9f95a23c TL |
1363 | def lstat(self, fs_path, follow_symlinks=False, wait=True): |
1364 | return self.stat(fs_path, follow_symlinks=False, wait=True) | |
1365 | ||
522d829b | 1366 | def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs): |
7c673cae FG |
1367 | """ |
1368 | stat a file, and return the result as a dictionary like this: | |
1369 | { | |
1370 | "st_ctime": 1414161137.0, | |
1371 | "st_mtime": 1414161137.0, | |
1372 | "st_nlink": 33, | |
1373 | "st_gid": 0, | |
1374 | "st_dev": 16777218, | |
1375 | "st_size": 1190, | |
1376 | "st_ino": 2, | |
1377 | "st_uid": 0, | |
1378 | "st_mode": 16877, | |
1379 | "st_atime": 1431520593.0 | |
1380 | } | |
1381 | ||
1382 | Raises exception on absent file. | |
1383 | """ | |
f67539c2 | 1384 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
9f95a23c TL |
1385 | if follow_symlinks: |
1386 | stat_call = "os.stat('" + abs_path + "')" | |
1387 | else: | |
1388 | stat_call = "os.lstat('" + abs_path + "')" | |
7c673cae FG |
1389 | |
1390 | pyscript = dedent(""" | |
1391 | import os | |
1392 | import stat | |
1393 | import json | |
1394 | import sys | |
1395 | ||
1396 | try: | |
9f95a23c | 1397 | s = {stat_call} |
7c673cae FG |
1398 | except OSError as e: |
1399 | sys.exit(e.errno) | |
1400 | ||
1401 | attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"] | |
9f95a23c | 1402 | print(json.dumps( |
7c673cae | 1403 | dict([(a, getattr(s, a)) for a in attrs]), |
9f95a23c TL |
1404 | indent=2)) |
1405 | """).format(stat_call=stat_call) | |
522d829b | 1406 | proc = self._run_python(pyscript, **kwargs) |
7c673cae FG |
1407 | if wait: |
1408 | proc.wait() | |
1409 | return json.loads(proc.stdout.getvalue().strip()) | |
1410 | else: | |
1411 | return proc | |
1412 | ||
1413 | def touch(self, fs_path): | |
1414 | """ | |
1415 | Create a dentry if it doesn't already exist. This python | |
1416 | implementation exists because the usual command line tool doesn't | |
1417 | pass through error codes like EIO. | |
1418 | ||
1419 | :param fs_path: | |
1420 | :return: | |
1421 | """ | |
f67539c2 | 1422 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
7c673cae FG |
1423 | pyscript = dedent(""" |
1424 | import sys | |
1425 | import errno | |
1426 | ||
1427 | try: | |
1428 | f = open("{path}", "w") | |
1429 | f.close() | |
1430 | except IOError as e: | |
1431 | sys.exit(errno.EIO) | |
1432 | """).format(path=abs_path) | |
1433 | proc = self._run_python(pyscript) | |
1434 | proc.wait() | |
1435 | ||
1436 | def path_to_ino(self, fs_path, follow_symlinks=True): | |
f67539c2 | 1437 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
7c673cae FG |
1438 | |
1439 | if follow_symlinks: | |
1440 | pyscript = dedent(""" | |
1441 | import os | |
1442 | import stat | |
1443 | ||
9f95a23c | 1444 | print(os.stat("{path}").st_ino) |
7c673cae FG |
1445 | """).format(path=abs_path) |
1446 | else: | |
1447 | pyscript = dedent(""" | |
1448 | import os | |
1449 | import stat | |
1450 | ||
9f95a23c | 1451 | print(os.lstat("{path}").st_ino) |
7c673cae FG |
1452 | """).format(path=abs_path) |
1453 | ||
1454 | proc = self._run_python(pyscript) | |
1455 | proc.wait() | |
1456 | return int(proc.stdout.getvalue().strip()) | |
1457 | ||
1458 | def path_to_nlink(self, fs_path): | |
f67539c2 | 1459 | abs_path = os.path.join(self.hostfs_mntpt, fs_path) |
7c673cae FG |
1460 | |
1461 | pyscript = dedent(""" | |
1462 | import os | |
1463 | import stat | |
1464 | ||
9f95a23c | 1465 | print(os.stat("{path}").st_nlink) |
7c673cae FG |
1466 | """).format(path=abs_path) |
1467 | ||
1468 | proc = self._run_python(pyscript) | |
1469 | proc.wait() | |
1470 | return int(proc.stdout.getvalue().strip()) | |
1471 | ||
522d829b | 1472 | def ls(self, path=None, **kwargs): |
7c673cae FG |
1473 | """ |
1474 | Wrap ls: return a list of strings | |
1475 | """ | |
1e59de90 | 1476 | kwargs['args'] = ["ls"] |
7c673cae | 1477 | if path: |
1e59de90 TL |
1478 | kwargs['args'].append(path) |
1479 | if kwargs.pop('sudo', False): | |
1480 | kwargs['args'].insert(0, 'sudo') | |
1481 | kwargs['omit_sudo'] = False | |
1482 | ls_text = self.run_shell(**kwargs).stdout.getvalue().strip() | |
7c673cae FG |
1483 | |
1484 | if ls_text: | |
1485 | return ls_text.split("\n") | |
1486 | else: | |
1487 | # Special case because otherwise split on empty string | |
1488 | # gives you [''] instead of [] | |
1489 | return [] | |
1490 | ||
522d829b | 1491 | def setfattr(self, path, key, val, **kwargs): |
7c673cae FG |
1492 | """ |
1493 | Wrap setfattr. | |
1494 | ||
1495 | :param path: relative to mount point | |
1496 | :param key: xattr name | |
1497 | :param val: xattr value | |
1498 | :return: None | |
1499 | """ | |
1e59de90 TL |
1500 | kwargs['args'] = ["setfattr", "-n", key, "-v", val, path] |
1501 | if kwargs.pop('sudo', False): | |
1502 | kwargs['args'].insert(0, 'sudo') | |
1503 | kwargs['omit_sudo'] = False | |
1504 | self.run_shell(**kwargs) | |
7c673cae | 1505 | |
522d829b | 1506 | def getfattr(self, path, attr, **kwargs): |
7c673cae FG |
1507 | """ |
1508 | Wrap getfattr: return the values of a named xattr on one file, or | |
1509 | None if the attribute is not found. | |
1510 | ||
1511 | :return: a string | |
1512 | """ | |
1e59de90 TL |
1513 | kwargs['args'] = ["getfattr", "--only-values", "-n", attr, path] |
1514 | if kwargs.pop('sudo', False): | |
1515 | kwargs['args'].insert(0, 'sudo') | |
1516 | kwargs['omit_sudo'] = False | |
1517 | kwargs['wait'] = False | |
1518 | p = self.run_shell(**kwargs) | |
7c673cae FG |
1519 | try: |
1520 | p.wait() | |
1521 | except CommandFailedError as e: | |
1522 | if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue(): | |
1523 | return None | |
1524 | else: | |
1525 | raise | |
1526 | ||
e306af50 | 1527 | return str(p.stdout.getvalue()) |
7c673cae FG |
1528 | |
1529 | def df(self): | |
1530 | """ | |
1531 | Wrap df: return a dict of usage fields in bytes | |
1532 | """ | |
1533 | ||
1534 | p = self.run_shell(["df", "-B1", "."]) | |
1535 | lines = p.stdout.getvalue().strip().split("\n") | |
1536 | fs, total, used, avail = lines[1].split()[:4] | |
e306af50 | 1537 | log.warning(lines) |
7c673cae FG |
1538 | |
1539 | return { | |
1540 | "total": int(total), | |
1541 | "used": int(used), | |
1542 | "available": int(avail) | |
1543 | } | |
f67539c2 TL |
1544 | |
1545 | def dir_checksum(self, path=None, follow_symlinks=False): | |
1546 | cmd = ["find"] | |
1547 | if follow_symlinks: | |
1548 | cmd.append("-L") | |
1549 | if path: | |
1550 | cmd.append(path) | |
1551 | cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"]) | |
1552 | checksum_text = self.run_shell(cmd).stdout.getvalue().strip() | |
1553 | checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1]) | |
1554 | return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest() | |
1e59de90 TL |
1555 | |
1556 | def validate_subvol_options(self): | |
1557 | mount_subvol_num = self.client_config.get('mount_subvol_num', None) | |
1558 | if self.cephfs_mntpt and mount_subvol_num is not None: | |
1559 | log.warning("You cannot specify both: cephfs_mntpt and mount_subvol_num") | |
1560 | log.info(f"Mounting subvol {mount_subvol_num} for now") | |
1561 | ||
1562 | if mount_subvol_num is not None: | |
1563 | # mount_subvol must be an index into the subvol path array for the fs | |
1564 | if not self.cephfs_name: | |
1565 | self.cephfs_name = 'cephfs' | |
1566 | assert(hasattr(self.ctx, "created_subvols")) | |
1567 | # mount_subvol must be specified under client.[0-9] yaml section | |
1568 | subvol_paths = self.ctx.created_subvols[self.cephfs_name] | |
1569 | path_to_mount = subvol_paths[mount_subvol_num] | |
1570 | self.cephfs_mntpt = path_to_mount |