import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import SpecialHostLabels
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
                                                                 osd_id_claims_for_host)
            if not cmds:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmds,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'):
            ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmds: List[str], replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
        for cmd in cmds:
            out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
            if code == 1 and ', it is already prepared' in '\n'.join(err):
                # HACK: when we create against an existing LV, ceph-volume
                # returns an error and the above message. To make this
                # command idempotent, tolerate this "error" and continue.
                logger.debug('the device was already prepared; continuing')
                code = 0
            if code:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
            assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # if it exists but is part of the replacement operation, don't skip
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host
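
    # Illustrative sketch (not part of the original module): the 'ceph-volume lvm
    # list --format json' payload consumed above is assumed to map each osd id to
    # a list of LV entries, roughly:
    #
    #   {
    #       "0": [
    #           {"type": "block",
    #            "tags": {"ceph.cluster_fsid": "<fsid>", "ceph.osd_fsid": "<osd fsid>"}}
    #       ]
    #   }
    #
    # while 'ceph-volume raw list --format json' is keyed by osd uuid with
    # 'ceph_fsid' and 'osd_id' fields; the field values shown are placeholders.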

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            if drive_group.method and drive_group.method == 'raw':
                # ceph-volume can currently only handle a 1:1 mapping
                # of data/db/wal devices for raw mode osds. If db/wal devices
                # are defined and the number does not match the number of data
                # devices, we need to bail out
                if drive_selection.data_devices() and drive_selection.db_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
                if drive_selection.data_devices() and drive_selection.wal_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> List[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmds: List[str] = translate.to_ceph_volume(drive_selection,
                                                   osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmds: {cmds}")
        return cmds
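
        # Illustrative sketch (not part of the original module): depending on the
        # DriveGroupSpec, a translated command might look roughly like
        #
        #   ['lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd']
        #
        # The exact subcommands and flags are produced by
        # ceph.deployment.translate; the example above is only an assumption.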

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           'notes': <notes>
           },

           {'data': ...,
            'osdspec': ...,
            'host': ...,
            'notes': ...
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmds: List[str] = self.driveselection_to_ceph_volume(ds,
                                                                     osd_id_claims.filtered_by_host(
                                                                         host),
                                                                     preview=True)
                if not cmds:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                for cmd in cmds:
                    with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
                        out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                    if out:
                        try:
                            concat_out: Dict[str, Any] = json.loads(' '.join(out))
                        except ValueError:
                            logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                            concat_out = {}
                        notes = []
                        if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
                            found = len(concat_out)
                            limit = osdspec.data_devices.limit
                            notes.append(
                                f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
                        ret_all.append({'data': concat_out,
                                        'osdspec': osdspec.service_id,
                                        'host': host,
                                        'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code
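
        # Illustrative sketch (not part of the original module): conceptually, a
        # cmd such as 'lvm batch /dev/sdb' is split and forwarded to the remote
        # ceph-volume wrapper as
        #
        #   ['--config-json', '-', '--', 'lvm', 'batch', '/dev/sdb']
        #
        # with {'config': <minimal ceph.conf>, 'keyring': <client.bootstrap-osd
        # keyring>} supplied on stdin.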

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
        # we cannot recover from it. The OSD keys are created by ceph-volume and not by
        # us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused in a host

        OSD id claims in the CRUSH map are linked to the bare name of
        the hostname. In case of FQDN hostnames the host is searched by its
        bare name
        """
        return self.osd_host_map.get(host.split(".")[0], [])
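
        # Illustrative example (not part of the original module): with a CRUSH
        # host bucket named 'node1' holding destroyed osd ids ['3', '7'], a
        # lookup by FQDN still resolves to the bare name:
        #
        #   OsdIdClaims(mgr).filtered_by_host('node1.example.com')  # -> ['3', '7']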


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut the osd list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of osd_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        # return all([osd.start_draining() for osd in osds])
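
        # Illustrative example (not part of the original module): with
        # osds = [osd.0, osd.1, osd.2, osd.3] on a busy cluster, the candidate
        # list shrinks as [0, 1, 2, 3] -> [2, 3] -> [3] until 'osd ok-to-stop'
        # succeeds; an empty list is returned once even a single OSD cannot
        # be stopped.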

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
            if not osd.no_destroy:
                cmd.append('--destroy')
            with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
                out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    osd.hostname, 'osd', 'ceph-volume',
                    cmd,
                    error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
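
        # Illustrative example (not part of the original module): for osd.3 with
        # no_destroy left at False, the forwarded invocation is effectively
        # 'ceph-volume lvm zap --osd-id 3 --destroy', run on the OSD's host
        # via cephadm.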

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False,
                 no_destroy: bool = False):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did the process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap
        # If True, skip destroying the associated LV devices when zapping
        # (i.e. do not pass '--destroy' to ceph-volume lvm zap)
        self.no_destroy = no_destroy

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)
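
    # Illustrative example (not part of the original module): the removal queue
    # persists OSDs as JSON, so a round trip looks roughly like
    #
    #   data = OSD(osd_id=3, remove_util=rm_util, hostname='node1').to_json()
    #   osd = OSD.from_json(json.loads(json.dumps(data)), rm_util=rm_util)
    #
    # datetime fields are serialized with datetime_to_str() and parsed back with
    # str_to_datetime(); legacy 'nodename' keys are mapped to 'hostname'.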

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

                if not osd.safe_to_destroy():
                    logger.debug(
                        f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
        that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
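
        # Illustrative example (not part of the original module): with
        # max_osd_draining_count = 10 and 4 OSDs already draining, at most 6 of
        # the idling, ok-to-stop OSDs returned by find_osd_stop_threshold()
        # will start draining in this round.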

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds