]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/mount.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / qa / tasks / cephfs / mount.py
CommitLineData
f67539c2 1import hashlib
7c673cae
FG
2import json
3import logging
4import datetime
f67539c2
TL
5import os
6import re
7c673cae 7import time
f67539c2
TL
8
9from io import StringIO
10from contextlib import contextmanager
7c673cae 11from textwrap import dedent
f67539c2 12from IPy import IP
cd265ab1 13
f67539c2 14from teuthology.contextutil import safe_while
522d829b 15from teuthology.misc import get_file, write_file
7c673cae 16from teuthology.orchestra import run
20effc67
TL
17from teuthology.orchestra.run import Raw
18from teuthology.exceptions import CommandFailedError, ConnectionLostError
f67539c2 19
11fdf7f2 20from tasks.cephfs.filesystem import Filesystem
7c673cae
FG
21
22log = logging.getLogger(__name__)
23
1e59de90
TL
24
25UMOUNT_TIMEOUT = 300
26
27
7c673cae 28class CephFSMount(object):
f67539c2
TL
def __init__(self, ctx, test_dir, client_id, client_remote,
             client_keyring_path=None, hostfs_mntpt=None,
             cephfs_name=None, cephfs_mntpt=None, brxnet=None,
             client_config=None):
    """
    :param ctx: teuthology context; stored as-is on the instance
    :param test_dir: Global teuthology test dir
    :param client_id: Client ID, the 'foo' in client.foo
    :param client_keyring_path: path to keyring for given client_id
    :param client_remote: Remote instance for the host where client will
                          run
    :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
                         be mounted. Falls back to
                         client_config["mountpoint"] and then to
                         <test_dir>/mnt.<client_id>.
    :param cephfs_name: Name of Ceph FS to be mounted
    :param cephfs_mntpt: Path to directory inside Ceph FS that will be
                         mounted as root. Falls back to
                         client_config["mount_path"].
    :param brxnet: network (CIDR notation) used for the 'ceph-brx' bridge;
                   defaults to 192.168.0.0/16
    :param client_config: optional dict of per-client settings; only the
                          keys "mount_path" and "mountpoint" are read here
    """
    self.ctx = ctx
    self.test_dir = test_dir

    self._verify_attrs(client_id=client_id,
                       client_keyring_path=client_keyring_path,
                       hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
                       cephfs_mntpt=cephfs_mntpt)

    if client_config is None:
        client_config = {}
    self.client_config = client_config

    self.cephfs_name = cephfs_name
    self.client_id = client_id
    self.client_keyring_path = client_keyring_path
    self.client_remote = client_remote
    self.cluster_name = 'ceph'  # TODO: use config['cluster']
    self.fs = None

    # An explicit argument wins; the client config is only a fallback.
    # The log messages name the config key that was actually read.
    if cephfs_mntpt is None and client_config.get("mount_path"):
        self.cephfs_mntpt = client_config.get("mount_path")
        log.info(f"using client_config[\"mount_path\"] = {self.cephfs_mntpt}")
    else:
        self.cephfs_mntpt = cephfs_mntpt
    log.info(f"cephfs_mntpt = {self.cephfs_mntpt}")

    if hostfs_mntpt is None and client_config.get("mountpoint"):
        self.hostfs_mntpt = client_config.get("mountpoint")
        log.info(f"using client_config[\"mountpoint\"] = {self.hostfs_mntpt}")
    elif hostfs_mntpt is not None:
        self.hostfs_mntpt = hostfs_mntpt
    else:
        self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
    self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
    log.info(f"hostfs_mntpt = {self.hostfs_mntpt}")

    # The netns name is derived lazily from the mountpoint; -1 means no
    # netns id has been assigned yet.
    self._netns_name = None
    self.nsid = -1
    if brxnet is None:
        self.ceph_brx_net = '192.168.0.0/16'
    else:
        self.ceph_brx_net = brxnet

    self.test_files = ['a', 'b', 'c']

    self.background_procs = []
91
f67539c2
TL
@staticmethod
def cleanup_stale_netnses_and_bridge(remote):
    """
    Remove the stale 'ceph-ns-*' network namespaces and the 'ceph-brx'
    bridge left behind on ``remote`` by previously failed test cases.

    Every removal is best-effort: failures are ignored.
    """
    listing = remote.run(args=['ip', 'netns', 'list'],
                         stdout=StringIO(), timeout=(5*60))
    listing = listing.stdout.getvalue().strip()

    # Names of the stale netnses created by earlier mounts
    stale = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', listing)

    # One delete command per stale netns, followed by the bridge itself
    commands = [['sudo', 'ip', 'netns', 'delete', entry.split()[0]]
                for entry in stale]
    commands.append(['sudo', 'ip', 'link', 'delete', 'ceph-brx'])

    for cmd in commands:
        try:
            remote.run(args=cmd, timeout=(5*60), omit_sudo=False)
        except Exception:
            # already gone or never created; nothing more to do
            pass
118
def _parse_netns_name(self):
    """
    Derive the netns name from the mountpoint: every run of '/' becomes a
    single '-' and the result is prefixed with 'ceph-ns-'.
    """
    flattened = re.sub(r'/+', "-", self.mountpoint)
    self._netns_name = "ceph-ns" + '-' + flattened
122
7c673cae
FG
@property
def mountpoint(self):
    """
    Path on the host FS where Ceph FS is mounted. When hostfs_mntpt is
    unset it is rebuilt from test_dir and hostfs_mntpt_dirname.
    """
    if self.hostfs_mntpt is not None:
        return self.hostfs_mntpt
    self.hostfs_mntpt = os.path.join(self.test_dir,
                                     self.hostfs_mntpt_dirname)
    return self.hostfs_mntpt

@mountpoint.setter
def mountpoint(self, path):
    """
    Set the mountpoint; raises RuntimeError unless ``path`` is a str.
    """
    if isinstance(path, str):
        self._mountpoint = self.hostfs_mntpt = path
        return
    raise RuntimeError('path should be of str type.')
135
@property
def netns_name(self):
    """
    Name of the network namespace used by this mount. It is computed from
    the mountpoint on first access unless it was set explicitly.
    """
    # identity check: None is the "not computed yet" sentinel
    if self._netns_name is None:
        self._parse_netns_name()
    return self._netns_name

@netns_name.setter
def netns_name(self, name):
    """
    Override the netns name.
    """
    self._netns_name = name
145
20effc67
TL
def assert_that_ceph_fs_exists(self):
    """
    Assert, using the output of "fs ls", that the Ceph FS to be mounted is
    present on the cluster: the named one when cephfs_name is set,
    otherwise any file system at all.
    """
    manager = self.ctx.managers[self.cluster_name]
    output = manager.raw_cluster_cmd("fs", "ls")

    if not self.cephfs_name:
        assert 'No filesystems enabled' not in output, \
            'ceph cluster has no ceph fs, not even the default ceph fs'
        log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
        return

    assert self.cephfs_name in output, \
        'expected ceph fs is not present on the cluster'
    log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
156
f67539c2
TL
def assert_and_log_minimum_mount_details(self):
    """
    Check that the client ID, the remote and the host mountpoint are all
    set, confirm the Ceph FS exists, then log the details of the mount.
    Ideally, this method should be called at the beginning of the mount
    method.

    Raises RuntimeError when any of the three required details is missing.
    """
    have_minimum = (self.client_id and self.client_remote and
                    self.hostfs_mntpt)
    if not have_minimum:
        for line in (f"self.client_id = {self.client_id}",
                     f"self.client_remote = {self.client_remote}",
                     f"self.hostfs_mntpt = {self.hostfs_mntpt}"):
            log.error(line)
        raise RuntimeError(
            'Mounting CephFS requires that at least following '
            'details to be provided -\n'
            '1. the client ID,\n2. the mountpoint and\n'
            '3. the remote machine where CephFS will be mounted.\n')

    self.assert_that_ceph_fs_exists()

    log.info('Mounting Ceph FS. Following are details of mount; remember '
             '"None" represents Python type None -')
    for line in (
            f'self.client_remote.hostname = {self.client_remote.hostname}',
            f'self.client.name = client.{self.client_id}',
            f'self.hostfs_mntpt = {self.hostfs_mntpt}',
            f'self.cephfs_name = {self.cephfs_name}',
            f'self.cephfs_mntpt = {self.cephfs_mntpt}',
            f'self.client_keyring_path = {self.client_keyring_path}'):
        log.info(line)

    if self.client_keyring_path:
        keyring = get_file(self.client_remote, self.client_keyring_path,
                           sudo=True).decode()
        log.info('keyring content -\n' + keyring)
7c673cae 187
def is_blocked(self):
    """
    Return True when this client's address shows up in the OSD blocklist
    of the cluster, False otherwise (including when the address is
    unknown).
    """
    if not self.addr:
        # Without an address we can't tell whether we are blocklisted, so
        # let the caller try to umount without lazy/force. If the client
        # was blocklisted, the umount would get stuck and the test would
        # fail on timeout. Happens only with Ubuntu 20.04 (missing kclient
        # patches :/).
        return False

    self.fs = Filesystem(self.ctx, name=self.cephfs_name)

    try:
        listing = self.fs.mon_manager.raw_cluster_cmd(
            args='osd blocklist ls')
    except CommandFailedError:
        # Older Ceph clusters only know the previous command name
        listing = self.fs.mon_manager.raw_cluster_cmd(
            args='osd blacklist ls')

    return self.addr in listing
204
def is_stuck(self):
    """
    Check if mount is stuck/in a hanged state.

    A ``stat`` on the mountpoint is started without waiting for it. If it
    has not already returned 0, the process list is inspected ten seconds
    later; a ``stat`` that is still listed is taken as proof of a hang.

    :return: True if the mount is considered stuck, False otherwise
             (including when nothing is mounted).
    """
    if not self.is_mounted():
        return False

    retval = self.client_remote.run(args=f'sudo stat {self.hostfs_mntpt}',
                                    omit_sudo=False, wait=False).returncode
    if retval == 0:
        return False

    time.sleep(10)
    proc = self.client_remote.run(args='ps -ef', stdout=StringIO())
    # if proc was running even after 10 seconds, it has to be stuck.
    if f'stat {self.hostfs_mntpt}' in proc.stdout.getvalue():
        # interpolate the actual mountpoint; the message used to contain
        # the literal text "self.hostfs_mntpt"
        log.critical(f'client mounted at {self.hostfs_mntpt} is stuck!')
        return True
    return False
224
def is_mounted(self):
    """
    Return True if the host mountpoint appears in the remote's
    /proc/self/mounts, False otherwise.
    """
    mounts = self.client_remote.read_file('/proc/self/mounts',stdout=StringIO())
    if self.hostfs_mntpt not in mounts:
        log.debug(f"not mounted; /proc/self/mounts is:\n{mounts}")
        return False
    return True
7c673cae 232
11fdf7f2
TL
def setupfs(self, name=None):
    """
    (Re)create self.fs as a Filesystem object and wait for its daemons.

    :param name: name of the file system; when omitted, the name of the
                 previously set up file system (if any) is reused.
    """
    previous = self.fs
    if name is None and previous is not None:
        # Previous mount existed, reuse the old name
        name = previous.name
    self.fs = Filesystem(self.ctx, name=name)
    log.info('Wait for MDS to reach steady state...')
    self.fs.wait_for_daemons()
    log.info('Ready to start {}...'.format(type(self).__name__))
241
20effc67
TL
def _create_mntpt(self):
    """
    Create the host mountpoint directory on the remote and strip all of
    its permission bits.
    """
    commands = (
        f'mkdir -p -v {self.hostfs_mntpt}',
        # Use 0000 mode to prevent undesired modifications to the
        # mountpoint on the local file system.
        f'chmod 0000 {self.hostfs_mntpt}',
    )
    for cmd in commands:
        self.client_remote.run(args=cmd, timeout=60)
249
@property
def _nsenter_args(self):
    """
    Argument list prefix that runs a command inside this mount's netns.
    """
    netns_path = f'--net=/var/run/netns/{self.netns_name}'
    return ['nsenter', netns_path]
253
def _set_filemode_on_mntpt(self):
    """
    Try to set mode 1777 on the (freshly mounted) mountpoint.

    This is best-effort: a failing chmod never raises. A "permission
    denied" failure is expected and ignored silently; any other failure
    is logged so that it does not go unnoticed.
    """
    stderr = StringIO()
    try:
        self.client_remote.run(
            args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
            stderr=stderr, timeout=(5*60))
    except CommandFailedError:
        errmsg = stderr.getvalue()
        if 'permission denied' in errmsg.lower():
            # the client does not have write permissions in the caps it
            # holds for the Ceph FS that was just mounted.
            return
        log.warning(f'chmod 1777 on {self.hostfs_mntpt} failed '
                    f'unexpectedly: {errmsg}')
265
f67539c2
TL
def _setup_brx_and_nat(self):
    """
    Make sure the 'ceph-brx' bridge exists with the expected address,
    enable IPv4 forwarding and add the iptables rules that NAT the bridge
    network through the interface of the default route.

    Raises RuntimeError if an existing 'ceph-brx' uses a different
    address/mask, or if no default route is found.
    """
    # The ip for ceph-brx should be the last valid one of the network
    ip = IP(self.ceph_brx_net)[-2]
    mask = self.ceph_brx_net.split('/')[1]
    brd = IP(self.ceph_brx_net).broadcast()

    brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
                                 stdout=StringIO(), timeout=(5*60))
    brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
    if brx:
        # If the 'ceph-brx' already exists, then check whether
        # the new net is conflicting with it
        _ip, _mask = brx[0].split()[1].split('/', 1)
        if _ip != "{}".format(ip) or _mask != mask:
            raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))
    else:
        # Setup the ceph-brx and always use the last valid IP
        log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask))

        self.run_shell_payload(f"""
            set -e
            sudo ip link add name ceph-brx type bridge
            sudo ip addr flush dev ceph-brx
            sudo ip link set ceph-brx up
            sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
        """, timeout=(5*60), omit_sudo=False, cwd='/')

    args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
    self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)

    # Setup the NAT
    p = self.client_remote.run(args=['route'], stderr=StringIO(),
                               stdout=StringIO(), timeout=(5*60))
    p = re.findall(r'default .*', p.stdout.getvalue())
    # re.findall() returns a list; an empty one means no default route.
    # (Comparing the list with False never matched.)
    if not p:
        raise RuntimeError("No default gw found")
    # eighth column of the 'route' output line
    gw = p[0].split()[7]

    self.run_shell_payload(f"""
        set -e
        sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
        sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
        sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
    """, timeout=(5*60), omit_sudo=False, cwd='/')
311
def _setup_netns(self):
    """
    Create the network namespace for this mount and wire it to 'ceph-brx'.

    If the netns already exists, it is only resumed and its id recorded
    in self.nsid. Otherwise a new netns is added with the lowest unused
    nsid, given the first address of the bridge network that no existing
    netns uses, and connected to the bridge through a veth pair.

    Raises RuntimeError when the existing netns has no id or when the
    bridge network has no free address left.
    """
    netns_out = self.client_remote.run(args=['ip', 'netns', 'list'],
                                       stderr=StringIO(), stdout=StringIO(),
                                       timeout=(5*60)).stdout.getvalue().strip()

    # Get the netns name list
    netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', netns_out)

    # The name contains characters such as '.', so match it literally.
    name_re = re.escape(self.netns_name)
    if re.search(name_re, netns_out) is None:
        # Get an uniq nsid for the new netns
        nsid = 0
        ids_out = self.client_remote.run(args=['ip', 'netns', 'list-id'],
                                         stderr=StringIO(), stdout=StringIO(),
                                         timeout=(5*60)).stdout.getvalue()
        while re.search(r"nsid {} ".format(nsid), ids_out) is not None:
            nsid += 1

        # Add one new netns and set it id
        self.run_shell_payload(f"""
            set -e
            sudo ip netns add {self.netns_name}
            sudo ip netns set {self.netns_name} {nsid}
        """, timeout=(5*60), omit_sudo=False, cwd='/')
        self.nsid = nsid
    else:
        # The netns already exists and maybe suspended by self.kill()
        self.resume_netns()

        found_id = re.search(r"{0} \(id: (\d+)\)".format(name_re), netns_out)
        if found_id is None:
            raise RuntimeError("netns '{0}' exists but has no id".format(
                self.netns_name))
        self.nsid = int(found_id.group(1))
        return

    # Get one ip address for netns
    ips = IP(self.ceph_brx_net)
    for ip in ips:
        if ip == ips[0]:
            # skip the network address
            continue
        if ip == ips[-2]:
            # the last valid address belongs to ceph-brx itself
            raise RuntimeError("we have ran out of the ip addresses")

        found = False
        for ns in netns_list:
            ns_name = ns.split()[0]
            args = ['sudo', 'ip', 'netns', 'exec', ns_name, 'ip', 'addr']
            try:
                proc = self.client_remote.run(args=args, stderr=StringIO(),
                                              stdout=StringIO(),
                                              timeout=(5*60),
                                              omit_sudo=False)
            except CommandFailedError:
                # The netns could not be entered (e.g. it is stale), so
                # it is treated as not holding this address. The failed
                # command has no result object to inspect here.
                continue
            if re.search(re.escape("{0}".format(ip)),
                         proc.stdout.getvalue()) is not None:
                found = True
                break

        if not found:
            break

    mask = self.ceph_brx_net.split('/')[1]
    brd = IP(self.ceph_brx_net).broadcast()

    log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))

    # Setup the veth interfaces
    brxip = IP(self.ceph_brx_net)[-2]
    self.run_shell_payload(f"""
        set -e
        sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
        sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
        sudo ip netns exec {self.netns_name} ip link set veth0 up
        sudo ip netns exec {self.netns_name} ip link set lo up
        sudo ip netns exec {self.netns_name} ip route add default via {brxip}
    """, timeout=(5*60), omit_sudo=False, cwd='/')

    # Bring up the brx interface and join it to 'ceph-brx'
    self.run_shell_payload(f"""
        set -e
        sudo ip link set brx.{nsid} up
        sudo ip link set dev brx.{nsid} master ceph-brx
    """, timeout=(5*60), omit_sudo=False, cwd='/')
400
def _cleanup_netns(self):
    """
    Delete this mount's netns together with its 'brx.<nsid>' peer
    interface, then mark the mount as having no netns id. Does nothing
    when no netns id is recorded.
    """
    nsid = self.nsid
    if nsid == -1:
        return
    log.info("Removing the netns '{0}'".format(self.netns_name))

    # Delete the netns and the peer veth interface
    self.run_shell_payload(f"""
        set -e
        sudo ip link set brx.{nsid} down
        sudo ip link delete dev brx.{nsid}
        sudo ip netns delete {self.netns_name}
    """, timeout=(5*60), omit_sudo=False, cwd='/')

    self.nsid = -1
415
def _cleanup_brx_and_nat(self):
    """
    Remove the 'ceph-brx' bridge and its iptables NAT rules, but only when
    the bridge exists and no 'brx.*' interface is left.

    Raises RuntimeError if no default route is found when dropping the
    NAT rules.
    """
    brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
                                 stdout=StringIO(), timeout=(5*60))
    brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
    if not brx:
        return

    # If we are the last netns, will delete the ceph-brx
    args = ['sudo', 'ip', 'link', 'show']
    p = self.client_remote.run(args=args, stdout=StringIO(),
                               timeout=(5*60), omit_sudo=False)
    if re.findall(r'brx\.', p.stdout.getvalue().strip()):
        # some netns is still attached to the bridge
        return

    log.info("Removing the 'ceph-brx'")

    self.run_shell_payload("""
        set -e
        sudo ip link set ceph-brx down
        sudo ip link delete ceph-brx
    """, timeout=(5*60), omit_sudo=False, cwd='/')

    # Drop the iptables NAT rules
    ip = IP(self.ceph_brx_net)[-2]
    mask = self.ceph_brx_net.split('/')[1]

    p = self.client_remote.run(args=['route'], stderr=StringIO(),
                               stdout=StringIO(), timeout=(5*60))
    p = re.findall(r'default .*', p.stdout.getvalue())
    # re.findall() returns a list; an empty one means no default route.
    # (Comparing the list with False never matched.)
    if not p:
        raise RuntimeError("No default gw found")
    # eighth column of the 'route' output line
    gw = p[0].split()[7]
    self.run_shell_payload(f"""
        set -e
        sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
        sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
        sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
    """, timeout=(5*60), omit_sudo=False, cwd='/')
455
def setup_netns(self):
    """
    Setup the netns for the mountpoint.

    Prepares the 'ceph-brx' bridge and NAT first, then the netns itself.
    """
    # Use the property, not the private attribute: the name is computed
    # lazily and would otherwise be logged as 'None' on first use.
    log.info("Setting the '{0}' netns for '{1}'".format(self.netns_name, self.mountpoint))
    self._setup_brx_and_nat()
    self._setup_netns()
463
def cleanup_netns(self):
    """
    Cleanup the netns for the mountpoint.

    Currently does nothing, see the note below.
    """
    # NOTE: cleaning the netnses and the bridge is deferred until the last
    # mountpoint is unmounted, as a temporary work around for issue#46282.
    # The disabled steps are:
    #
    # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
    # self._cleanup_netns()
    # self._cleanup_brx_and_nat()
    return None
475
def suspend_netns(self):
    """
    Suspend the netns veth interface by bringing 'brx.<nsid>' down.
    Does nothing when no netns id is recorded.
    """
    if self.nsid != -1:
        log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))

        link_down = ['sudo', 'ip', 'link', 'set',
                     'brx.{0}'.format(self.nsid), 'down']
        self.client_remote.run(args=link_down, timeout=(5*60),
                               omit_sudo=False)
487
def resume_netns(self):
    """
    Resume the netns veth interface by bringing 'brx.<nsid>' up.
    Does nothing when no netns id is recorded.
    """
    if self.nsid != -1:
        log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))

        link_up = ['sudo', 'ip', 'link', 'set',
                   'brx.{0}'.format(self.nsid), 'up']
        self.client_remote.run(args=link_up, timeout=(5*60),
                               omit_sudo=False)
499
def mount(self, mntopts=[], check_status=True, **kwargs):
    """
    Mount the Ceph FS; this implementation always raises
    NotImplementedError.

    kwargs expects its members to be same as the arguments accepted by
    self.update_attrs().
    """
    raise NotImplementedError
506
f67539c2
TL
def mount_wait(self, **kwargs):
    """
    Mount and then block until the mount is up.

    Accepts arguments same as self.mount().
    """
    self.mount(**kwargs)
    self.wait_until_mounted()
513
1e59de90
TL
def _run_umount_lf(self):
    """
    Run a lazy and forced umount of the host mountpoint.

    A failing umount is tolerated as long as the mountpoint is no longer
    mounted afterwards; otherwise the CommandFailedError is re-raised.

    :return: the finished process of the umount command, or None if the
             command failed but the mountpoint is not mounted anymore.
    """
    log.debug(f'Force/lazy unmounting on client.{self.client_id}')

    # Stays None when the command fails but the mount is already gone;
    # without this the return below would hit an unbound local.
    proc = None
    try:
        proc = self.client_remote.run(
            args=f'sudo umount --lazy --force {self.hostfs_mntpt}',
            timeout=UMOUNT_TIMEOUT, omit_sudo=False)
    except CommandFailedError:
        if self.is_mounted():
            raise

    return proc
526
7c673cae
FG
def umount(self):
    """
    Unmount the Ceph FS; this implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError
529
1e59de90
TL
def umount_wait(self, force=False, require_clean=False,
                timeout=UMOUNT_TIMEOUT):
    """
    Unmount and wait for the unmount to complete; this implementation
    always raises NotImplementedError.

    :param force: Expect that the mount will not shutdown cleanly: kill
                  it hard.
    :param require_clean: Wait for the Ceph client associated with the
                          mount (e.g. ceph-fuse) to terminate, and
                          raise if it doesn't do so cleanly.
    :param timeout: amount of time to be waited for umount command to
                    finish
    :return:
    """
    raise NotImplementedError
543
f67539c2
TL
def _verify_attrs(self, **kwargs):
    """
    Check that every keyword argument passed in is either a str or None.

    Raises RuntimeError, naming the offending attribute and its value,
    for the first one that is neither.
    """
    for name, value in kwargs.items():
        if value is None or isinstance(value, str):
            continue
        raise RuntimeError('value of attributes should be either str '
                           f'or None. {name} - {value}')
553
def update_attrs(self, client_id=None, client_keyring_path=None,
                 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
                 cephfs_mntpt=None):
    """
    Overwrite the attributes of this mount for which a non-empty value is
    passed; attributes whose argument is empty or None are left untouched.

    All values except client_remote are verified to be str or None first.
    """
    if not any((client_id, client_keyring_path, client_remote,
                cephfs_name, cephfs_mntpt, hostfs_mntpt)):
        return

    self._verify_attrs(client_id=client_id,
                       client_keyring_path=client_keyring_path,
                       hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
                       cephfs_mntpt=cephfs_mntpt)

    updates = (
        ('client_id', client_id),
        ('client_keyring_path', client_keyring_path),
        ('client_remote', client_remote),
        ('hostfs_mntpt', hostfs_mntpt),
        ('cephfs_name', cephfs_name),
        ('cephfs_mntpt', cephfs_mntpt),
    )
    for attr, value in updates:
        if value:
            setattr(self, attr, value)
578
def remount(self, **kwargs):
    """
    Update mount object's attributes and attempt remount with these
    new values for these attributes.

    1. Run umount_wait().
    2. Run update_attrs().
    3. Run mount().

    Accepts arguments of self.mount() and self.update_attrs() with 1
    exception: wait accepted too which can be True or False.

    :return: whatever self.mount() returned
    """
    self.umount_wait()
    assert not self.is_mounted()

    # Split off what is meant for mount() (and for us); the rest goes to
    # update_attrs().
    mount_kwargs = {
        'mntopts': kwargs.pop('mntopts', []),
        'check_status': kwargs.pop('check_status', True),
    }
    wait = kwargs.pop('wait', True)

    self.update_attrs(**kwargs)

    retval = self.mount(**mount_kwargs)
    # avoid this scenario (again): mount command might've failed and
    # check_status might have silenced the exception, yet we attempt to
    # wait which might lead to an error.
    should_wait = retval is None and wait
    if should_wait:
        self.wait_until_mounted()

    return retval
7c673cae
FG
608
def kill(self):
    """
    Disconnect the client from the ceph cluster by suspending the veth
    interface of its netns.
    """
    remote_name = self.client_remote.name
    log.info('Killing connection on {0}...'.format(remote_name))
    self.suspend_netns()
616
def kill_cleanup(self):
    """
    Follow up ``kill`` to get to a clean unmounted state, using a forced
    umount.
    """
    remote_name = self.client_remote.name
    log.info('Cleaning up killed connection on {0}'.format(remote_name))
    self.umount_wait(force=True)
7c673cae
FG
623
def cleanup(self):
    """
    Remove the mount point directory and clean up the netns.

    Prerequisite: the client is not mounted.
    """
    log.info('Cleaning up mount {0}'.format(self.client_remote.name))
    rmdir_stderr = StringIO()
    # NOTE(review): check_status=False is passed, so presumably a failing
    # rmdir does not raise and the handler below is never reached -
    # confirm against Remote.run().
    try:
        self.client_remote.run(args=['rmdir', '--', self.mountpoint],
                               cwd=self.test_dir, stderr=rmdir_stderr,
                               timeout=(60*5), check_status=False)
    except CommandFailedError:
        already_gone = ("no such file or directory" in
                        rmdir_stderr.getvalue().lower())
        if not already_gone:
            raise

    self.cleanup_netns()
7c673cae
FG
641
def wait_until_mounted(self):
    """
    Block until the mount is up; this implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError
644
def get_keyring_path(self):
    """
    Return the path of the keyring file for this client ID under
    /etc/ceph.
    """
    # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
    return f'/etc/ceph/ceph.client.{self.client_id}.keyring'
648
f67539c2
TL
def get_key_from_keyfile(self):
    """
    Read the client keyring from the remote and return the secret key
    stored in it.

    Only a line of the form ``key = <value>`` is accepted. Lines that
    merely contain the text "key" (for instance a section header such as
    ``[client.keyuser]``) are skipped.

    :return: the key, stripped of surrounding whitespace
    Raises RuntimeError if the keyring has no ``key = ...`` line.
    """
    # XXX: don't call run_shell(), since CephFS might be unmounted.
    keyring = self.client_remote.read_file(self.client_keyring_path).\
        decode()

    for line in keyring.split('\n'):
        name, sep, value = line.partition('=')
        # everything after the first '=' is the value; the key itself may
        # contain (and usually ends with) '=' characters
        if sep and name.strip() == 'key':
            return value.strip()

    raise RuntimeError('Key not found in keyring file '
                       f'{self.client_keyring_path}. Its contents are -\n'
                       f'{keyring}')
661
7c673cae
FG
@property
def config_path(self):
    """
    Path to ceph.conf: override this if you're not a normal systemwide
    ceph install
    :return: string
    """
    default_conf = "/etc/ceph/ceph.conf"
    return default_conf
669
@contextmanager
def mounted_wait(self):
    """
    Context manager that starts from an unmounted state: it mounts, waits
    for the mount to come up, hands control to the ``with`` body and
    always unmounts (and waits for that) afterwards.
    """
    self.mount()
    self.wait_until_mounted()
    try:
        yield
    finally:
        # runs even if the body raised
        self.umount_wait()
682
9f95a23c
TL
def create_file(self, filename='testfile', dirname=None, user=None,
                check_status=True):
    """
    Create a file with ``touch`` on the remote.

    :param filename: absolute path, or a name relative to ``dirname``/the
                     mountpoint
    :param dirname: optional directory; a relative one is taken relative
                    to the mountpoint
    :param user: when set, run ``touch`` as this user through sudo
    :param check_status: passed on to the remote's run()
    :return: whatever the remote's run() returns
    """
    assert(self.is_mounted())

    if os.path.isabs(filename):
        path = filename
    elif not dirname:
        path = os.path.join(self.hostfs_mntpt, filename)
    elif os.path.isabs(dirname):
        path = os.path.join(dirname, filename)
    else:
        path = os.path.join(self.hostfs_mntpt, dirname, filename)

    touch_cmd = 'touch ' + path
    if user:
        args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', touch_cmd]
    else:
        args = touch_cmd

    return self.client_remote.run(args=args, check_status=check_status)
704
7c673cae
FG
def create_files(self):
    """
    Touch every file listed in self.test_files directly under the
    mountpoint.
    """
    assert(self.is_mounted())

    for name in self.test_files:
        log.info("Creating file {0}".format(name))
        target = os.path.join(self.hostfs_mntpt, name)
        self.client_remote.run(args=['touch', target])
713
9f95a23c
TL
714 def test_create_file(self, filename='testfile', dirname=None, user=None,
715 check_status=True):
716 return self.create_file(filename=filename, dirname=dirname, user=user,
717 check_status=False)
718
7c673cae
FG
def check_files(self):
    """
    Verify with ``ls`` that every file listed in self.test_files exists
    directly under the mountpoint.

    Raises RuntimeError for the first file that is not found.
    """
    assert(self.is_mounted())

    for name in self.test_files:
        log.info("Checking file {0}".format(name))
        target = os.path.join(self.hostfs_mntpt, name)
        result = self.client_remote.run(args=['ls', target],
                                        check_status=False)
        if result.exitstatus != 0:
            raise RuntimeError("Expected file {0} not found".format(name))
729
cd265ab1
TL
def write_file(self, path, data, perms=None):
    """
    Write the given data at the given path and, if ``perms`` is given,
    chmod the file to it.

    A path that does not contain the mountpoint is taken relative to the
    mountpoint.
    """
    inside_mount = path.find(self.hostfs_mntpt) != -1
    if not inside_mount:
        path = os.path.join(self.hostfs_mntpt, path)

    # module-level helper, not this method
    write_file(self.client_remote, path, data)

    if perms:
        self.run_shell(args=f'chmod {perms} {path}')
742
def read_file(self, path):
    """
    Return the stripped output of ``cat`` on the file at the given path.

    A path that does not contain the mountpoint is taken relative to the
    mountpoint.
    """
    inside_mount = path.find(self.hostfs_mntpt) != -1
    if not inside_mount:
        path = os.path.join(self.hostfs_mntpt, path)

    proc = self.run_shell(args=['cat', path])
    return proc.stdout.getvalue().strip()
752
7c673cae
FG
def create_destroy(self):
    """
    Create a file under the mountpoint, named after the current time and
    the client ID, and delete it again right away.
    """
    assert(self.is_mounted())

    filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
    target = os.path.join(self.hostfs_mntpt, filename)

    log.debug("Creating test file {0}".format(filename))
    self.client_remote.run(args=['touch', target])

    log.debug("Deleting test file {0}".format(filename))
    self.client_remote.run(args=['rm', '-f', target])
765
def _run_python(self, pyscript, py_version='python3', sudo=False):
    """
    Start the given Python script on the remote, wrapped in
    'adjust-ulimits daemon-helper kill', without waiting for it.

    :param pyscript: source code handed to the interpreter with -c
    :param py_version: interpreter executable to use
    :param sudo: prefix the command with sudo
    :return: whatever the remote's run() returns for wait=False
    """
    prefix = ['sudo'] if sudo else []
    command = prefix + ['adjust-ulimits', 'daemon-helper', 'kill',
                        py_version, '-c', pyscript]
    # sudo must not be stripped from the command when it was asked for
    return self.client_remote.run(args=command, wait=False,
                                  stdin=run.PIPE, stdout=StringIO(),
                                  omit_sudo=not sudo)
91327a77 774
522d829b
TL
def run_python(self, pyscript, py_version='python3', sudo=False):
    """
    Run the given Python script on the remote, wait for it to finish and
    return its stripped stdout.
    """
    proc = self._run_python(pyscript, py_version, sudo=sudo)
    proc.wait()
    output = proc.stdout.getvalue()
    return output.strip()
779
def run_shell(self, args, timeout=300, **kwargs):
    """
    Run a command on the remote.

    Unless overridden through kwargs, the command runs with the
    mountpoint as working directory, sudo left in place and fresh
    StringIO objects capturing stdout and stderr. All other kwargs are
    handed to the remote's run() untouched.

    :return: whatever the remote's run() returns
    """
    defaults = {
        'omit_sudo': kwargs.pop('omit_sudo', False),
        'cwd': kwargs.pop('cwd', self.mountpoint),
        'stdout': kwargs.pop('stdout', StringIO()),
        'stderr': kwargs.pop('stderr', StringIO()),
    }
    return self.client_remote.run(args=args, timeout=timeout,
                                  **defaults, **kwargs)
7c673cae 789
def run_shell_payload(self, payload, **kwargs):
    """
    Run a (possibly multi-line) shell script through ``bash -c``.

    :param payload: the script text. It is quoted for the shell, so it
                    may itself contain single quotes.
    :param kwargs: same as for run_shell(), plus ``sudo``: when true,
                   the whole bash invocation is run through sudo.
    :return: whatever run_shell() returns
    """
    # function-scope import: this is the only place that needs it
    import shlex

    # Raw() keeps the already quoted payload from being quoted a second
    # time; shlex.quote() escapes embedded single quotes, which plain
    # '...' wrapping did not.
    kwargs['args'] = ["bash", "-c", Raw(shlex.quote(payload))]
    if kwargs.pop('sudo', False):
        kwargs['args'].insert(0, 'sudo')
        kwargs['omit_sudo'] = False
    return self.run_shell(**kwargs)
f6b5b4d7 796
f67539c2
TL
def run_as_user(self, **kwargs):
    """
    Run a command as the given user through 'sudo -u <user> bash -c'.

    Besides the arguments defined for run_shell() this method also
    accepts argument 'user'. ``args`` may be a command string or a list
    of strings, which is joined with single spaces.
    """
    args = kwargs.pop('args')
    user = kwargs.pop('user')

    sudo_prefix = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
    if isinstance(args, str):
        args = sudo_prefix + [args]
    elif isinstance(args, list):
        args = sudo_prefix + [' '.join(args)]

    kwargs['args'] = args
    kwargs['omit_sudo'] = False
    return self.run_shell(**kwargs)
9f95a23c 819
f67539c2
TL
def run_as_root(self, **kwargs):
    """
    Run a command as root via run_as_user().

    Accepts same arguments as run_shell().
    """
    kwargs.update(user='root')
    return self.run_as_user(**kwargs)
826
1e59de90
TL
def assert_retval(self, proc_retval, exp_retval):
    """
    Assert that the received return value equals the expected one; the
    assertion message reports both.
    """
    report = ''.join((f'expected return value: {exp_retval}\n',
                      f'received return value: {proc_retval}\n'))
    assert proc_retval == exp_retval, report
f67539c2 831
1e59de90
TL
def _verify(self, proc, exp_retval=None, exp_errmsgs=None):
    """
    Check a finished process against the expected return value and/or
    expected error messages.

    :param proc: process object providing ``returncode`` and ``stderr``
    :param exp_retval: expected return value, if it is to be checked
    :param exp_errmsgs: a string or an iterable of strings; at least one
                        of them has to occur in the lowercased stderr

    Raises RuntimeError when neither expectation is given and
    AssertionError when an expectation is not met.
    """
    if exp_retval is None and exp_errmsgs is None:
        raise RuntimeError('Method didn\'t get enough parameters. Pass '
                           'return value or error message expected from '
                           'the command/process.')

    if exp_retval is not None:
        self.assert_retval(proc.returncode, exp_retval)
    if exp_errmsgs is None:
        return

    if isinstance(exp_errmsgs, str):
        exp_errmsgs = (exp_errmsgs, )

    proc_stderr = proc.stderr.getvalue().lower()
    msg = ('didn\'t find any of the expected string in stderr.\n'
           f'expected string: {exp_errmsgs}\n'
           f'received error message: {proc_stderr}\n'
           'note: received error message is converted to lowercase')
    # a single match is enough
    matched = any(expected in proc_stderr for expected in exp_errmsgs)
    assert matched, msg
f67539c2 857
def negtestcmd(self, args, retval=None, errmsgs=None, stdin=None,
               cwd=None, wait=True):
    """
    Conduct a negative test for the given command.

    The command is run without status checking and its outcome is then
    compared with ``retval`` and ``errmsgs``, the parameters that confirm
    the cause of command failure.

    Note: errmsgs is expected to be a tuple, but in case there's only
    error message, it can also be a string. This method will handle
    that internally.

    :return: the process that ran the command
    """
    run_kwargs = dict(args=args, wait=wait, stdin=stdin, cwd=cwd,
                      check_status=False)
    proc = self.run_shell(**run_kwargs)
    self._verify(proc, retval, errmsgs)
    return proc
874
def negtestcmd_as_user(self, args, user, retval=None, errmsgs=None,
                       stdin=None, cwd=None, wait=True):
    """
    Conduct a negative test for the given command, run as ``user``.

    The command is run without status checking and its outcome is then
    compared with ``retval`` and ``errmsgs``.

    :return: the process that ran the command
    """
    run_kwargs = dict(args=args, user=user, wait=wait, stdin=stdin,
                      cwd=cwd, check_status=False)
    proc = self.run_as_user(**run_kwargs)
    self._verify(proc, retval, errmsgs)
    return proc
881
def negtestcmd_as_root(self, args, retval=None, errmsgs=None, stdin=None,
                       cwd=None, wait=True):
    """
    Conduct a negative test for the given command, run as root.

    The command is run without status checking and its outcome is then
    compared with ``retval`` and ``errmsgs``.

    :return: the process that ran the command
    """
    run_kwargs = dict(args=args, wait=wait, stdin=stdin, cwd=cwd,
                      check_status=False)
    proc = self.run_as_root(**run_kwargs)
    self._verify(proc, retval, errmsgs)
    return proc
7c673cae 888
1e59de90
TL
def open_for_reading(self, basename):
    """
    Open a file for reading only.

    Starts a remote Python script that opens the file under the
    mountpoint with O_RDONLY and closes it again.

    :return: whatever self._run_python() returns
    """
    assert(self.is_mounted())

    path = os.path.join(self.hostfs_mntpt, basename)

    template = """
        import os
        mode = os.O_RDONLY
        fd = os.open("{path}", mode)
        os.close(fd)
        """
    pyscript = dedent(template.format(path=path))
    return self._run_python(pyscript)
905
def open_for_writing(self, basename, creat=True, trunc=True, excl=False):
    """
    Open a file for writing only.

    Starts a remote Python script that opens the file under the
    mountpoint with O_WRONLY and closes it again.

    :param creat: add O_CREAT to the open mode
    :param trunc: add O_TRUNC to the open mode
    :param excl: add O_EXCL to the open mode
    :return: whatever self._run_python() returns
    """
    assert(self.is_mounted())

    path = os.path.join(self.hostfs_mntpt, basename)

    template = """
        import os
        mode = os.O_WRONLY
        if {creat}:
            mode |= os.O_CREAT
        if {trunc}:
            mode |= os.O_TRUNC
        if {excl}:
            mode |= os.O_EXCL
        fd = os.open("{path}", mode)
        os.close(fd)
        """
    pyscript = dedent(template.format(path=path, creat=creat, trunc=trunc,
                                      excl=excl))
    return self._run_python(pyscript)
928
7c673cae
FG
def open_no_data(self, basename):
    """
    A pure metadata operation: run a python snippet that opens
    ``basename`` (joined onto the host mount point) in 'w' mode without
    writing anything, and wait for the snippet to finish.
    """
    assert self.is_mounted()

    target = os.path.join(self.hostfs_mntpt, basename)
    template = """
        f = open("{path}", 'w')
        """
    proc = self._run_python(dedent(template.format(path=target)))
    proc.wait()
943
def open_background(self, basename="background_file", write=True, content="content"):
    """
    Open a file for writing, then block such that the client
    will hold a capability.

    Don't return until the remote process has got as far as opening
    the file, then return the RemoteProcess instance.

    :param basename: file name, joined onto the host mount point
    :param write: when True the file is opened in 'w' mode and ``content``
                  is written and flushed; otherwise it is opened in 'r'
                  mode
    :param content: text written to the file when ``write`` is True
    :return: the process returned by self._run_python(); it is also
             appended to self.background_procs
    """
    assert self.is_mounted()

    path = os.path.join(self.hostfs_mntpt, basename)

    # path and content are embedded with !r so that quotes, backslashes
    # or newlines in them yield a valid python literal holding exactly
    # the same characters, rather than a broken or altered script.
    if write:
        pyscript = dedent("""
            import time

            with open({path!r}, 'w') as f:
                f.write({content!r})
                f.flush()
                while True:
                    time.sleep(1)
            """).format(path=path, content=content)
    else:
        pyscript = dedent("""
            import time

            with open({path!r}, 'r') as f:
                while True:
                    time.sleep(1)
            """).format(path=path)

    rproc = self._run_python(pyscript)
    self.background_procs.append(rproc)

    # This wait would not be sufficient if the file had already
    # existed, but it's simple and in practice users of open_background
    # are not using it on existing files.
    if write:
        # NOTE(review): len(content) counts characters; this matches the
        # on-disk size only when every character is written as one byte.
        self.wait_for_visible(basename, size=len(content))
    else:
        self.wait_for_visible(basename)

    return rproc
987
2a845540
TL
def open_dir_background(self, basename):
    """
    Create and hold a capability to a directory.

    A background python snippet creates the directory with os.mkdir(),
    opens it with O_RDONLY and then sleeps in a loop.  The process is
    appended to self.background_procs, and the call returns once
    wait_for_visible() has returned for ``basename``.

    :return: the process returned by self._run_python()
    """
    assert self.is_mounted()

    dir_path = os.path.join(self.hostfs_mntpt, basename)
    template = dedent("""
        import time
        import os

        os.mkdir("{path}")
        fd = os.open("{path}", os.O_RDONLY)
        while True:
            time.sleep(1)
        """)

    holder = self._run_python(template.format(path=dir_path))
    self.background_procs.append(holder)
    self.wait_for_visible(basename)
    return holder
1012
def wait_for_dir_empty(self, dirname, timeout=30):
    """
    Poll ``stat -c %h`` on ``dirname`` (joined onto the host mount point)
    until the reported hard link count is 2, then return.

    Polling runs under safe_while with a 5 second sleep and
    ``timeout // 5`` tries; what happens once the tries are used up is
    decided by safe_while.

    :param dirname: directory path relative to the host mount point
    :param timeout: rough upper bound, in seconds, on the polling
    """
    target = os.path.join(self.hostfs_mntpt, dirname)
    attempts = timeout // 5
    with safe_while(sleep=5, tries=attempts) as proceed:
        while proceed():
            stat_proc = self.run_shell_payload(f"stat -c %h {target}")
            link_count = int(stat_proc.stdout.getvalue().strip())
            if link_count == 2:
                return
494da23a 1021
def wait_for_visible(self, basename="background_file", size=None, timeout=30):
    """
    Poll ``stat`` on the client once per second until ``basename``
    (joined onto the host mount point) can be stat'ed and, when ``size``
    is given, until stat reports exactly that size.

    :param size: expected file size, or None to accept any size
    :param timeout: number of one-second polling rounds before giving up
    :raises RuntimeError: if the condition was not met within ``timeout``
    """
    stat_cmd = ['stat']
    if size is not None:
        # print nothing but the size so the output can be fed to int()
        stat_cmd.append('--printf=%s')
    stat_cmd.append(os.path.join(self.hostfs_mntpt, basename))

    elapsed = 0
    while elapsed < timeout:
        p = self.client_remote.run(args=stat_cmd, stdout=StringIO(), check_status=False)
        if p.exitstatus == 0:
            if size is None:
                log.info(f"File {basename} became visible from {self.client_id} after {elapsed}s")
                return
            reported = p.stdout.getvalue().strip()
            if int(reported) == size:
                log.info(f"File {basename} became visible with size {size} from {self.client_id} after {elapsed}s")
                return
            log.error(f"File {basename} became visible but with size {int(reported)} not {size}")
        time.sleep(1)
        elapsed += 1

    raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
        elapsed, basename, self.client_id))
1046
def lock_background(self, basename="background_file", do_flock=True):
    """
    Open and lock files for writing, hold the locks in a background
    process.

    The background snippet opens ``<path>-2`` and takes a write lock on
    it with fcntl F_SETLK; when ``do_flock`` is True it first opens
    ``<path>-1`` and takes a non-blocking exclusive flock on it.  It then
    sleeps in a loop.

    :return: the process returned by self._run_python(); it is also
             appended to self.background_procs
    """
    assert self.is_mounted()

    path = os.path.join(self.hostfs_mntpt, basename)

    # The pieces share one indentation level so dedent() can strip it
    # after they are joined.
    pieces = ["""
        import time
        import fcntl
        import struct"""]
    if do_flock:
        pieces.append("""
        f1 = open("{path}-1", 'w')
        fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)""")
    pieces.append("""
        f2 = open("{path}-2", 'w')
        lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
        fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
        while True:
            time.sleep(1)
        """)

    pyscript = dedent("".join(pieces)).format(path=path)

    log.info("lock_background file {0}".format(basename))
    locker = self._run_python(pyscript)
    self.background_procs.append(locker)
    return locker
1077
31f18b77
FG
def lock_and_release(self, basename="background_file"):
    """
    Run a python snippet that takes an exclusive flock on ``<path>-1``
    and an fcntl F_SETLK write lock on ``<path>-2``, then ends without
    sleeping.

    :return: whatever self._run_python() returns for the snippet
    """
    assert self.is_mounted()

    path = os.path.join(self.hostfs_mntpt, basename)

    template = dedent("""
        import time
        import fcntl
        import struct
        f1 = open("{path}-1", 'w')
        fcntl.flock(f1, fcntl.LOCK_EX)
        f2 = open("{path}-2", 'w')
        lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
        fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
        """)

    log.info("lock_and_release file {0}".format(basename))
    return self._run_python(template.format(path=path))
1097
7c673cae
FG
def check_filelock(self, basename="background_file", do_flock=True):
    """
    Check that the locks on ``<path>-1`` / ``<path>-2`` are held by
    somebody else.

    The snippet tries to take the same locks without blocking.  EAGAIN is
    swallowed; a lock attempt that succeeds raises RuntimeError in the
    snippet, because it shows the lock was not held.  The snippet is run
    with ``python3 -c`` through self.client_remote.run().

    :param do_flock: also check the flock on ``<path>-1``
    """
    assert self.is_mounted()

    path = os.path.join(self.hostfs_mntpt, basename)

    # The pieces share one indentation level so dedent() can strip it
    # after they are joined.
    pieces = ["""
        import fcntl
        import errno
        import struct"""]
    if do_flock:
        pieces.append("""
        f1 = open("{path}-1", 'r')
        try:
            fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as e:
            if e.errno == errno.EAGAIN:
                pass
        else:
            raise RuntimeError("flock on file {path}-1 not found")""")
    pieces.append("""
        f2 = open("{path}-2", 'r')
        try:
            lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
            fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
        except IOError as e:
            if e.errno == errno.EAGAIN:
                pass
        else:
            raise RuntimeError("posix lock on file {path}-2 not found")
        """)
    pyscript = dedent("".join(pieces)).format(path=path)

    log.info("check lock on file {0}".format(basename))
    checker_cmd = ['python3', '-c', pyscript]
    self.client_remote.run(args=checker_cmd)
1134
def write_background(self, basename="background_file", loop=False):
    """
    Start a background python snippet that opens ``basename`` (joined
    onto the host mount point) with O_RDWR | O_CREAT and writes
    b'content' to it.

    :param basename: file name relative to the host mount point
    :param loop: when False the snippet stops after the first write and
                 one second of sleep; when True it keeps writing once
                 per second
    :return: the process returned by self._run_python(); it is also
             appended to self.background_procs
    """
    assert self.is_mounted()

    target = os.path.join(self.hostfs_mntpt, basename)
    template = dedent("""
        import os
        import time

        fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
        try:
            while True:
                os.write(fd, b'content')
                time.sleep(1)
                if not {loop}:
                    break
        except IOError as e:
            pass
        os.close(fd)
        """)

    writer = self._run_python(template.format(path=target, loop=str(loop)))
    self.background_procs.append(writer)
    return writer
1164
def write_n_mb(self, filename, n_mb, seek=0, wait=True):
    """
    Write the requested number of megabytes to a file

    Runs ``dd`` from /dev/urandom with bs=1M and conv=fdatasync through
    self.run_shell().

    :param filename: passed to dd as ``of=``
    :param n_mb: block count; converted with int()
    :param seek: dd ``seek=`` value; converted with int()
    :param wait: forwarded to run_shell()
    :return: whatever run_shell() returns
    """
    assert self.is_mounted()

    dd_cmd = [
        "dd",
        "if=/dev/urandom",
        f"of={filename}",
        "bs=1M",
        "conv=fdatasync",
        f"count={int(n_mb)}",
        f"seek={int(seek)}",
    ]
    return self.run_shell(dd_cmd, wait=wait)
1176
def write_test_pattern(self, filename, size):
    """
    Fill ``filename`` (joined onto the host mount point) with ``size``
    characters of a reproducible pattern: character i is
    chr(crc32(str(i)) & 7).

    :return: whatever self.run_python() returns for the snippet
    """
    log.info("Writing {0} bytes to {1}".format(size, filename))
    target = os.path.join(self.hostfs_mntpt, filename)
    template = """
        import zlib
        path = "{path}"
        with open(path, 'w') as f:
            for i in range(0, {size}):
                val = zlib.crc32(str(i).encode('utf-8')) & 7
                f.write(chr(val))
        """
    return self.run_python(dedent(template.format(path=target, size=size)))
1190
def validate_test_pattern(self, filename, size):
    """
    Read ``filename`` (joined onto the host mount point) back and check
    it against the pattern chr(crc32(str(i)) & 7) for each offset i.

    The snippet raises RuntimeError when the length differs from
    ``size`` or when a character does not match.

    :return: whatever self.run_python() returns for the snippet
    """
    log.info("Validating {0} bytes from {1}".format(size, filename))
    target = os.path.join(self.hostfs_mntpt, filename)
    template = """
        import zlib
        path = "{path}"
        with open(path, 'r') as f:
            bytes = f.read()
        if len(bytes) != {size}:
            raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
                len(bytes), {size}
            ))
        for i, b in enumerate(bytes):
            val = zlib.crc32(str(i).encode('utf-8')) & 7
            if b != chr(val):
                raise RuntimeError("Bad data at offset {{0}}".format(i))
        """
    snippet = dedent(template.format(path=target, size=size))
    # Use sudo because cephfs-data-scan may recreate the file with owner==root
    return self.run_python(snippet, sudo=True)
7c673cae
FG
1211
def open_n_background(self, fs_path, count):
    """
    Open N files for writing, hold them open in a background process

    The snippet creates the directory if it does not exist, opens
    ``file_0`` .. ``file_<count-1>`` inside it in 'w' mode and then
    sleeps in a loop.

    :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
    :param count: number of files to open
    :return: a RemoteProcess; it is also appended to
             self.background_procs
    """
    assert self.is_mounted()

    target_dir = os.path.join(self.hostfs_mntpt, fs_path)
    template = dedent("""
        import sys
        import time
        import os

        n = {count}
        abs_path = "{abs_path}"

        if not os.path.exists(abs_path):
            os.makedirs(abs_path)

        handles = []
        for i in range(0, n):
            fname = "file_"+str(i)
            path = os.path.join(abs_path, fname)
            handles.append(open(path, 'w'))

        while True:
            time.sleep(1)
        """)

    holder = self._run_python(template.format(abs_path=target_dir, count=count))
    self.background_procs.append(holder)
    return holder
1247
aee94f69
TL
1248 def create_n_files(self, fs_path, count, sync=False, dirsync=False,
1249 unlink=False, finaldirsync=False, hard_links=0):
20effc67
TL
1250 """
1251 Create n files.
1252
1253 :param sync: sync the file after writing
1254 :param dirsync: sync the containing directory after closing the file
1255 :param unlink: unlink the file after closing
1256 :param finaldirsync: sync the containing directory after closing the last file
aee94f69 1257 :param hard_links: create given number of hard link(s) for each file
20effc67
TL
1258 """
1259
7c673cae
FG
1260 assert(self.is_mounted())
1261
f67539c2 1262 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
7c673cae 1263
20effc67 1264 pyscript = dedent(f"""
7c673cae 1265 import os
aee94f69 1266 import uuid
7c673cae
FG
1267
1268 n = {count}
aee94f69
TL
1269 create_hard_links = False
1270 if {hard_links} > 0:
1271 create_hard_links = True
20effc67 1272 path = "{abs_path}"
7c673cae 1273
20effc67
TL
1274 dpath = os.path.dirname(path)
1275 fnameprefix = os.path.basename(path)
1276 os.makedirs(dpath, exist_ok=True)
7c673cae 1277
20effc67
TL
1278 try:
1279 dirfd = os.open(dpath, os.O_DIRECTORY)
1280
1281 for i in range(n):
1282 fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
1283 with open(fpath, 'w') as f:
1284 f.write(f"{{i}}")
1285 if {sync}:
1286 f.flush()
1287 os.fsync(f.fileno())
1288 if {unlink}:
1289 os.unlink(fpath)
1290 if {dirsync}:
1291 os.fsync(dirfd)
aee94f69
TL
1292 if create_hard_links:
1293 for j in range({hard_links}):
1294 os.system(f"ln {{fpath}} {{dpath}}/{{fnameprefix}}_{{i}}_{{uuid.uuid4()}}")
20effc67
TL
1295 if {finaldirsync}:
1296 os.fsync(dirfd)
1297 finally:
1298 os.close(dirfd)
1299 """)
7c673cae
FG
1300
1301 self.run_python(pyscript)
1302
def teardown(self):
    """
    Stop every process recorded in self.background_procs via
    _kill_background(), then reset the list to empty.
    """
    for bg_proc in self.background_procs:
        log.info("Terminating background process")
        self._kill_background(bg_proc)

    self.background_procs = []
1309
1310 def _kill_background(self, p):
1311 if p.stdin:
1312 p.stdin.close()
1313 try:
1314 p.wait()
1315 except (CommandFailedError, ConnectionLostError):
1316 pass
1317
def kill_background(self, p):
    """
    For a process that was returned by one of the _background member
    functions: stop it through _kill_background() and drop it from
    self.background_procs.
    """
    self._kill_background(p)
    tracked = self.background_procs
    tracked.remove(p)
1325
eafe8130
TL
def send_signal(self, signal):
    """
    Send a signal to the client process with ``sudo kill``.

    :param signal: signal name, case-insensitive; one of sigstop,
                   sigcont, sigterm or sigkill
    :raises NotImplementedError: for any other signal name
    """
    signame = signal.lower()
    if signame not in ('sigstop', 'sigcont', 'sigterm', 'sigkill'):
        raise NotImplementedError

    kill_cmd = ['sudo', 'kill', '-' + signame, self.client_pid]
    self.client_remote.run(args=kill_cmd, omit_sudo=False)
1333
7c673cae
FG
def get_global_id(self):
    """Not implemented here; always raises NotImplementedError."""
    raise NotImplementedError()
1336
11fdf7f2
TL
def get_global_inst(self):
    """Not implemented here; always raises NotImplementedError."""
    raise NotImplementedError()
1339
def get_global_addr(self):
    """Not implemented here; always raises NotImplementedError."""
    raise NotImplementedError()
1342
7c673cae
FG
def get_osd_epoch(self):
    """Not implemented here; always raises NotImplementedError."""
    raise NotImplementedError()
1345
f67539c2
TL
def get_op_read_count(self):
    """Not implemented here; always raises NotImplementedError."""
    raise NotImplementedError()
1348
20effc67
TL
def readlink(self, fs_path):
    """
    Return the target of the symlink at ``fs_path`` (joined onto the
    host mount point), as printed by os.readlink() in a python snippet.

    :return: the snippet's stdout with surrounding whitespace stripped
    """
    link_path = os.path.join(self.hostfs_mntpt, fs_path)
    template = dedent("""
        import os

        print(os.readlink("{path}"))
        """)

    proc = self._run_python(template.format(path=link_path))
    proc.wait()
    target = proc.stdout.getvalue().strip()
    return str(target)
1361
1362
9f95a23c
TL
def lstat(self, fs_path, follow_symlinks=False, wait=True):
    """
    Like stat(), but never follows symlinks.

    :param fs_path: path relative to the host mount point
    :param follow_symlinks: accepted for signature compatibility only;
                            it is ignored and stat() is always called
                            with follow_symlinks=False
    :param wait: forwarded to stat(); when False the unfinished process
                 is returned rather than the parsed result
    :return: whatever stat() returns
    """
    # ``wait`` used to be hard-coded to True here, which made
    # lstat(..., wait=False) block; pass the caller's value through.
    return self.stat(fs_path, follow_symlinks=False, wait=wait)
1365
def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
    """
    stat a file, and return the result as a dictionary like this:
    {
      "st_ctime": 1414161137.0,
      "st_mtime": 1414161137.0,
      "st_nlink": 33,
      "st_gid": 0,
      "st_dev": 16777218,
      "st_size": 1190,
      "st_ino": 2,
      "st_uid": 0,
      "st_mode": 16877,
      "st_atime": 1431520593.0
    }

    The snippet exits with the errno of the OSError when the stat call
    fails.

    :param fs_path: path relative to the host mount point
    :param follow_symlinks: use os.stat() when True, os.lstat() otherwise
    :param wait: when False, return the process without waiting for it
    :param kwargs: forwarded to self._run_python()
    :return: the decoded dictionary, or the process when ``wait`` is False
    """
    target = os.path.join(self.hostfs_mntpt, fs_path)
    stat_fn = "os.stat" if follow_symlinks else "os.lstat"
    stat_expr = stat_fn + "('" + target + "')"

    template = dedent("""
        import os
        import stat
        import json
        import sys

        try:
            s = {stat_call}
        except OSError as e:
            sys.exit(e.errno)

        attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
        print(json.dumps(
            dict([(a, getattr(s, a)) for a in attrs]),
            indent=2))
        """)
    proc = self._run_python(template.format(stat_call=stat_expr), **kwargs)
    if not wait:
        return proc

    proc.wait()
    return json.loads(proc.stdout.getvalue().strip())
1412
def touch(self, fs_path):
    """
    Create a dentry if it doesn't already exist.  This python
    implementation exists because the usual command line tool doesn't
    pass through error codes like EIO.

    The snippet opens the file in "w" mode and closes it; on any IOError
    it exits with status errno.EIO.

    :param fs_path: path relative to the host mount point
    :return: None
    """
    target = os.path.join(self.hostfs_mntpt, fs_path)
    template = dedent("""
        import sys
        import errno

        try:
            f = open("{path}", "w")
            f.close()
        except IOError as e:
            sys.exit(errno.EIO)
        """)
    toucher = self._run_python(template.format(path=target))
    toucher.wait()
1435
def path_to_ino(self, fs_path, follow_symlinks=True):
    """
    Return the inode number (st_ino) of ``fs_path`` (joined onto the
    host mount point).

    :param follow_symlinks: use os.stat() when True, os.lstat() otherwise
    :return: the inode number as an int
    """
    target = os.path.join(self.hostfs_mntpt, fs_path)
    # Both variants differ only in which os function is called.
    stat_fn = "stat" if follow_symlinks else "lstat"
    snippet = dedent("""
        import os
        import stat

        print(os.{stat_fn}("{path}").st_ino)
        """).format(stat_fn=stat_fn, path=target)

    proc = self._run_python(snippet)
    proc.wait()
    return int(proc.stdout.getvalue().strip())
1457
def path_to_nlink(self, fs_path):
    """
    Return the hard link count (st_nlink from os.stat()) of ``fs_path``
    (joined onto the host mount point).

    :return: the link count as an int
    """
    target = os.path.join(self.hostfs_mntpt, fs_path)
    template = dedent("""
        import os
        import stat

        print(os.stat("{path}").st_nlink)
        """)

    proc = self._run_python(template.format(path=target))
    proc.wait()
    printed = proc.stdout.getvalue().strip()
    return int(printed)
1471
def ls(self, path=None, **kwargs):
    """
    Wrap ls: return a list of strings

    :param path: optional path handed to ls
    :param kwargs: forwarded to self.run_shell(); a truthy ``sudo`` entry
                   is consumed here and turns the command into
                   ``sudo ls ...`` with omit_sudo=False
    :return: one entry per output line, or [] when ls printed nothing
    """
    cmd = ["ls", path] if path else ["ls"]
    if kwargs.pop('sudo', False):
        cmd = ['sudo'] + cmd
        kwargs['omit_sudo'] = False
    kwargs['args'] = cmd

    listing = self.run_shell(**kwargs).stdout.getvalue().strip()

    # Special case because otherwise split on empty string
    # gives you [''] instead of []
    return listing.split("\n") if listing else []
1490
def setfattr(self, path, key, val, **kwargs):
    """
    Wrap setfattr.

    :param path: relative to mount point
    :param key: xattr name
    :param val: xattr value
    :param kwargs: forwarded to self.run_shell(); a truthy ``sudo`` entry
                   is consumed here and prefixes the command with sudo
                   (with omit_sudo=False)
    :return: None
    """
    cmd = ["setfattr", "-n", key, "-v", val, path]
    if kwargs.pop('sudo', False):
        cmd = ['sudo'] + cmd
        kwargs['omit_sudo'] = False
    kwargs['args'] = cmd
    self.run_shell(**kwargs)
7c673cae 1505
def getfattr(self, path, attr, **kwargs):
    """
    Wrap getfattr: return the values of a named xattr on one file, or
    None if the attribute is not found.

    :param path: file to query
    :param attr: xattr name
    :param kwargs: forwarded to self.run_shell(); a truthy ``sudo`` entry
                   is consumed here and prefixes the command with sudo.
                   ``wait`` is always forced to False because the wait is
                   done here.
    :return: a string
    """
    cmd = ["getfattr", "--only-values", "-n", attr, path]
    if kwargs.pop('sudo', False):
        cmd.insert(0, 'sudo')
        kwargs['omit_sudo'] = False
    kwargs.update(args=cmd, wait=False)

    proc = self.run_shell(**kwargs)
    try:
        proc.wait()
    except CommandFailedError as e:
        attr_missing = (e.exitstatus == 1 and
                        "No such attribute" in proc.stderr.getvalue())
        if not attr_missing:
            raise
        return None

    return str(proc.stdout.getvalue())
7c673cae
FG
1528
def df(self):
    """
    Wrap df: return a dict of usage fields in bytes

    Runs ``df -B1 .`` through self.run_shell() and parses the second
    output line.

    :return: dict with the integer keys "total", "used" and "available"
    """
    proc = self.run_shell(["df", "-B1", "."])
    lines = proc.stdout.getvalue().strip().split("\n")
    # lines[0] is the header; the first column of lines[1] is not used.
    _fs, total, used, avail = lines[1].split()[:4]
    log.warning(lines)

    return dict(total=int(total), used=int(used), available=int(avail))
f67539c2
TL
1544
def dir_checksum(self, path=None, follow_symlinks=False):
    """
    Return one md5 hex digest summarising the regular files under a
    directory.

    ``find ... -type f -exec md5sum {} +`` is run through
    self.run_shell(); its output lines are sorted by file name, joined
    with newlines and hashed.

    :param path: directory handed to find; omitted when falsy
    :param follow_symlinks: pass -L to find
    :return: hex digest string; the md5 of the empty string when no
             regular file was found
    """
    cmd = ["find"]
    if follow_symlinks:
        cmd.append("-L")
    if path:
        cmd.append(path)
    cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
    checksum_text = self.run_shell(cmd).stdout.getvalue().strip()

    # No regular files means empty output; dropping blank lines avoids
    # indexing into an empty split() result.
    entries = [line for line in checksum_text.split('\n') if line.strip()]
    # Sort on everything after the digest, so file names containing
    # whitespace are ordered by their full name and not by their first
    # word (ties would leave the result dependent on find's order).
    entries.sort(key=lambda line: line.split(None, 1)[-1])
    return hashlib.md5('\n'.join(entries).encode('utf-8')).hexdigest()
1e59de90
TL
1555
1556 def validate_subvol_options(self):
1557 mount_subvol_num = self.client_config.get('mount_subvol_num', None)
1558 if self.cephfs_mntpt and mount_subvol_num is not None:
1559 log.warning("You cannot specify both: cephfs_mntpt and mount_subvol_num")
1560 log.info(f"Mounting subvol {mount_subvol_num} for now")
1561
1562 if mount_subvol_num is not None:
1563 # mount_subvol must be an index into the subvol path array for the fs
1564 if not self.cephfs_name:
1565 self.cephfs_name = 'cephfs'
1566 assert(hasattr(self.ctx, "created_subvols"))
1567 # mount_subvol must be specified under client.[0-9] yaml section
1568 subvol_paths = self.ctx.created_subvols[self.cephfs_name]
1569 path_to_mount = subvol_paths[mount_subvol_num]
1570 self.cephfs_mntpt = path_to_mount