from itertools import repeat
from math import floor
from ceph_volume import process, util
-from ceph_volume.exceptions import (
- MultipleLVsError, MultipleVGsError,
- MultiplePVsError, SizeAllocationError
-)
+from ceph_volume.exceptions import SizeAllocationError
logger = logging.getLogger(__name__)
+def convert_filters_to_str(filters):
+ """
+ Convert filter args from dictionary to following format -
+ filters={filter_name=filter_val,...}
+ """
+ if not filters:
+ return filters
+
+ filter_arg = ''
+ for k, v in filters.items():
+ filter_arg += k + '=' + v + ','
+ # get rid of extra comma at the end
+ filter_arg = filter_arg[:len(filter_arg) - 1]
+
+ return filter_arg
+
+
+def convert_tags_to_str(tags):
+ """
+ Convert tags from dictionary to following format -
+ tags={tag_name=tag_val,...}
+ """
+ if not tags:
+ return tags
+
+ tag_arg = 'tags={'
+ for k, v in tags.items():
+ tag_arg += k + '=' + v + ','
+ # get rid of extra comma at the end
+ tag_arg = tag_arg[:len(tag_arg) - 1] + '}'
+
+ return tag_arg
+
+
+def make_filters_lvmcmd_ready(filters, tags):
+ """
+ Convert filters (including tags) from dictionary to following format -
+ filter_name=filter_val...,tags={tag_name=tag_val,...}
+
+ The command will look as follows =
+ lvs -S filter_name=filter_val...,tags={tag_name=tag_val,...}
+ """
+ filters = convert_filters_to_str(filters)
+ tags = convert_tags_to_str(tags)
+
+ if filters and tags:
+ return filters + ',' + tags
+ if filters and not tags:
+ return filters
+ if not filters and tags:
+ return tags
+ else:
+ return ''
+
+
def _output_parser(output, fields):
"""
Newer versions of LVM allow ``--reportformat=json``, but older versions,
PV_FIELDS = 'pv_name,pv_tags,pv_uuid,vg_name,lv_uuid'
-def get_api_pvs():
- """
- Return the list of physical volumes configured for lvm and available in the
- system using flags to include common metadata associated with them like the uuid
-
- This will only return physical volumes set up to work with LVM.
-
- Command and delimited output should look like::
-
- $ pvs --noheadings --readonly --separator=';' -o pv_name,pv_tags,pv_uuid
- /dev/sda1;;
- /dev/sdv;;07A4F654-4162-4600-8EB3-88D1E42F368D
-
- """
- stdout, stderr, returncode = process.call(
- ['pvs', '--no-heading', '--readonly', '--separator=";"', '-o',
- PV_FIELDS],
- verbose_on_failure=False
- )
-
- return _output_parser(stdout, PV_FIELDS)
-
-
class PVolume(object):
"""
Represents a Physical Volume from LVM, with some top-level attributes like
self.set_tag(k, v)
# after setting all the tags, refresh them for the current object, use the
# pv_* identifiers to filter because those shouldn't change
- pv_object = get_pv(pv_name=self.pv_name, pv_uuid=self.pv_uuid)
+ pv_object = self.get_first_pv(filter={'pv_name': self.pv_name,
+ 'pv_uuid': self.pv_uuid})
self.tags = pv_object.tags
def set_tag(self, key, value):
)
-class PVolumes(list):
- """
- A list of all known (physical) volumes for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self, populate=True):
- if populate:
- self._populate()
-
- def _populate(self):
- # get all the pvs in the current system
- for pv_item in get_api_pvs():
- self.append(PVolume(**pv_item))
-
- def _purge(self):
- """
- Deplete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
- """
- filtered = [i for i in self]
- if pv_name:
- filtered = [i for i in filtered if i.pv_name == pv_name]
-
- if pv_uuid:
- filtered = [i for i in filtered if i.pv_uuid == pv_uuid]
-
- # at this point, `filtered` has either all the physical volumes in self
- # or is an actual filtered list if any filters were applied
- if pv_tags:
- tag_filtered = []
- for pvolume in filtered:
- matches = all(pvolume.tags.get(k) == str(v) for k, v in pv_tags.items())
- if matches:
- tag_filtered.append(pvolume)
- # return the tag_filtered pvolumes here, the `filtered` list is no
- # longer usable
- return tag_filtered
-
- return filtered
-
- def filter(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- Filter out volumes on top level attributes like ``pv_name`` or by
- ``pv_tags`` where a dict is required. For example, to find a physical
- volume that has an OSD ID of 0, the filter would look like::
-
- pv_tags={'ceph.osd_id': '0'}
-
- """
- if not any([pv_name, pv_uuid, pv_tags]):
- raise TypeError('.filter() requires pv_name, pv_uuid, or pv_tags'
- '(none given)')
-
- filtered_pvs = PVolumes(populate=False)
- filtered_pvs.extend(self._filter(pv_name, pv_uuid, pv_tags))
- return filtered_pvs
-
- def get(self, pv_name=None, pv_uuid=None, pv_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple pvs are matched
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple pvs being found, since a lot of metadata
- is shared between pvs of a distinct OSD.
- """
- if not any([pv_name, pv_uuid, pv_tags]):
- return None
- pvs = self._filter(
- pv_name=pv_name,
- pv_uuid=pv_uuid,
- pv_tags=pv_tags
- )
- if not pvs:
- return None
- if len(pvs) > 1 and pv_tags:
- raise MultiplePVsError(pv_name)
- return pvs[0]
-
-
def create_pv(device):
"""
Create a physical volume from a device, useful when devices need to be later mapped
)
-def get_pv(pv_name=None, pv_uuid=None, pv_tags=None, pvs=None):
+def get_pvs(fields=PV_FIELDS, filters='', tags=None):
"""
- Return a matching pv (physical volume) for the current system, requiring
- ``pv_name``, ``pv_uuid``, or ``pv_tags``. Raises an error if more than one
- pv is found.
+ Return a list of PVs that are available on the system and match the
+ filters and tags passed. Argument filters takes a dictionary containing
+ arguments required by -S option of LVM. Passing a list of LVM tags can be
+ quite tricky to pass as a dictionary within dictionary, therefore pass
+ dictionary of tags via tags argument and tricky part will be taken care of
+ by the helper methods.
+
+ :param fields: string containing list of fields to be displayed by the
+ pvs command
+ :param sep: string containing separator to be used between two fields
+ :param filters: dictionary containing LVM filters
+ :param tags: dictionary containng LVM tags
+ :returns: list of class PVolume object representing pvs on the system
"""
- if not any([pv_name, pv_uuid, pv_tags]):
- return None
- if pvs is None or len(pvs) == 0:
- pvs = PVolumes()
+ filters = make_filters_lvmcmd_ready(filters, tags)
+ args = ['pvs', '--no-heading', '--readonly', '--separator=";"', '-S',
+ filters, '-o', fields]
+
+ stdout, stderr, returncode = process.call(args, verbose_on_failure=False)
+ pvs_report = _output_parser(stdout, fields)
+ return [PVolume(**pv_report) for pv_report in pvs_report]
+
- return pvs.get(pv_name=pv_name, pv_uuid=pv_uuid, pv_tags=pv_tags)
+def get_first_pv(fields=PV_FIELDS, filters=None, tags=None):
+ """
+ Wrapper of get_pv meant to be a convenience method to avoid the phrase::
+ pvs = get_pvs()
+ if len(pvs) >= 1:
+ pv = pvs[0]
+ """
+ pvs = get_pvs(fields=fields, filters=filters, tags=tags)
+ return pvs[0] if len(pvs) > 0 else []
################################
VG_CMD_OPTIONS = ['--noheadings', '--readonly', '--units=b', '--nosuffix', '--separator=";"']
-def get_api_vgs():
- """
- Return the list of group volumes available in the system using flags to
- include common metadata associated with them
-
- Command and sample delimited output should look like::
-
- $ vgs --noheadings --units=b --readonly --separator=';' \
- -o vg_name,pv_count,lv_count,vg_attr,vg_free_count,vg_extent_size
- ubuntubox-vg;1;2;wz--n-;12;
-
- To normalize sizing, the units are forced in 'g' which is equivalent to
- gigabytes, which uses multiples of 1024 (as opposed to 1000)
- """
- stdout, stderr, returncode = process.call(
- ['vgs'] + VG_CMD_OPTIONS + ['-o', VG_FIELDS],
- verbose_on_failure=False
- )
- return _output_parser(stdout, VG_FIELDS)
-
-
class VolumeGroup(object):
"""
Represents an LVM group, with some top-level attributes like ``vg_name``
return int(int(self.vg_free_count) / slots)
-class VolumeGroups(list):
- """
- A list of all known volume groups for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self, populate=True):
- if populate:
- self._populate()
-
- def _populate(self):
- # get all the vgs in the current system
- for vg_item in get_api_vgs():
- self.append(VolumeGroup(**vg_item))
-
- def _purge(self):
- """
- Deplete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, vg_name=None, vg_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
-
- .. note:: ``vg_tags`` is not yet implemented
- """
- filtered = [i for i in self]
- if vg_name:
- filtered = [i for i in filtered if i.vg_name == vg_name]
-
- # at this point, `filtered` has either all the volumes in self or is an
- # actual filtered list if any filters were applied
- if vg_tags:
- tag_filtered = []
- for volume in filtered:
- matches = all(volume.tags.get(k) == str(v) for k, v in vg_tags.items())
- if matches:
- tag_filtered.append(volume)
- return tag_filtered
-
- return filtered
-
- def filter(self, vg_name=None, vg_tags=None):
- """
- Filter out groups on top level attributes like ``vg_name`` or by
- ``vg_tags`` where a dict is required. For example, to find a Ceph group
- with dmcache as the type, the filter would look like::
-
- vg_tags={'ceph.type': 'dmcache'}
-
- .. warning:: These tags are not documented because they are currently
- unused, but are here to maintain API consistency
- """
- if not any([vg_name, vg_tags]):
- raise TypeError('.filter() requires vg_name or vg_tags (none given)')
-
- filtered_vgs = VolumeGroups(populate=False)
- filtered_vgs.extend(self._filter(vg_name, vg_tags))
- return filtered_vgs
-
- def get(self, vg_name=None, vg_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple VGs are matched
-
- It is useful to use ``tags`` when trying to find a specific volume group,
- but it can also lead to multiple vgs being found (although unlikely)
- """
- if not any([vg_name, vg_tags]):
- return None
- vgs = self._filter(
- vg_name=vg_name,
- vg_tags=vg_tags
- )
- if not vgs:
- return None
- if len(vgs) > 1:
- # this is probably never going to happen, but it is here to keep
- # the API code consistent
- raise MultipleVGsError(vg_name)
- return vgs[0]
-
-
def create_vg(devices, name=None, name_prefix=None):
"""
Create a Volume Group. Command looks like::
name] + devices
)
- vg = get_vg(vg_name=name)
- return vg
+ return get_first_vg(filters={'vg_name': name})
def extend_vg(vg, devices):
vg.name] + devices
)
- vg = get_vg(vg_name=vg.name)
- return vg
+ return get_first_vg(filters={'vg_name': vg.name})
def reduce_vg(vg, devices):
vg.name] + devices
)
- vg = get_vg(vg_name=vg.name)
- return vg
+ return get_first_vg(filter={'vg_name': vg.name})
def remove_vg(vg_name):
)
-def get_vg(vg_name=None, vg_tags=None, vgs=None):
+def get_vgs(fields=VG_FIELDS, filters='', tags=None):
"""
- Return a matching vg for the current system, requires ``vg_name`` or
- ``tags``. Raises an error if more than one vg is found.
+ Return a list of VGs that are available on the system and match the
+ filters and tags passed. Argument filters takes a dictionary containing
+ arguments required by -S option of LVM. Passing a list of LVM tags can be
+ quite tricky to pass as a dictionary within dictionary, therefore pass
+ dictionary of tags via tags argument and tricky part will be taken care of
+ by the helper methods.
- It is useful to use ``tags`` when trying to find a specific volume group,
- but it can also lead to multiple vgs being found.
+ :param fields: string containing list of fields to be displayed by the
+ vgs command
+ :param sep: string containing separator to be used between two fields
+ :param filters: dictionary containing LVM filters
+ :param tags: dictionary containng LVM tags
+ :returns: list of class VolumeGroup object representing vgs on the system
"""
- if not any([vg_name, vg_tags]):
- return None
- if vgs is None or len(vgs) == 0:
- vgs = VolumeGroups()
+ filters = make_filters_lvmcmd_ready(filters, tags)
+ args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters, '-o', fields]
- return vgs.get(vg_name=vg_name, vg_tags=vg_tags)
+ stdout, stderr, returncode = process.call(args, verbose_on_failure=False)
+ vgs_report =_output_parser(stdout, fields)
+ return [VolumeGroup(**vg_report) for vg_report in vgs_report]
+
+
+def get_first_vg(fields=VG_FIELDS, filters=None, tags=None):
+ """
+ Wrapper of get_vg meant to be a convenience method to avoid the phrase::
+ vgs = get_vgs()
+ if len(vgs) >= 1:
+ vg = vgs[0]
+ """
+ vgs = get_vgs(fields=fields, filters=filters, tags=tags)
+ return vgs[0] if len(vgs) > 0 else []
def get_device_vgs(device, name_prefix=''):
LV_FIELDS = 'lv_tags,lv_path,lv_name,vg_name,lv_uuid,lv_size'
LV_CMD_OPTIONS = ['--noheadings', '--readonly', '--separator=";"', '-a']
-def get_api_lvs():
- """
- Return the list of logical volumes available in the system using flags to include common
- metadata associated with them
-
- Command and delimited output should look like::
-
- $ lvs --noheadings --readonly --separator=';' -a -o lv_tags,lv_path,lv_name,vg_name
- ;/dev/ubuntubox-vg/root;root;ubuntubox-vg
- ;/dev/ubuntubox-vg/swap_1;swap_1;ubuntubox-vg
-
- """
- stdout, stderr, returncode = process.call(
- ['lvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS],
- verbose_on_failure=False
- )
- return _output_parser(stdout, LV_FIELDS)
-
class Volume(object):
"""
process.call(['lvchange', '-an', self.lv_path])
-class Volumes(list):
- """
- A list of all known (logical) volumes for the current system, with the ability
- to filter them via keyword arguments.
- """
-
- def __init__(self):
- self._populate()
-
- def _populate(self):
- # get all the lvs in the current system
- for lv_item in get_api_lvs():
- self.append(Volume(**lv_item))
-
- def _purge(self):
- """
- Delete all the items in the list, used internally only so that we can
- dynamically allocate the items when filtering without the concern of
- messing up the contents
- """
- self[:] = []
-
- def _filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- The actual method that filters using a new list. Useful so that other
- methods that do not want to alter the contents of the list (e.g.
- ``self.find``) can operate safely.
- """
- filtered = [i for i in self]
- if lv_name:
- filtered = [i for i in filtered if i.lv_name == lv_name]
-
- if vg_name:
- filtered = [i for i in filtered if i.vg_name == vg_name]
-
- if lv_uuid:
- filtered = [i for i in filtered if i.lv_uuid == lv_uuid]
-
- if lv_path:
- filtered = [i for i in filtered if i.lv_path == lv_path]
-
- # at this point, `filtered` has either all the volumes in self or is an
- # actual filtered list if any filters were applied
- if lv_tags:
- tag_filtered = []
- for volume in filtered:
- # all the tags we got need to match on the volume
- matches = all(volume.tags.get(k) == str(v) for k, v in lv_tags.items())
- if matches:
- tag_filtered.append(volume)
- return tag_filtered
-
- return filtered
-
- def filter(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- Filter out volumes on top level attributes like ``lv_name`` or by
- ``lv_tags`` where a dict is required. For example, to find a volume
- that has an OSD ID of 0, the filter would look like::
-
- lv_tags={'ceph.osd_id': '0'}
-
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- raise TypeError('.filter() requires lv_name, vg_name, lv_path, lv_uuid, or tags (none given)')
- # first find the filtered volumes with the values in self
- filtered_volumes = self._filter(
- lv_name=lv_name,
- vg_name=vg_name,
- lv_path=lv_path,
- lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
- # then purge everything
- self._purge()
- # and add the filtered items
- self.extend(filtered_volumes)
-
- def get(self, lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None):
- """
- This is a bit expensive, since it will try to filter out all the
- matching items in the list, filter them out applying anything that was
- added and return the matching item.
-
- This method does *not* alter the list, and it will raise an error if
- multiple LVs are matched
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple lvs being found, since a lot of metadata
- is shared between lvs of a distinct OSD.
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- return None
- lvs = self._filter(
- lv_name=lv_name,
- vg_name=vg_name,
- lv_path=lv_path,
- lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
- if not lvs:
- return None
- if len(lvs) > 1:
- raise MultipleLVsError(lv_name, lv_path)
- return lvs[0]
-
-
def create_lv(name_prefix,
uuid,
vg=None,
]
process.run(command)
- lv = get_lv(lv_name=name, vg_name=vg.vg_name)
+ lv = get_first_lv(filters={'lv_name': name, 'vg_name': vg.vg_name})
if tags is None:
tags = {
return lv
-def remove_lv(lv):
- """
- Removes a logical volume given it's absolute path.
-
- Will return True if the lv is successfully removed or
- raises a RuntimeError if the removal fails.
-
- :param lv: A ``Volume`` object or the path for an LV
- """
- if isinstance(lv, Volume):
- path = lv.lv_path
- else:
- path = lv
-
- stdout, stderr, returncode = process.call(
- [
- 'lvremove',
- '-v', # verbose
- '-f', # force it
- path
- ],
- show_command=True,
- terminal_verbose=True,
- )
- if returncode != 0:
- raise RuntimeError("Unable to remove %s" % path)
- return True
-
-
-def is_lv(dev, lvs=None):
- """
- Boolean to detect if a device is an LV or not.
- """
- splitname = dmsetup_splitname(dev)
- # Allowing to optionally pass `lvs` can help reduce repetitive checks for
- # multiple devices at once.
- if lvs is None or len(lvs) == 0:
- lvs = Volumes()
-
- if splitname.get('LV_NAME'):
- lvs.filter(lv_name=splitname['LV_NAME'], vg_name=splitname['VG_NAME'])
- return len(lvs) > 0
- return False
-
-def get_lv_by_name(name):
- stdout, stderr, returncode = process.call(
- ['lvs', '--noheadings', '-o', LV_FIELDS, '-S',
- 'lv_name={}'.format(name)],
- verbose_on_failure=False
- )
- lvs = _output_parser(stdout, LV_FIELDS)
- return [Volume(**lv) for lv in lvs]
-
-def get_lvs_by_tag(lv_tag):
- stdout, stderr, returncode = process.call(
- ['lvs', '--noheadings', '--separator=";"', '-a', '-o', LV_FIELDS, '-S',
- 'lv_tags={{{}}}'.format(lv_tag)],
- verbose_on_failure=False
- )
- lvs = _output_parser(stdout, LV_FIELDS)
- return [Volume(**lv) for lv in lvs]
-
-def get_lv(lv_name=None, vg_name=None, lv_path=None, lv_uuid=None, lv_tags=None, lvs=None):
- """
- Return a matching lv for the current system, requiring ``lv_name``,
- ``vg_name``, ``lv_path`` or ``tags``. Raises an error if more than one lv
- is found.
-
- It is useful to use ``tags`` when trying to find a specific logical volume,
- but it can also lead to multiple lvs being found, since a lot of metadata
- is shared between lvs of a distinct OSD.
- """
- if not any([lv_name, vg_name, lv_path, lv_uuid, lv_tags]):
- return None
- if lvs is None:
- lvs = Volumes()
- return lvs.get(
- lv_name=lv_name, vg_name=vg_name, lv_path=lv_path, lv_uuid=lv_uuid,
- lv_tags=lv_tags
- )
-
-
-def get_lv_from_argument(argument):
- """
- Helper proxy function that consumes a possible logical volume passed in from the CLI
- in the form of `vg/lv`, but with some validation so that an argument that is a full
- path to a device can be ignored
- """
- if argument.startswith('/'):
- lv = get_lv(lv_path=argument)
- return lv
- try:
- vg_name, lv_name = argument.split('/')
- except (ValueError, AttributeError):
- return None
- return get_lv(lv_name=lv_name, vg_name=vg_name)
-
-
def create_lvs(volume_group, parts=None, size=None, name_prefix='ceph-lv'):
"""
Create multiple Logical Volumes from a Volume Group by calculating the
return lvs
-def get_device_lvs(device, name_prefix=''):
- stdout, stderr, returncode = process.call(
- ['pvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS, device],
- verbose_on_failure=False
- )
- lvs = _output_parser(stdout, LV_FIELDS)
- return [Volume(**lv) for lv in lvs if lv['lv_name'] and
- lv['lv_name'].startswith(name_prefix)]
-
-
-#############################################################
-#
-# New methods to get PVs, LVs, and VGs.
-# Later, these can be easily merged with get_api_* methods
-#
-###########################################################
-
-def convert_filters_to_str(filters):
- """
- Convert filter args from dictionary to following format -
- filters={filter_name=filter_val,...}
- """
- if not filters:
- return filters
-
- filter_arg = ''
- for k, v in filters.items():
- filter_arg += k + '=' + v + ','
- # get rid of extra comma at the end
- filter_arg = filter_arg[:len(filter_arg) - 1]
-
- return filter_arg
-
-def convert_tags_to_str(tags):
- """
- Convert tags from dictionary to following format -
- tags={tag_name=tag_val,...}
+def remove_lv(lv):
"""
- if not tags:
- return tags
-
- tag_arg = 'tags={'
- for k, v in tags.items():
- tag_arg += k + '=' + v + ','
- # get rid of extra comma at the end
- tag_arg = tag_arg[:len(tag_arg) - 1] + '}'
-
- return tag_arg
+ Removes a logical volume given it's absolute path.
-def make_filters_lvmcmd_ready(filters, tags):
- """
- Convert filters (including tags) from dictionary to following format -
- filter_name=filter_val...,tags={tag_name=tag_val,...}
+ Will return True if the lv is successfully removed or
+ raises a RuntimeError if the removal fails.
- The command will look as follows =
- lvs -S filter_name=filter_val...,tags={tag_name=tag_val,...}
+ :param lv: A ``Volume`` object or the path for an LV
"""
- filters = convert_filters_to_str(filters)
- tags = convert_tags_to_str(tags)
-
- if filters and tags:
- return filters + ',' + tags
- if filters and not tags:
- return filters
- if not filters and tags:
- return tags
+ if isinstance(lv, Volume):
+ path = lv.lv_path
else:
- return ''
-
-def get_pvs(fields=PV_FIELDS, filters='', tags=None):
- """
- Return a list of PVs that are available on the system and match the
- filters and tags passed. Argument filters takes a dictionary containing
- arguments required by -S option of LVM. Passing a list of LVM tags can be
- quite tricky to pass as a dictionary within dictionary, therefore pass
- dictionary of tags via tags argument and tricky part will be taken care of
- by the helper methods.
-
- :param fields: string containing list of fields to be displayed by the
- pvs command
- :param sep: string containing separator to be used between two fields
- :param filters: dictionary containing LVM filters
- :param tags: dictionary containng LVM tags
- :returns: list of class PVolume object representing pvs on the system
- """
- filters = make_filters_lvmcmd_ready(filters, tags)
- args = ['pvs', '--no-heading', '--readonly', '--separator=";"', '-S',
- filters, '-o', fields]
-
- stdout, stderr, returncode = process.call(args, verbose_on_failure=False)
- pvs_report = _output_parser(stdout, fields)
- return [PVolume(**pv_report) for pv_report in pvs_report]
-
-def get_first_pv(fields=PV_FIELDS, filters=None, tags=None):
- """
- Wrapper of get_pv meant to be a convenience method to avoid the phrase::
- pvs = get_pvs()
- if len(pvs) >= 1:
- pv = pvs[0]
- """
- pvs = get_pvs(fields=fields, filters=filters, tags=tags)
- return pvs[0] if len(pvs) > 0 else []
-
-def get_vgs(fields=VG_FIELDS, filters='', tags=None):
- """
- Return a list of VGs that are available on the system and match the
- filters and tags passed. Argument filters takes a dictionary containing
- arguments required by -S option of LVM. Passing a list of LVM tags can be
- quite tricky to pass as a dictionary within dictionary, therefore pass
- dictionary of tags via tags argument and tricky part will be taken care of
- by the helper methods.
-
- :param fields: string containing list of fields to be displayed by the
- vgs command
- :param sep: string containing separator to be used between two fields
- :param filters: dictionary containing LVM filters
- :param tags: dictionary containng LVM tags
- :returns: list of class VolumeGroup object representing vgs on the system
- """
- filters = make_filters_lvmcmd_ready(filters, tags)
- args = ['vgs'] + VG_CMD_OPTIONS + ['-S', filters, '-o', fields]
+ path = lv
- stdout, stderr, returncode = process.call(args, verbose_on_failure=False)
- vgs_report =_output_parser(stdout, fields)
- return [VolumeGroup(**vg_report) for vg_report in vgs_report]
+ stdout, stderr, returncode = process.call(
+ [
+ 'lvremove',
+ '-v', # verbose
+ '-f', # force it
+ path
+ ],
+ show_command=True,
+ terminal_verbose=True,
+ )
+ if returncode != 0:
+ raise RuntimeError("Unable to remove %s" % path)
+ return True
-def get_first_vg(fields=VG_FIELDS, filters=None, tags=None):
- """
- Wrapper of get_vg meant to be a convenience method to avoid the phrase::
- vgs = get_vgs()
- if len(vgs) >= 1:
- vg = vgs[0]
- """
- vgs = get_vgs(fields=fields, filters=filters, tags=tags)
- return vgs[0] if len(vgs) > 0 else []
def get_lvs(fields=LV_FIELDS, filters='', tags=None):
"""
lvs_report = _output_parser(stdout, fields)
return [Volume(**lv_report) for lv_report in lvs_report]
+
def get_first_lv(fields=LV_FIELDS, filters=None, tags=None):
"""
Wrapper of get_lv meant to be a convenience method to avoid the phrase::
"""
lvs = get_lvs(fields=fields, filters=filters, tags=tags)
return lvs[0] if len(lvs) > 0 else []
+
+
+def get_lv_by_name(name):
+ stdout, stderr, returncode = process.call(
+ ['lvs', '--noheadings', '-o', LV_FIELDS, '-S',
+ 'lv_name={}'.format(name)],
+ verbose_on_failure=False
+ )
+ lvs = _output_parser(stdout, LV_FIELDS)
+ return [Volume(**lv) for lv in lvs]
+
+
+def get_lvs_by_tag(lv_tag):
+ stdout, stderr, returncode = process.call(
+ ['lvs', '--noheadings', '--separator=";"', '-a', '-o', LV_FIELDS, '-S',
+ 'lv_tags={{{}}}'.format(lv_tag)],
+ verbose_on_failure=False
+ )
+ lvs = _output_parser(stdout, LV_FIELDS)
+ return [Volume(**lv) for lv in lvs]
+
+
+def get_device_lvs(device, name_prefix=''):
+ stdout, stderr, returncode = process.call(
+ ['pvs'] + LV_CMD_OPTIONS + ['-o', LV_FIELDS, device],
+ verbose_on_failure=False
+ )
+ lvs = _output_parser(stdout, LV_FIELDS)
+ return [Volume(**lv) for lv in lvs if lv['lv_name'] and
+ lv['lv_name'].startswith(name_prefix)]