import os
import re
import stat
+import time
from ceph_volume import process
from ceph_volume.api import lvm
from ceph_volume.util.system import get_file_contents
:param device: A ``Device()`` object
"""
- udev_info = udevadm_property(device.path)
- partition_number = udev_info.get('ID_PART_ENTRY_NUMBER')
+ # Sometimes there's a race condition that makes 'ID_PART_ENTRY_NUMBER' be not present
+ # in the output of `udevadm info --query=property`.
+ # Probably not ideal and not the best fix but this allows to get around that issue.
+ # The idea is to make it retry multiple times before actually failing.
+ for i in range(10):
+ udev_info = udevadm_property(device.path)
+ partition_number = udev_info.get('ID_PART_ENTRY_NUMBER')
+ if partition_number:
+ break
+ time.sleep(0.2)
if not partition_number:
raise RuntimeError('Unable to detect the partition number for device: %s' % device.path)
def lsblk(device, columns=None, abspath=False):
- return lsblk_all(device=device,
- columns=columns,
- abspath=abspath)
+ result = []
+ if not os.path.isdir(device):
+ result = lsblk_all(device=device,
+ columns=columns,
+ abspath=abspath)
+ if not result:
+ logger.debug(f"{device} not found is lsblk report")
+ return {}
+
+ return result[0]
def lsblk_all(device='', columns=None, abspath=False):
"""
base_command.append('-p')
base_command.append('-o')
base_command.append(','.join(columns))
+ if device:
+ base_command.append('--nodeps')
+ base_command.append(device)
out, err, rc = process.call(base_command)
for line in out:
result.append(_lsblk_parser(line))
- if not device:
- return result
-
- for dev in result:
- if dev['NAME'] == os.path.basename(device):
- return dev
+ return result
- return {}
def is_device(dev):
"""