import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection

import orchestrator
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.utils import forall_hosts
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec

logger = logging.getLogger(__name__)

# timestamp format used when (de)serializing the OSD removal queue
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'


class OSDService(CephadmService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = self.find_destroyed_osds()
        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
                return None
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            return ret_msg

        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))

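    # Runs the prepared ceph-volume command on one host, then cross-checks
    # `ceph-volume lvm list` output against the cluster's osd uuid map to
    # decide which OSD daemons actually need to be created.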
    def create_single_host(self, host: str, cmd: str,
                           replace_osd_ids: Optional[List[str]] = None,
                           env_vars: Optional[List[str]] = None) -> str:
        if replace_osd_ids is None:
            replace_osd_ids = []
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))

        # check result
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        osds_elems = json.loads('\n'.join(out))
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
                    daemon_id=osd_id,
                    host=host,
                    daemon_type='osd',
                )
                self.mgr._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hosts(self.mgr._get_hosts)
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict):
            # TODO: the inventory should be loaded together with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_hosts and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")
            drive_selection = DriveSelection(drive_group, inventory_for_host)
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    def driveselection_to_ceph_volume(self,
                                      drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match this host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return value looks like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

          {'data': ...,
           'osdspec': ...,
           'host': ...
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    concat_out: Dict[str, Any] = json.loads(" ".join(out))
                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hosts(self.mgr._get_hosts) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None
                                  ) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hosts(self.mgr._get_hosts):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        # generate config
        ret, config, err = self.mgr.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })

        j = json.dumps({
            'config': config,
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except json.decoder.JSONDecodeError:
            logger.exception(f"Could not decode json -> {out}")
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        self.mgr.log.info(
            f"Found osd claims -> {osd_host_map}")
        return osd_host_map


class RemoveUtil(object):
    def __init__(self, mgr):
        self.mgr = mgr

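    # High-level flow: drop queue entries for OSDs that already left the
    # cluster, start draining idle OSDs that are ok-to-stop, then remove or
    # purge (or destroy, for replacements) the ones that finished draining.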
    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when the criteria are met.
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        logger.debug(
            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
            f"for removal: {self.mgr.to_remove_osds.all_osds()}")

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        # Check all osds for their state and take action (remove, purge etc)
        to_remove_osds = self.mgr.to_remove_osds.all_osds()
        new_queue = set()
        for osd in to_remove_osds:
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

                if not osd.safe_to_destroy():
                    logger.info(
                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            if not osd.exists:
                continue
            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")

        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        self.mgr.to_remove_osds.intersection_update(new_queue)
        self.save_to_store()

    def cleanup(self) -> None:
        # make sure we only operate on OSDs that still exist in the cluster;
        # OSDs that dropped out can always be cleaned up manually.
        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
        for osd in not_in_cluster_osds:
            self.mgr.to_remove_osds.remove(osd)

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        return json.loads(out)

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> List["OSD"]:
        """
        Cut the osd list in half until it's ok-to-stop.

        :param osds: list of OSDs
        :return: list of OSDs that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osds in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here; depends on the cluster size, I guess.
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        # return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
            return False
        self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True

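    # The removal queue is persisted in the mgr store under the
    # 'osd_remove_queue' key as a JSON list of OSD.to_json() dicts, so it
    # survives mgr restarts and failovers.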
    def save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
            for osd in json.loads(v):
                logger.debug(f"Loading osd ->{osd} from store")
                # each entry is already a dict (see save_to_store), so no
                # second json.loads() is needed here
                osd_obj = OSD.from_json(osd, ctx=self)
                self.mgr.to_remove_osds.add(osd_obj)

468 | ||
469 | class NotFoundError(Exception): | |
470 | pass | |
471 | ||
472 | ||
class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 fullname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did the process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # whether this is a replace (destroy, id stays reusable) or a
        # remove (purge) operation
        self.replace = replace
        # whether to skip waiting for the OSD to be drained
        self.force = force
        # the name of the node
        self.nodename = hostname
        # the full name of the osd
        self.fullname = fullname

        # mgr obj to make mgr/mon calls
        self.rm_util = remove_util

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        self.rm_util.set_osd_flag([self], 'out')
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        self.rm_util.set_osd_flag([self], 'in')
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['nodename'] = self.nodename  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
        inp.update({'remove_util': ctx})
        # to_json() stores the host under 'nodename', but __init__ expects
        # 'hostname'; map it back so the round-trip works
        if 'nodename' in inp:
            inp['hostname'] = inp.pop('nodename')
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"<OSD>(osd_id={self.osd_id}, is_draining={self.is_draining})"

class OSDQueue(Set):

    def __init__(self):
        super().__init__()

    def as_osd_ids(self) -> List[int]:
        return [osd.osd_id for osd in self]

    def queue_size(self) -> int:
        return len(self)

    def draining_osds(self) -> List["OSD"]:
        return [osd for osd in self if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        return [osd for osd in self if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        return [osd for osd in self if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        return [osd for osd in self]

    def not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        self.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        try:
            logger.debug(f'Removing {osd} from the queue.')
            self.remove(osd)
        except KeyError:
            logger.debug(f"Could not find {osd} in queue.")
            # re-raise to preserve the original traceback
            raise