3 from typing
import List
, Optional
, Dict
, Callable
5 from ..inventory
import Device
6 from ..drive_group
import DriveGroupSpec
, DeviceSelection
, DriveGroupValidationError
8 from .filter import FilterGenerator
9 from .matchers
import _MatchInvalid
11 logger
= logging
.getLogger(__name__
)
14 def to_dg_exception(f
: Callable
) -> Callable
[['DriveSelection', str,
15 Optional
['DeviceSelection']],
17 def wrapper(self
: 'DriveSelection', name
: str, ds
: Optional
['DeviceSelection']) -> List
[Device
]:
20 except _MatchInvalid
as e
:
21 raise DriveGroupValidationError(f
'{self.spec.service_id}.{name}', e
.args
[0])
25 class DriveSelection(object):
27 spec
, # type: DriveGroupSpec
28 disks
, # type: List[Device]
29 existing_daemons
=None, # type: Optional[int]
31 self
.disks
= disks
.copy()
33 self
.existing_daemons
= existing_daemons
or 0
35 self
._data
= self
.assign_devices('data_devices', self
.spec
.data_devices
)
36 self
._wal
= self
.assign_devices('wal_devices', self
.spec
.wal_devices
)
37 self
._db
= self
.assign_devices('db_devices', self
.spec
.db_devices
)
38 self
._journal
= self
.assign_devices('journal_devices', self
.spec
.journal_devices
)
40 def data_devices(self
):
41 # type: () -> List[Device]
44 def wal_devices(self
):
45 # type: () -> List[Device]
49 # type: () -> List[Device]
52 def journal_devices(self
):
53 # type: () -> List[Device]
56 def _limit_reached(self
, device_filter
, len_devices
,
58 # type: (DeviceSelection, int, str) -> bool
59 """ Check for the <limit> property and apply logic
61 If a limit is set in 'device_attrs' we have to stop adding
64 If limit is set (>0) and len(devices) >= limit
66 :param int len_devices: Length of the already populated device set/list
67 :param str disk_path: The disk identifier (for logging purposes)
68 :return: True/False if the device should be added to the list of devices
71 limit
= device_filter
.limit
or 0
73 if limit
> 0 and (len_devices
+ self
.existing_daemons
>= limit
):
74 logger
.info("Refuse to add {} due to limit policy of <{}>".format(
80 def _has_mandatory_idents(disk
):
81 # type: (Device) -> bool
82 """ Check for mandatory identification fields
85 logger
.debug("Found matching disk: {}".format(disk
.path
))
89 "Disk {} doesn't have a 'path' identifier".format(disk
))
92 def assign_devices(self
, device_filter
):
93 # type: (Optional[DeviceSelection]) -> List[Device]
94 """ Assign drives based on used filters
96 Do not add disks when:
98 1) Filter didn't match
99 2) Disk doesn't have a mandatory identification item (path)
100 3) The set :limit was reached
102 After the disk was added we make sure not to re-assign this disk
103 for another defined type[wal/db/journal devices]
105 return a sorted(by path) list of devices
108 if not device_filter
:
109 logger
.debug('device_filter is None')
112 if not self
.spec
.data_devices
:
113 logger
.debug('data_devices is None')
116 if device_filter
.paths
:
117 logger
.debug('device filter is using explicit paths')
118 return device_filter
.paths
120 devices
= list() # type: List[Device]
121 for disk
in self
.disks
:
122 logger
.debug("Processing disk {}".format(disk
.path
))
124 if not disk
.available
and not disk
.ceph_device
:
126 ("Ignoring disk {}. "
127 "Disk is unavailable due to {}".format(disk
.path
, disk
.rejected_reasons
))
131 if not disk
.available
and disk
.ceph_device
and disk
.lvs
:
132 other_osdspec_affinity
= ''
134 if lv
['osdspec_affinity'] != self
.spec
.service_id
:
135 other_osdspec_affinity
= lv
['osdspec_affinity']
137 if other_osdspec_affinity
:
138 logger
.debug("{} is already used in spec {}, "
139 "skipping it.".format(disk
.path
, other_osdspec_affinity
))
142 if not self
._has
_mandatory
_idents
(disk
):
144 "Ignoring disk {}. Missing mandatory idents".format(
148 # break on this condition.
149 if self
._limit
_reached
(device_filter
, len(devices
), disk
.path
):
150 logger
.debug("Ignoring disk {}. Limit reached".format(
157 if self
.spec
.filter_logic
== 'AND':
158 if not all(m
.compare(disk
) for m
in FilterGenerator(device_filter
)):
160 "Ignoring disk {}. Not all filter did match the disk".format(
164 if self
.spec
.filter_logic
== 'OR':
165 if not any(m
.compare(disk
) for m
in FilterGenerator(device_filter
)):
167 "Ignoring disk {}. No filter matched the disk".format(
171 logger
.debug('Adding disk {}'.format(disk
.path
))
174 # This disk is already taken and must not be re-assigned.
175 for taken_device
in devices
:
176 if taken_device
in self
.disks
:
177 self
.disks
.remove(taken_device
)
179 return sorted([x
for x
in devices
], key
=lambda dev
: dev
.path
)
181 def __repr__(self
) -> str:
182 selection
: Dict
[str, List
[str]] = {
183 'data devices': [d
.path
for d
in self
._data
],
184 'wal_devices': [d
.path
for d
in self
._wal
],
185 'db devices': [d
.path
for d
in self
._db
],
186 'journal devices': [d
.path
for d
in self
._journal
]
188 return "DeviceSelection({})".format(
189 ', '.join('{}={}'.format(key
, selection
[key
]) for key
in selection
.keys())