3 from typing
import List
, Optional
, Dict
, Callable
5 from ..inventory
import Device
6 from ..drive_group
import DriveGroupSpec
, DeviceSelection
, DriveGroupValidationError
8 from .filter import FilterGenerator
9 from .matchers
import _MatchInvalid
11 logger
= logging
.getLogger(__name__
)
14 def to_dg_exception(f
: Callable
) -> Callable
[['DriveSelection', str,
15 Optional
['DeviceSelection']],
17 def wrapper(self
: 'DriveSelection', name
: str, ds
: Optional
['DeviceSelection']) -> List
[Device
]:
20 except _MatchInvalid
as e
:
21 raise DriveGroupValidationError(f
'{self.spec.service_id}.{name}', e
.args
[0])
25 class DriveSelection(object):
27 spec
, # type: DriveGroupSpec
28 disks
, # type: List[Device]
29 existing_daemons
=None, # type: Optional[int]
31 self
.disks
= disks
.copy()
33 self
.existing_daemons
= existing_daemons
or 0
35 self
._data
= self
.assign_devices('data_devices', self
.spec
.data_devices
)
36 self
._wal
= self
.assign_devices('wal_devices', self
.spec
.wal_devices
)
37 self
._db
= self
.assign_devices('db_devices', self
.spec
.db_devices
)
38 self
._journal
= self
.assign_devices('journal_devices', self
.spec
.journal_devices
)
40 def data_devices(self
):
41 # type: () -> List[Device]
44 def wal_devices(self
):
45 # type: () -> List[Device]
49 # type: () -> List[Device]
52 def journal_devices(self
):
53 # type: () -> List[Device]
56 def _limit_reached(self
, device_filter
, len_devices
,
58 # type: (DeviceSelection, int, str) -> bool
59 """ Check for the <limit> property and apply logic
61 If a limit is set in 'device_attrs' we have to stop adding
64 If limit is set (>0) and len(devices) >= limit
66 :param int len_devices: Length of the already populated device set/list
67 :param str disk_path: The disk identifier (for logging purposes)
68 :return: True/False if the device should be added to the list of devices
71 limit
= device_filter
.limit
or 0
73 if limit
> 0 and (len_devices
+ self
.existing_daemons
>= limit
):
74 logger
.info("Refuse to add {} due to limit policy of <{}>".format(
80 def _has_mandatory_idents(disk
):
81 # type: (Device) -> bool
82 """ Check for mandatory identification fields
85 logger
.debug("Found matching disk: {}".format(disk
.path
))
89 "Disk {} doesn't have a 'path' identifier".format(disk
))
92 def assign_devices(self
, device_filter
):
93 # type: (Optional[DeviceSelection]) -> List[Device]
94 """ Assign drives based on used filters
96 Do not add disks when:
98 1) Filter didn't match
99 2) Disk doesn't have a mandatory identification item (path)
100 3) The set :limit was reached
102 After the disk was added we make sure not to re-assign this disk
103 for another defined type[wal/db/journal devices]
105 return a sorted(by path) list of devices
108 if not device_filter
:
109 logger
.debug('device_filter is None')
112 if not self
.spec
.data_devices
:
113 logger
.debug('data_devices is None')
116 if device_filter
.paths
:
117 logger
.debug('device filter is using explicit paths')
118 return device_filter
.paths
120 devices
= list() # type: List[Device]
121 for disk
in self
.disks
:
122 logger
.debug("Processing disk {}".format(disk
.path
))
124 if not disk
.available
and not disk
.ceph_device
:
126 ("Ignoring disk {}. "
127 "Disk is unavailable due to {}".format(disk
.path
, disk
.rejected_reasons
))
131 if not self
._has
_mandatory
_idents
(disk
):
133 "Ignoring disk {}. Missing mandatory idents".format(
137 # break on this condition.
138 if self
._limit
_reached
(device_filter
, len(devices
), disk
.path
):
139 logger
.debug("Ignoring disk {}. Limit reached".format(
146 if self
.spec
.filter_logic
== 'AND':
147 if not all(m
.compare(disk
) for m
in FilterGenerator(device_filter
)):
149 "Ignoring disk {}. Not all filter did match the disk".format(
153 if self
.spec
.filter_logic
== 'OR':
154 if not any(m
.compare(disk
) for m
in FilterGenerator(device_filter
)):
156 "Ignoring disk {}. No filter matched the disk".format(
160 logger
.debug('Adding disk {}'.format(disk
.path
))
163 # This disk is already taken and must not be re-assigned.
164 for taken_device
in devices
:
165 if taken_device
in self
.disks
:
166 self
.disks
.remove(taken_device
)
168 return sorted([x
for x
in devices
], key
=lambda dev
: dev
.path
)
170 def __repr__(self
) -> str:
171 selection
: Dict
[str, List
[str]] = {
172 'data devices': [d
.path
for d
in self
._data
],
173 'wal_devices': [d
.path
for d
in self
._wal
],
174 'db devices': [d
.path
for d
in self
._db
],
175 'journal devices': [d
.path
for d
in self
._journal
]
177 return "DeviceSelection({})".format(
178 ', '.join('{}={}'.format(key
, selection
[key
]) for key
in selection
.keys())