import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import SpecialHostLabels
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
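        """
        Deploy OSDs for a drive group on every matching host.

        Hosts are processed concurrently; hosts whose inventory has not
        changed, or that carry the drain-daemons label, are skipped. Returns
        a comma-separated summary of the per-host results.

        Illustrative usage (assumes a configured ``CephadmOrchestrator``
        instance ``mgr`` and an OSD spec already parsed into a
        ``DriveGroupSpec``)::

            svc = OSDService(mgr)
            summary = svc.create_from_spec(drive_group)
        """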
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
                                                                 osd_id_claims_for_host)
            if not cmds:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmds,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'):
            ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmds: List[str], replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
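        """
        Run the prepared ceph-volume command(s) on one host and deploy
        daemons for the resulting OSDs.

        An "already prepared" error from ceph-volume is tolerated to keep the
        call idempotent; any other non-zero exit code raises RuntimeError.
        """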
        for cmd in cmds:
            out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
            if code == 1 and ', it is already prepared' in '\n'.join(err):
                # HACK: when we create against an existing LV, ceph-volume
                # returns an error and the above message. To make this
                # command idempotent, tolerate this "error" and continue.
                logger.debug('the device was already prepared; continuing')
                code = 0
            if code:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:
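        """
        Scan the host for OSDs prepared by ceph-volume (both ``lvm list`` and
        ``raw list``) and deploy a cephadm daemon for each OSD that belongs
        to this cluster, is known to the osdmap, and is either new or being
        replaced.
        """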
        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
        assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # the osd is already up; skip it unless it is being replaced
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # the osd is already up; skip it unless it is being replaced
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
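        """
        Resolve a DriveGroupSpec into concrete (host, DriveSelection) pairs.

        For every host matched by the spec's placement, the cached device
        inventory is fed through DriveSelection. Raw-mode specs additionally
        require a 1:1 ratio of data to db and of data to wal devices.
        """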
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_hosts and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            if drive_group.method and drive_group.method == 'raw':
                # ceph-volume can currently only handle a 1:1 mapping
                # of data/db/wal devices for raw mode osds. If db/wal devices
                # are defined and the number does not match the number of data
                # devices, we need to bail out
                if drive_selection.data_devices() and drive_selection.db_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
                if drive_selection.data_devices() and drive_selection.wal_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> List[str]:
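        """
        Translate a DriveSelection into the ceph-volume command line(s) to
        run on the host.

        Illustrative result (the exact command depends on the spec and the
        selected devices), e.g. something like::

            ['lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd']
        """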
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmds: List[str] = translate.to_ceph_volume(drive_selection,
                                                   osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmds: {cmds}")
        return cmds

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           'notes': <notes>
           },

          {'data': ...,
           'osdspec': ...,
           'host': ...,
           'notes': ...
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmds: List[str] = self.driveselection_to_ceph_volume(ds,
                                                                     osd_id_claims.filtered_by_host(
                                                                         host),
                                                                     preview=True)
                if not cmds:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                for cmd in cmds:
                    with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
                        out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                    if out:
                        try:
                            concat_out: Dict[str, Any] = json.loads(' '.join(out))
                        except ValueError:
                            logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                            concat_out = {}
                        notes = []
                        if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
                            found = len(concat_out)
                            limit = osdspec.data_devices.limit
                            notes.append(
                                f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
                        ret_all.append({'data': concat_out,
                                        'osdspec': osdspec.service_id,
                                        'host': host,
                                        'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
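        """
        Run a single ceph-volume command on a host via cephadm, passing a
        minimal ceph.conf and the client.bootstrap-osd keyring on stdin.

        Returns the (stdout lines, stderr lines, exit code) triple from
        cephadm; failures are not raised here (error_ok=True).
        """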
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring if we failed to deploy the OSD: we
        # cannot recover from that. The OSD keys are created by ceph-volume,
        # not by us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
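        """
        Rebuild the host -> osd-id map from 'osd tree', restricted to OSDs in
        the 'destroyed' state; their ids can be reused by replacement OSDs.
        """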
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused on a host.

        OSD id claims in the CRUSH map are keyed by the bare hostname, so for
        FQDN hostnames the lookup uses the bare name.
        """
        return self.osd_host_map.get(host.split(".")[0], [])


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut the osd list in half until it's ok-to-stop

        :param osds: list of OSDs
        :return: list of OSDs that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        #  return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
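        """
        Run 'osd <flag>' (e.g. 'out', 'in' or 'down') for the given OSDs via
        the mon. Returns False and logs an error if the command fails.
        """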
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
            if not osd.no_destroy:
                cmd.append('--destroy')
            with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
                out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    osd.hostname, 'osd', 'ceph-volume',
                    cmd,
                    error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False,
                 no_destroy: bool = False):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did the process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If True, do not wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap
        # Whether all associated LV devices should be destroyed.
        self.no_destroy = no_destroy

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
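        """
        Begin draining this OSD.

        For a replacement only the 'out' flag is set; for a removal the
        current CRUSH weight is saved and then set to 0.0 so that data
        migrates off the OSD.
        """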
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
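        """
        Serialize this OSD for the 'osd_remove_queue' store key; datetime
        fields are stringified. Illustrative shape (values depend on the
        instance)::

            {'osd_id': 3, 'started': True, 'draining': False, 'stopped': False,
             'replace': False, 'force': False, 'zap': False, 'hostname': 'host1',
             'drain_started_at': None, ...}
        """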
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
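        """
        Inverse of to_json(): parse stored datetime strings back into datetime
        objects and re-attach a RemoveUtil instance. The legacy 'nodename' key
        is accepted and mapped to 'hostname'.
        """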
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when the criteria are met.

        We can't hold self.lock, as we're calling _remove_daemon in the loop.
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

                if not osd.safe_to_destroy():
                    logger.debug(
                        f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed
        # (new_queue) and osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as
        many OSDs as can be accommodated by the 'max_osd_draining_count' config
        value, considering the number of OSDs that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
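        """
        Add an OSD to the removal queue and mark the operation as started.
        Raises NotFoundError if the OSD does not exist in the cluster.
        """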
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds