]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import json |
2 | import logging | |
adb31ebb | 3 | from threading import Lock |
f67539c2 | 4 | from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING |
e306af50 TL |
5 | |
6 | from ceph.deployment import translate | |
7 | from ceph.deployment.drive_group import DriveGroupSpec | |
8 | from ceph.deployment.drive_selection import DriveSelection | |
f91f0fd5 | 9 | from ceph.deployment.inventory import Device |
adb31ebb | 10 | from ceph.utils import datetime_to_str, str_to_datetime |
e306af50 | 11 | |
f6b5b4d7 | 12 | from datetime import datetime |
e306af50 | 13 | import orchestrator |
f67539c2 | 14 | from cephadm.serve import CephadmServe |
adb31ebb | 15 | from cephadm.utils import forall_hosts |
f67539c2 | 16 | from ceph.utils import datetime_now |
e306af50 TL |
17 | from orchestrator import OrchestratorError |
18 | from mgr_module import MonCommandFailed | |
19 | ||
f67539c2 | 20 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService |
f91f0fd5 TL |
21 | |
22 | if TYPE_CHECKING: | |
23 | from cephadm.module import CephadmOrchestrator | |
e306af50 TL |
24 | |
25 | logger = logging.getLogger(__name__) | |
26 | ||
27 | ||
class OSDService(CephService):
    """Cephadm service handler for OSD daemons.

    Applies DriveGroupSpecs: selects matching hosts/devices, runs
    ceph-volume via cephadm, and deploys osd daemon containers for the
    resulting OSDs.
    """
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        """Apply ``drive_group`` on every matching host and create OSDs.

        Returns a comma-separated summary of the per-host result messages
        (hosts that were skipped contribute nothing to the summary).
        """
        logger.debug(f"Processing DriveGroup {drive_group}")
        # hostname -> ids of destroyed OSDs whose ids may be reclaimed
        osd_id_claims = self.find_destroyed_osds()
        if osd_id_claims:
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None

            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.info('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            # let ceph-volume tag the new OSDs with the spec they belong to
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                drive_group, host, cmd,
                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            # record the apply timestamp so osdspec_needs_apply can skip
            # this host next time around if nothing changed
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        # @forall_hosts fans the (host, drive_selection) pairs out and
        # collects one result per host
        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))

    def create_single_host(self,
                           drive_group: DriveGroupSpec,
                           host: str, cmd: str, replace_osd_ids: List[str],
                           env_vars: Optional[List[str]] = None) -> str:
        """Run the prepared ceph-volume ``cmd`` on ``host``, then deploy
        daemons for whatever OSDs ceph-volume created.

        :raises RuntimeError: if cephadm exits non-zero (other than the
            tolerated "already prepared" case below).
        """
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))
        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                         replace_osd_ids)

    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                             replace_osd_ids: Optional[List[str]] = None) -> str:
        """Deploy daemon containers for OSDs already prepared on ``host``.

        Queries ``ceph-volume lvm list`` on the host and creates a daemon
        for every OSD that belongs to this cluster, is known to the osdmap,
        and is not already up (unless it is part of a replacement).
        Returns a human-readable summary string.
        """
        if replace_osd_ids is None:
            replace_osd_ids = self.find_destroyed_osds().get(host, [])
        assert replace_osd_ids is not None
        # check result: ask ceph-volume what OSDs now exist on the host
        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        # snapshot of OSDs that were already up before this run
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                # skip LVs that belong to a different cluster
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                # the on-disk fsid must match the cluster's record for this id
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=osd_id,
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            # device list on this host changed; force a refresh
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        """Resolve ``drive_group`` into per-host DriveSelections.

        Returns a list of ``(hostname, DriveSelection)`` tuples, one per
        host matched by the spec's placement.
        """
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.inventory.all_specs())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map: List[Tuple[str, DriveSelection]] = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        """Translate a DriveSelection into a ceph-volume command string.

        Returns ``None`` when the selection yields no command (e.g. no
        data devices).
        """
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        """Return OSDSpec previews for all specs that match ``host``."""
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

           {'data': ...,
            'osdspec': ..,
            'host': ..
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    try:
                        concat_out: Dict[str, Any] = json.loads(' '.join(out))
                    except ValueError:
                        logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                        concat_out = {}

                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        """Return every hostname matched by any of ``specs`` (flattened,
        duplicates possible)."""
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        """Return the OSDSpecs whose placement matches ``host``.

        When ``specs`` is not given, all 'osd' specs from the spec store's
        preview are considered.
        """
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        """Run a ceph-volume command on ``host`` via cephadm.

        The minimal ceph.conf and the client.bootstrap-osd keyring are
        passed on stdin as a config-json blob. Returns ``(out, err, code)``;
        errors are not raised (``error_ok=True``).
        """
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        """Return the 'osdspec_affinity' recorded in the OSD's metadata
        ('' when unknown)."""
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        """Map hostnames to the ids of OSDs marked 'destroyed' there.

        These ids can be reclaimed when replacing OSDs. Returns an empty
        mapping when the 'osd tree' output cannot be decoded.

        :raises OrchestratorError: if the 'osd tree' mon command fails.
        """
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            # host nodes list their (destroyed) OSD ids as children
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
        return osd_host_map
327 | ||
328 | ||
class RemoveUtil(object):
    """Helper around mon commands used during OSD removal/replacement
    (ok-to-stop checks, flags, reweighting, destroy/purge)."""

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        """Return the ids of all OSDs in the osdmap, as strings."""
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        """Return the parsed 'osd df' output ({} if it cannot be decoded)."""
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        """Return the PG count for ``osd_id`` from 'osd df' (-1 if absent).

        NOTE: a falsy ``osd_df`` (None or {}) is refetched from the mon.
        """
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of ods_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.info(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

    # todo start draining
    #  return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        """Ask the mon whether all of ``osds`` can be stopped at once."""
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        """Run ``osd <flag>`` (e.g. 'out', 'in', 'down') on ``osds``.

        Returns True on success, False (with an error log) otherwise.
        """
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        """Return the crush weight of ``osd`` (None on error or if absent
        from the crush tree)."""
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        """Set the crush weight of ``osd`` to ``weight``.

        Returns True on success, False (with an error log) otherwise.
        """
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True
f6b5b4d7 | 466 | |
f6b5b4d7 TL |
467 | |
class NotFoundError(Exception):
    """Raised by OSDRemovalQueue operations when the targeted OSD does
    not exist in the cluster."""
470 | ||
471 | ||
class OSD:
    """Bookkeeping for a single OSD that is being removed or replaced.

    Tracks the drain lifecycle (started -> draining -> done) plus the
    replace/force flags, and delegates all cluster interaction to a
    RemoveUtil. Instances round-trip through to_json()/from_json() for
    persistence in the mgr store.
    """

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        # crush weight before draining zeroed it; restored on stop_draining
        self.original_weight: Optional[float] = None

    def start(self) -> None:
        """Mark the removal operation as started (idempotent)."""
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        """Begin draining: 'out' for a replace, crush weight 0 otherwise.

        Returns False (no-op) when the operation has been stopped.
        """
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            # remember the weight so stop_draining can restore it
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        """Undo start_draining: 'in' for a replace, restore weight otherwise."""
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        """Stop the removal operation and halt any draining (idempotent)."""
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        # delegates a mon 'osd ok-to-stop' query
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        """True when the OSD has 0 PGs.

        NOTE: this property has side effects — on first observing
        emptiness it records drain_done_at and clears ``draining``.
        """
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        """Query the mon's safe-to-destroy check for this OSD."""
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        """Mark this OSD 'down'."""
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        """Forcefully destroy this OSD (keeps the id reusable)."""
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        """Forcefully purge this OSD from the cluster."""
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        """Return the OSD's PG count (-1 if unknown)."""
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        """True if the OSD id is present in the osdmap."""
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        """Human-readable drain status for CLI/status output."""
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        """PG count as a display string ('n/a' when unknown)."""
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        """Serialize to a JSON-safe dict; datetimes become strings."""
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        """Rebuild an OSD from to_json() output (None for falsy input).

        NOTE: mutates ``inp`` in place (date parsing, key renames).
        Accepts the legacy 'nodename' key as an alias for 'hostname'.
        """
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        # identity is the osd id, consistent with __eq__
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
f6b5b4d7 TL |
654 | |
655 | ||
adb31ebb TL |
class OSDRemovalQueue(object):
    """Thread-safe queue of OSDs scheduled for removal/replacement.

    The queue is driven by process_removal_queue() from the cephadm serve
    loop and persisted in the mgr store ('osd_remove_queue').
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None
            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
            logger.info(f"Successfully removed {osd} on {osd.hostname}")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        """Drop queued OSDs that no longer exist in the cluster."""
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _save_to_store(self) -> None:
        """Persist the queue to the mgr store. Caller must hold self.lock."""
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        """Restore the queue from the mgr store (inverse of _save_to_store)."""
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        """Ids of all queued OSDs."""
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        """Number of queued OSDs."""
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        """Queued OSDs that are actively draining."""
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        """Queued OSDs that are neither draining nor empty yet."""
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        """Queued OSDs that have finished draining (0 PGs)."""
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        """Snapshot list of all queued OSDs."""
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        # caller must hold self.lock (see cleanup())
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        """Add ``osd`` to the queue and start its removal.

        :raises NotFoundError: if the OSD does not exist in the cluster.
        """
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        """Stop and remove ``osd`` from the queue.

        :raises NotFoundError: if the OSD does not exist in the cluster.
        :raises KeyError: if the OSD is not in the queue.
        """
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds