]> git.proxmox.com Git - ceph.git/blob - ceph/src/python-common/ceph/deployment/drive_selection/selector.py
07b0549f3a201f726b44eb7e4998e6c510efd248
[ceph.git] / ceph / src / python-common / ceph / deployment / drive_selection / selector.py
1 import logging
2
3 from typing import List, Optional, Dict, Callable
4
5 from ..inventory import Device
6 from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
7
8 from .filter import FilterGenerator
9 from .matchers import _MatchInvalid
10
11 logger = logging.getLogger(__name__)
12
13
def to_dg_exception(f: Callable) -> Callable[['DriveSelection', str,
                                              Optional['DeviceSelection']],
                                             List['Device']]:
    """Decorate a device-assignment method to report matcher errors per section.

    The wrapped method ``f(self, ds)`` is exposed as ``wrapper(self, name, ds)``:
    the extra ``name`` argument is not forwarded to ``f``; it is only used to
    label the error if ``f`` raises ``_MatchInvalid``.

    :param f: method taking ``(self, ds)`` and returning a list of devices
    :return: a method taking ``(self, name, ds)`` with the same return value
    :raises DriveGroupValidationError: when ``f`` raises ``_MatchInvalid``; the
        error is built from ``'<spec.service_id>.<name>'`` and the first
        argument of the original exception.
    """
    def wrapper(self: 'DriveSelection',
                name: str,
                ds: Optional['DeviceSelection']) -> List['Device']:
        try:
            return f(self, ds)
        except _MatchInvalid as e:
            # Chain explicitly so the traceback shows the matcher failure as
            # the direct cause of the validation error.
            raise DriveGroupValidationError(
                f'{self.spec.service_id}.{name}', e.args[0]) from e
    return wrapper
23
24
class DriveSelection(object):
    """Resolve the device filters of a drive group spec against a disk list.

    On construction the ``data_devices``, ``wal_devices``, ``db_devices`` and
    ``journal_devices`` filters of ``spec`` are evaluated, in that order,
    against a private copy of ``disks``. A disk selected for one role is
    removed from that working copy so a later role cannot select it again.
    """

    def __init__(self,
                 spec,  # type: DriveGroupSpec
                 disks,  # type: List[Device]
                 existing_daemons=None,  # type: Optional[int]
                 ):
        # Work on a copy: assign_devices() removes selected disks from it.
        self.disks = disks.copy()
        self.spec = spec
        # A missing (None) count is treated as zero existing daemons.
        self.existing_daemons = existing_daemons or 0

        # Order matters: earlier roles consume disks from self.disks first.
        self._data = self.assign_devices('data_devices', self.spec.data_devices)
        self._wal = self.assign_devices('wal_devices', self.spec.wal_devices)
        self._db = self.assign_devices('db_devices', self.spec.db_devices)
        self._journal = self.assign_devices('journal_devices', self.spec.journal_devices)

    def data_devices(self):
        # type: () -> List[Device]
        """Return the devices selected for the data role."""
        return self._data

    def wal_devices(self):
        # type: () -> List[Device]
        """Return the devices selected for the wal role."""
        return self._wal

    def db_devices(self):
        # type: () -> List[Device]
        """Return the devices selected for the db role."""
        return self._db

    def journal_devices(self):
        # type: () -> List[Device]
        """Return the devices selected for the journal role."""
        return self._journal

    def _limit_reached(self, device_filter, len_devices,
                       disk_path):
        # type: (DeviceSelection, int, str) -> bool
        """ Check for the <limit> property and apply logic

        If a limit is set in 'device_attrs' we have to stop adding
        disks at some point.

        The limit counts as reached when it is set (>0) and
        len_devices + existing_daemons >= limit.

        :param device_filter: The filter whose ``limit`` is checked
        :param int len_devices: Length of the already populated device set/list
        :param str disk_path: The disk identifier (for logging purposes)
        :return: True if the limit is reached (the disk must NOT be added),
                 False if there is no limit or it has not been reached yet
        :rtype: bool
        """
        limit = device_filter.limit or 0

        if limit > 0 and (len_devices + self.existing_daemons >= limit):
            logger.info("Refuse to add {} due to limit policy of <{}>".format(
                disk_path, limit))
            return True
        return False

    @staticmethod
    def _has_mandatory_idents(disk):
        # type: (Device) -> bool
        """ Check for mandatory identification fields

        :return: True when the disk has a truthy ``path``
        :raises Exception: when the disk has no ``path``; this method never
                           returns False
        """
        if disk.path:
            logger.debug("Found matching disk: {}".format(disk.path))
            return True
        else:
            raise Exception(
                "Disk {} doesn't have a 'path' identifier".format(disk))

    @to_dg_exception
    def assign_devices(self, device_filter):
        # type: (Optional[DeviceSelection]) -> List[Device]
        """ Assign drives based on used filters

        Because of the ``to_dg_exception`` decorator, callers invoke this as
        ``assign_devices(name, device_filter)``; ``name`` only labels the
        ``DriveGroupValidationError`` raised when a matcher is invalid.

        Returns an empty list when ``device_filter`` is falsy or when the spec
        has no ``data_devices``. When the filter carries explicit ``paths``
        those are returned as they are, without looking at ``self.disks``.

        Otherwise, do not add disks when:

        1) Disk is neither available nor a ceph device
        2) Disk doesn't have a mandatory identification item (path);
           ``_has_mandatory_idents`` raises in that case
        3) The set :limit was reached (this stops the scan entirely)
        4) Filter didn't match (all filters for 'AND', any filter for 'OR')

        After the disk was added we make sure not to re-assign this disk
        for another defined type[wal/db/journal devices]

        return a sorted(by path) list of devices
        """

        if not device_filter:
            logger.debug('device_filter is None')
            return []

        if not self.spec.data_devices:
            logger.debug('data_devices is None')
            return []

        if device_filter.paths:
            logger.debug('device filter is using explicit paths')
            return device_filter.paths

        devices = list()  # type: List[Device]
        for disk in self.disks:
            logger.debug("Processing disk {}".format(disk.path))

            if not disk.available and not disk.ceph_device:
                logger.debug(
                    ("Ignoring disk {}. "
                     "Disk is unavailable due to {}".format(disk.path, disk.rejected_reasons))
                )
                continue

            # _has_mandatory_idents() raises instead of returning False, so
            # this branch is only a safeguard and is not reached as written.
            if not self._has_mandatory_idents(disk):
                logger.debug(
                    "Ignoring disk {}. Missing mandatory idents".format(
                        disk.path))
                continue

            # break on this condition.
            if self._limit_reached(device_filter, len(devices), disk.path):
                logger.debug("Ignoring disk {}. Limit reached".format(
                    disk.path))
                break

            if disk in devices:
                continue

            if self.spec.filter_logic == 'AND':
                if not all(m.compare(disk) for m in FilterGenerator(device_filter)):
                    logger.debug(
                        "Ignoring disk {}. Not all filter did match the disk".format(
                            disk.path))
                    continue

            if self.spec.filter_logic == 'OR':
                if not any(m.compare(disk) for m in FilterGenerator(device_filter)):
                    logger.debug(
                        "Ignoring disk {}. No filter matched the disk".format(
                            disk.path))
                    continue

            logger.debug('Adding disk {}'.format(disk.path))
            devices.append(disk)

        # This disk is already taken and must not be re-assigned.
        for taken_device in devices:
            if taken_device in self.disks:
                self.disks.remove(taken_device)

        return sorted(devices, key=lambda dev: dev.path)

    def __repr__(self) -> str:
        """Return the class name followed by the selected paths per role."""
        selection: Dict[str, List[str]] = {
            'data devices': [d.path for d in self._data],
            'wal devices': [d.path for d in self._wal],
            'db devices': [d.path for d in self._db],
            'journal devices': [d.path for d in self._journal]
        }
        # Use the real class name; DeviceSelection is a different class.
        return "{}({})".format(
            type(self).__name__,
            ', '.join('{}={}'.format(key, paths) for key, paths in selection.items())
        )