import os
import pwd
import platform
+import tempfile
import uuid
from ceph_volume import process
from . import as_string
os.chown(path, uid, gid)
-def is_mounted(source, destination=None):
+def is_binary(path):
"""
- Check if the given device is mounted, optionally validating destination.
- This relies on absolute path devices, it will ignore non-absolute
- entries like::
+ Detect if a file path is a binary or not. Will falsely report as binary
+ when utf-16 encoded. In the ceph universe there is no such risk (yet)
+ """
+ with open(path, 'rb') as fp:
+ contents = fp.read(8192)
+ if b'\x00' in contents: # a null byte may signal binary
+ return True
+ return False
+
+
+class tmp_mount(object):
+ """
+ Temporarily mount a device on a temporary directory,
+ and unmount it upon exit
+ """
+
+ def __init__(self, device):
+ self.device = device
+ self.path = None
+
+ def __enter__(self):
+ self.path = tempfile.mkdtemp()
+ process.run([
+ 'sudo',
+ 'mount',
+ '-v',
+ self.device,
+ self.path
+ ])
+ return self.path
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ process.run([
+ 'sudo',
+ 'umount',
+ '-v',
+ self.path
+ ])
+
+
+def path_is_mounted(path, destination=None):
+ """
+ Check if the given path is mounted
+ """
+ mounts = get_mounts(paths=True)
+ realpath = os.path.realpath(path)
+ mounted_locations = mounts.get(realpath, [])
+
+ if destination:
+ if destination.startswith('/'):
+ destination = os.path.realpath(destination)
+ return destination in mounted_locations
+ return mounted_locations != []
+
- tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
+def device_is_mounted(dev, destination=None):
+ """
+ Check if the given device is mounted, optionally validating that a
+ destination exists
+ """
+ mounts = get_mounts(devices=True)
+ realpath = os.path.realpath(dev) if dev.startswith('/') else dev
+ destination = os.path.realpath(destination) if destination else None
+ mounted_locations = mounts.get(realpath, [])
+
+ if destination:
+ return destination in mounted_locations
+ return mounted_locations != []
+
+
+def get_mounts(devices=False, paths=False):
+ """
+ Create a mapping of all available system mounts so that other helpers can
+ detect nicely what path or device is mounted
- But will parse paths that are absolute like::
+ It ignores (most of) non existing devices, but since some setups might need
+ some extra device information, it will make an exception for:
- /dev/sdc2 /boot xfs rw,attr2,inode64,noquota 0 0
+ - tmpfs
+ - devtmpfs
- When destination is passed in, it will check that the entry where the
- source appears is mounted to where destination defines. This is useful so
- that an error message can report that a source is not mounted at an
- expected destination.
+ If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
+ if ``paths`` is set to ``True`` then the mapping will be
+ a path-to-device(s)
"""
- dev = os.path.realpath(source)
- with open(PROCDIR + '/mounts', 'rb') as proc_mounts:
- for line in proc_mounts:
- fields = line.split()
- if len(fields) < 3:
+ devices_mounted = {}
+ paths_mounted = {}
+ do_not_skip = ['tmpfs', 'devtmpfs']
+ default_to_devices = devices is False and paths is False
+
+ with open(PROCDIR + '/mounts', 'rb') as mounts:
+ proc_mounts = mounts.readlines()
+
+ for line in proc_mounts:
+ fields = [as_string(f) for f in line.split()]
+ if len(fields) < 3:
+ continue
+ device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
+ path = os.path.realpath(fields[1])
+ # only care about actual existing devices
+ if not os.path.exists(device) or not device.startswith('/'):
+ if device not in do_not_skip:
continue
- mounted_device = fields[0]
- mounted_path = fields[1]
- if os.path.isabs(mounted_device) and os.path.exists(mounted_device):
- mounted_device = os.path.realpath(mounted_device)
- if as_string(mounted_device) == dev:
- if destination:
- destination = os.path.realpath(destination)
- return destination == as_string(os.path.realpath(mounted_path))
- else:
- return True
- return False
+ if device in devices_mounted.keys():
+ devices_mounted[device].append(path)
+ else:
+ devices_mounted[device] = [path]
+ if path in paths_mounted.keys():
+ paths_mounted[path].append(device)
+ else:
+ paths_mounted[path] = [device]
+
+ # Default to returning information for devices if
+ if devices is True or default_to_devices:
+ return devices_mounted
+ else:
+ return paths_mounted