]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/ceph-volume/ceph_volume/util/system.py
update sources to 12.2.2
[ceph.git] / ceph / src / ceph-volume / ceph_volume / util / system.py
index 084a0e0d3710042a8b9d38aeaf530d1038c8ffd0..d580a4c28f08f33cb7b5fe12fdf6d04ce8c1be90 100644 (file)
@@ -2,6 +2,7 @@ import errno
 import os
 import pwd
 import platform
+import tempfile
 import uuid
 from ceph_volume import process
 from . import as_string
@@ -68,37 +69,122 @@ def chown(path, recursive=True):
         os.chown(path, uid, gid)
 
 
-def is_mounted(source, destination=None):
+def is_binary(path):
     """
-    Check if the given device is mounted, optionally validating destination.
-    This relies on absolute path devices, it will ignore non-absolute
-    entries like::
+    Detect if a file path is a binary or not. Will falsely report as binary
+    when utf-16 encoded. In the ceph universe there is no such risk (yet)
+    """
+    with open(path, 'rb') as fp:
+        contents = fp.read(8192)
+    if b'\x00' in contents:  # a null byte may signal binary
+        return True
+    return False
+
+
+class tmp_mount(object):
+    """
+    Temporarily mount a device on a temporary directory,
+    and unmount it upon exit
+    """
+
+    def __init__(self, device):
+        self.device = device
+        self.path = None
+
+    def __enter__(self):
+        self.path = tempfile.mkdtemp()
+        process.run([
+            'sudo',
+            'mount',
+            '-v',
+            self.device,
+            self.path
+        ])
+        return self.path
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        process.run([
+            'sudo',
+            'umount',
+            '-v',
+            self.path
+        ])
+
+
+def path_is_mounted(path, destination=None):
+    """
+    Check if the given path is mounted
+    """
+    mounts = get_mounts(paths=True)
+    realpath = os.path.realpath(path)
+    mounted_locations = mounts.get(realpath, [])
+
+    if destination:
+        if destination.startswith('/'):
+            destination = os.path.realpath(destination)
+        return destination in mounted_locations
+    return mounted_locations != []
+
 
-        tmpfs /run tmpfs rw,seclabel,nosuid,nodev,mode=755 0 0
+def device_is_mounted(dev, destination=None):
+    """
+    Check if the given device is mounted, optionally validating that a
+    destination exists
+    """
+    mounts = get_mounts(devices=True)
+    realpath = os.path.realpath(dev) if dev.startswith('/') else dev
+    destination = os.path.realpath(destination) if destination else None
+    mounted_locations = mounts.get(realpath, [])
+
+    if destination:
+        return destination in mounted_locations
+    return mounted_locations != []
+
+
+def get_mounts(devices=False, paths=False):
+    """
+    Create a mapping of all available system mounts so that other helpers can
+    detect nicely what path or device is mounted
 
-    But will parse paths that are absolute like::
+    It ignores (most of) non existing devices, but since some setups might need
+    some extra device information, it will make an exception for:
 
-        /dev/sdc2 /boot xfs rw,attr2,inode64,noquota 0 0
+    - tmpfs
+    - devtmpfs
 
-    When destination is passed in, it will check that the entry where the
-    source appears is mounted to where destination defines. This is useful so
-    that an error message can report that a source is not mounted at an
-    expected destination.
+    If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
+    if ``paths`` is set to ``True`` then the mapping will be
+    a path-to-device(s)
     """
-    dev = os.path.realpath(source)
-    with open(PROCDIR + '/mounts', 'rb') as proc_mounts:
-        for line in proc_mounts:
-            fields = line.split()
-            if len(fields) < 3:
+    devices_mounted = {}
+    paths_mounted = {}
+    do_not_skip = ['tmpfs', 'devtmpfs']
+    default_to_devices = devices is False and paths is False
+
+    with open(PROCDIR + '/mounts', 'rb') as mounts:
+        proc_mounts = mounts.readlines()
+
+    for line in proc_mounts:
+        fields = [as_string(f) for f in line.split()]
+        if len(fields) < 3:
+            continue
+        device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
+        path = os.path.realpath(fields[1])
+        # only care about actual existing devices
+        if not os.path.exists(device) or not device.startswith('/'):
+            if device not in do_not_skip:
                 continue
-            mounted_device = fields[0]
-            mounted_path = fields[1]
-            if os.path.isabs(mounted_device) and os.path.exists(mounted_device):
-                mounted_device = os.path.realpath(mounted_device)
-                if as_string(mounted_device) == dev:
-                    if destination:
-                        destination = os.path.realpath(destination)
-                        return destination == as_string(os.path.realpath(mounted_path))
-                    else:
-                        return True
-    return False
+        if device in devices_mounted.keys():
+            devices_mounted[device].append(path)
+        else:
+            devices_mounted[device] = [path]
+        if path in paths_mounted.keys():
+            paths_mounted[path].append(device)
+        else:
+            paths_mounted[path] = [device]
+
+    # Default to returning information for devices if
+    if devices is True or default_to_devices:
+        return devices_mounted
+    else:
+        return paths_mounted