import errno
+import logging
import os
import pwd
import platform
import tempfile
import uuid
-from ceph_volume import process
+from ceph_volume import process, terminal
from . import as_string
+logger = logging.getLogger(__name__)
+mlogger = terminal.MultiLogger(__name__)
# TODO: get these out of here and into a common area for others to consume
if platform.system() == 'FreeBSD':
return str(uuid.uuid4())
+def which(executable):
+ """find the location of an executable"""
+ locations = (
+ '/usr/local/bin',
+ '/bin',
+ '/usr/bin',
+ '/usr/local/sbin',
+ '/usr/sbin',
+ '/sbin',
+ )
+
+ for location in locations:
+ executable_path = os.path.join(location, executable)
+ if os.path.exists(executable_path) and os.path.isfile(executable_path):
+ return executable_path
+ mlogger.warning('Absolute path not found for executable: %s', executable)
+ mlogger.warning('Ensure $PATH environment variable contains common executable locations')
+ # fallback to just returning the argument as-is, to prevent a hard fail,
+ # and hoping that the system might have the executable somewhere custom
+ return executable
+
+
def get_ceph_user_ids():
"""
Return the id and gid of the ceph user
"""
uid, gid = get_ceph_user_ids()
if os.path.islink(path):
+ process.run(['chown', '-h', 'ceph:ceph', path])
path = os.path.realpath(path)
if recursive:
process.run(['chown', '-R', 'ceph:ceph', path])
"""
Temporarily mount a device on a temporary directory,
and unmount it upon exit
+
+ When ``encrypted`` is set to ``True``, the exit method will call out to
+ close the device so that it doesn't remain open after mounting. It is
+ assumed that it will be open because otherwise it wouldn't be possible to
+ mount in the first place
"""
- def __init__(self, device):
+ def __init__(self, device, encrypted=False):
self.device = device
self.path = None
+ self.encrypted = encrypted
def __enter__(self):
self.path = tempfile.mkdtemp()
process.run([
- 'sudo',
'mount',
'-v',
self.device,
def __exit__(self, exc_type, exc_val, exc_tb):
process.run([
- 'sudo',
'umount',
'-v',
self.path
])
+ if self.encrypted:
+ # avoid a circular import from the encryption module
+ from ceph_volume.util import encryption
+ encryption.dmcrypt_close(self.device)
+
+
+def unmount(path):
+ """
+ Removes mounts at the given path
+ """
+ process.run([
+ 'umount',
+ '-v',
+ path,
+ ])
def path_is_mounted(path, destination=None):
mounted_locations = mounts.get(realpath, [])
if destination:
- if destination.startswith('/'):
- destination = os.path.realpath(destination)
return destination in mounted_locations
return mounted_locations != []
Check if the given device is mounted, optionally validating that a
destination exists
"""
- mounts = get_mounts(devices=True)
- realpath = os.path.realpath(dev) if dev.startswith('/') else dev
+ plain_mounts = get_mounts(devices=True)
+ realpath_mounts = get_mounts(devices=True, realpath=True)
+ realpath_dev = os.path.realpath(dev) if dev.startswith('/') else dev
destination = os.path.realpath(destination) if destination else None
- mounted_locations = mounts.get(realpath, [])
+ # plain mounts
+ plain_dev_mounts = plain_mounts.get(dev, [])
+ realpath_dev_mounts = plain_mounts.get(realpath_dev, [])
+ # realpath mounts
+ plain_dev_real_mounts = realpath_mounts.get(dev, [])
+ realpath_dev_real_mounts = realpath_mounts.get(realpath_dev, [])
- if destination:
- return destination in mounted_locations
- return mounted_locations != []
+ mount_locations = [
+ plain_dev_mounts,
+ realpath_dev_mounts,
+ plain_dev_real_mounts,
+ realpath_dev_real_mounts
+ ]
+
+ for mounts in mount_locations:
+ if mounts: # we have a matching mount
+ if destination:
+ if destination in mounts:
+ logger.info(
+ '%s detected as mounted, exists at destination: %s', dev, destination
+ )
+ return True
+ else:
+ logger.info('%s was found as mounted')
+ return True
+ logger.info('%s was not found as mounted')
+ return False
-def get_mounts(devices=False, paths=False):
+def get_mounts(devices=False, paths=False, realpath=False):
"""
Create a mapping of all available system mounts so that other helpers can
detect nicely what path or device is mounted
If ``devices`` is set to ``True`` the mapping will be a device-to-path(s),
if ``paths`` is set to ``True`` then the mapping will be
a path-to-device(s)
+
+ :param realpath: Resolve devices to use their realpaths. This is useful for
+ paths like LVM where more than one path can point to the same device
"""
devices_mounted = {}
paths_mounted = {}
fields = [as_string(f) for f in line.split()]
if len(fields) < 3:
continue
- device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
+ if realpath:
+ device = os.path.realpath(fields[0]) if fields[0].startswith('/') else fields[0]
+ else:
+ device = fields[0]
path = os.path.realpath(fields[1])
# only care about actual existing devices
if not os.path.exists(device) or not device.startswith('/'):