ceph.git: ceph/src/pybind/mgr/cephadm/services/osd.py (reef 18.2.1)
1 import json
2 import logging
3 from asyncio import gather
4 from threading import Lock
5 from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
6
7 from ceph.deployment import translate
8 from ceph.deployment.drive_group import DriveGroupSpec
9 from ceph.deployment.drive_selection import DriveSelection
10 from ceph.deployment.inventory import Device
11 from ceph.utils import datetime_to_str, str_to_datetime
12
13 from datetime import datetime
14 import orchestrator
15 from cephadm.serve import CephadmServe
16 from cephadm.utils import SpecialHostLabels
17 from ceph.utils import datetime_now
18 from orchestrator import OrchestratorError, DaemonDescription
19 from mgr_module import MonCommandFailed
20
21 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
22
23 if TYPE_CHECKING:
24 from cephadm.module import CephadmOrchestrator
25
26 logger = logging.getLogger(__name__)
27
28
29 class OSDService(CephService):
30 TYPE = 'osd'
31
32 def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
33 logger.debug(f"Processing DriveGroup {drive_group}")
34 osd_id_claims = OsdIdClaims(self.mgr)
35 if osd_id_claims.get():
36 logger.info(
37 f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
38
39 async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
40 # skip this host if there has been no change in inventory
41 if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
42 self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
43 host, drive_group))
44 return None
45 # skip this host if we cannot schedule here
46 if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
47 return None
48
49 osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
50
51 cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
52 osd_id_claims_for_host)
53 if not cmds:
54 logger.debug("No data_devices, skipping DriveGroup: {}".format(
55 drive_group.service_id))
56 return None
57
58 logger.debug('Applying service osd.%s on host %s...' % (
59 drive_group.service_id, host
60 ))
61 start_ts = datetime_now()
62 env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
63 ret_msg = await self.create_single_host(
64 drive_group, host, cmds,
65 replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
66 )
67 self.mgr.cache.update_osdspec_last_applied(
68 host, drive_group.service_name(), start_ts
69 )
70 self.mgr.cache.save_host(host)
71 return ret_msg
72
73 async def all_hosts() -> List[Optional[str]]:
74 futures = [create_from_spec_one(h, ds)
75 for h, ds in self.prepare_drivegroup(drive_group)]
76 return await gather(*futures)
77
78 with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'):
79 ret = self.mgr.wait_async(all_hosts())
80 return ", ".join(filter(None, ret))
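# Illustrative usage (a sketch, not part of this module): create_from_spec()
# is what an applied OSD service spec ultimately drives. A spec that consumes
# all available devices on every matching host might look roughly like:
#
#   DriveGroupSpec(service_id='all_available',
#                  placement=PlacementSpec(host_pattern='*'),
#                  data_devices=DeviceSelection(all=True))
#
# PlacementSpec and DeviceSelection are the upstream spec classes as commonly
# used; the exact fields shown here are for illustration only.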
81
82 async def create_single_host(self,
83 drive_group: DriveGroupSpec,
84 host: str, cmds: List[str], replace_osd_ids: List[str],
85 env_vars: Optional[List[str]] = None) -> str:
86 for cmd in cmds:
87 out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
88 if code == 1 and ', it is already prepared' in '\n'.join(err):
89 # HACK: when we create against an existing LV, ceph-volume
90 # returns an error and the above message. To make this
91 # command idempotent, tolerate this "error" and continue.
92 logger.debug('the device was already prepared; continuing')
93 code = 0
94 if code:
95 raise RuntimeError(
96 'cephadm exited with an error code: %d, stderr:%s' % (
97 code, '\n'.join(err)))
98 return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
99 replace_osd_ids)
100
101 async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
102 replace_osd_ids: Optional[List[str]] = None) -> str:
103
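# This method inspects 'ceph-volume lvm list' and 'ceph-volume raw list'
# output on the host and deploys a cephadm daemon for every OSD that belongs
# to this cluster's fsid, is known in the cluster's OSD uuid map, and does not
# already have a daemon on the host. OSDs listed in replace_osd_ids are
# deployed even if an OSD with that id was already up beforehand.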
104 if replace_osd_ids is None:
105 replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
106 assert replace_osd_ids is not None
107
108 # check result: lvm
109 osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
110 host, 'osd', 'ceph-volume',
111 [
112 '--',
113 'lvm', 'list',
114 '--format', 'json',
115 ])
116 before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
117 fsid = self.mgr._cluster_fsid
118 osd_uuid_map = self.mgr.get_osd_uuid_map()
119 created = []
120 for osd_id, osds in osds_elems.items():
121 for osd in osds:
122 if osd['type'] == 'db':
123 continue
124 if osd['tags']['ceph.cluster_fsid'] != fsid:
125 logger.debug('mismatched fsid, skipping %s' % osd)
126 continue
127 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
128 # the OSD already exists and is not part of a replacement operation; skip it
129 continue
130 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
131 # cephadm daemon instance already exists
132 logger.debug(f'osd id {osd_id} daemon already exists')
133 continue
134 if osd_id not in osd_uuid_map:
135 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
136 continue
137 if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
138 logger.debug('mismatched osd uuid (cluster has %s, osd '
139 'has %s)' % (
140 osd_uuid_map.get(osd_id),
141 osd['tags']['ceph.osd_fsid']))
142 continue
143
144 created.append(osd_id)
145 daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
146 service_name=service_name,
147 daemon_id=str(osd_id),
148 host=host,
149 daemon_type='osd',
150 )
151 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
152 await CephadmServe(self.mgr)._create_daemon(
153 daemon_spec,
154 osd_uuid_map=osd_uuid_map)
155
156 # check result: raw
157 raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
158 host, 'osd', 'ceph-volume',
159 [
160 '--',
161 'raw', 'list',
162 '--format', 'json',
163 ])
164 for osd_uuid, osd in raw_elems.items():
165 if osd.get('ceph_fsid') != fsid:
166 continue
167 osd_id = str(osd.get('osd_id', '-1'))
168 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
169 # the OSD already exists and is not part of a replacement operation; skip it
170 continue
171 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
172 # cephadm daemon instance already exists
173 logger.debug(f'osd id {osd_id} daemon already exists')
174 continue
175 if osd_id not in osd_uuid_map:
176 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
177 continue
178 if osd_uuid_map.get(osd_id) != osd_uuid:
179 logger.debug('mismatched osd uuid (cluster has %s, osd '
180 'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
181 continue
182 if osd_id in created:
183 continue
184
185 created.append(osd_id)
186 daemon_spec = CephadmDaemonDeploySpec(
187 service_name=service_name,
188 daemon_id=osd_id,
189 host=host,
190 daemon_type='osd',
191 )
192 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
193 await CephadmServe(self.mgr)._create_daemon(
194 daemon_spec,
195 osd_uuid_map=osd_uuid_map)
196
197 if created:
198 self.mgr.cache.invalidate_host_devices(host)
199 self.mgr.cache.invalidate_autotune(host)
200 return "Created osd(s) %s on host '%s'" % (','.join(created), host)
201 else:
202 return "Created no osd(s) on host %s; already created?" % host
203
204 def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
205 # 1) use fn_filter to determine matching_hosts
206 matching_hosts = drive_group.placement.filter_matching_hostspecs(
207 self.mgr.cache.get_schedulable_hosts())
208 # 2) Map the inventory to the InventoryHost object
209 host_ds_map = []
210
211 # set osd_id_claims
212
213 def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
214 # TODO: this linear scan is clumsy; the inventory should come preloaded with the host
215 for _host, _inventory in inventory_dict.items():
216 if _host == hostname:
217 return _inventory
218 raise OrchestratorError("No inventory found for host: {}".format(hostname))
219
220 # 3) iterate over matching_host and call DriveSelection
221 logger.debug(f"Checking matching hosts -> {matching_hosts}")
222 for host in matching_hosts:
223 inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
224 logger.debug(f"Found inventory for host {inventory_for_host}")
225
226 # List of Daemons on that host
227 dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
228 dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]
229
230 drive_selection = DriveSelection(drive_group, inventory_for_host,
231 existing_daemons=len(dd_for_spec_and_host))
232 logger.debug(f"Found drive selection {drive_selection}")
233 if drive_group.method and drive_group.method == 'raw':
234 # ceph-volume can currently only handle a 1:1 mapping
235 # of data/db/wal devices for raw mode osds. If db/wal devices
236 # are defined and the number does not match the number of data
237 # devices, we need to bail out
238 if drive_selection.data_devices() and drive_selection.db_devices():
239 if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
240 raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
241 f'{len(drive_selection.data_devices())} potential data device(s) and '
242 f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
243 if drive_selection.data_devices() and drive_selection.wal_devices():
244 if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
245 raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
246 f'{len(drive_selection.data_devices())} potential data device(s) and '
247 f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
248 host_ds_map.append((host, drive_selection))
249 return host_ds_map
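# Worked example for the raw-mode check above (illustrative numbers): a host
# matching 2 data devices and 2 db devices passes, while 3 data devices and
# 2 db devices raises OrchestratorError, because 'raw' OSDs require a strict
# 1:1 data-to-db (and data-to-wal) mapping.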
250
251 @staticmethod
252 def driveselection_to_ceph_volume(drive_selection: DriveSelection,
253 osd_id_claims: Optional[List[str]] = None,
254 preview: bool = False) -> List[str]:
255 logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
256 cmds: List[str] = translate.to_ceph_volume(drive_selection,
257 osd_id_claims, preview=preview).run()
258 logger.debug(f"Resulting ceph-volume cmds: {cmds}")
259 return cmds
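# The translated commands are typically of the form
#   'lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd'
# (the exact flags depend on the spec and the translate module); each string
# is later handed to _run_ceph_volume_command() on the target host.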
260
261 def get_previews(self, host: str) -> List[Dict[str, Any]]:
262 # Find OSDSpecs that match host.
263 osdspecs = self.resolve_osdspecs_for_host(host)
264 return self.generate_previews(osdspecs, host)
265
266 def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
267 """
268
269 The return should look like this:
270
271 [
272 {'data': {<metadata>},
273 'osdspec': <name of osdspec>,
274 'host': <name of host>,
275 'notes': <notes>
276 },
277
278 {'data': ...,
279 'osdspec': ..,
280 'host': ...,
281 'notes': ...
282 }
283 ]
284
285 Note: One host can have multiple previews based on its assigned OSDSpecs.
286 """
287 self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
288 ret_all: List[Dict[str, Any]] = []
289 if not osdspecs:
290 return ret_all
291 for osdspec in osdspecs:
292
293 # populate osd_id_claims
294 osd_id_claims = OsdIdClaims(self.mgr)
295
296 # prepare driveselection
297 for host, ds in self.prepare_drivegroup(osdspec):
298 if host != for_host:
299 continue
300
301 # driveselection for host
302 cmds: List[str] = self.driveselection_to_ceph_volume(ds,
303 osd_id_claims.filtered_by_host(
304 host),
305 preview=True)
306 if not cmds:
307 logger.debug("No data_devices, skipping DriveGroup: {}".format(
308 osdspec.service_name()))
309 continue
310
311 # get preview data from ceph-volume
312 for cmd in cmds:
313 with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
314 out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
315 if out:
316 try:
317 concat_out: Dict[str, Any] = json.loads(' '.join(out))
318 except ValueError:
319 logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
320 concat_out = {}
321 notes = []
322 if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
323 found = len(concat_out)
324 limit = osdspec.data_devices.limit
325 notes.append(
326 f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
327 ret_all.append({'data': concat_out,
328 'osdspec': osdspec.service_id,
329 'host': host,
330 'notes': notes})
331 return ret_all
332
333 def resolve_hosts_for_osdspecs(self,
334 specs: Optional[List[DriveGroupSpec]] = None
335 ) -> List[str]:
336 osdspecs = []
337 if specs:
338 osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
339 if not osdspecs:
340 self.mgr.log.debug("No OSDSpecs found")
341 return []
342 return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])
343
344 def resolve_osdspecs_for_host(self, host: str,
345 specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
346 matching_specs = []
347 self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
348 if not specs:
349 specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
350 if spec.service_type == 'osd']
351 for spec in specs:
352 if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
353 self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
354 matching_specs.append(spec)
355 return matching_specs
356
357 async def _run_ceph_volume_command(self, host: str,
358 cmd: str, env_vars: Optional[List[str]] = None
359 ) -> Tuple[List[str], List[str], int]:
360 self.mgr.inventory.assert_host(host)
361
362 # get bootstrap key
363 ret, keyring, err = self.mgr.check_mon_command({
364 'prefix': 'auth get',
365 'entity': 'client.bootstrap-osd',
366 })
367
368 j = json.dumps({
369 'config': self.mgr.get_minimal_ceph_conf(),
370 'keyring': keyring,
371 })
372
373 split_cmd = cmd.split(' ')
374 _cmd = ['--config-json', '-', '--']
375 _cmd.extend(split_cmd)
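# e.g. for cmd='lvm batch --no-auto /dev/sdb --yes --no-systemd' (an
# illustrative value) the final argument list is ['--config-json', '-', '--',
# 'lvm', 'batch', '--no-auto', '/dev/sdb', '--yes', '--no-systemd']; the
# minimal ceph.conf and bootstrap keyring are passed via stdin as JSON.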
376 out, err, code = await CephadmServe(self.mgr)._run_cephadm(
377 host, 'osd', 'ceph-volume',
378 _cmd,
379 env_vars=env_vars,
380 stdin=j,
381 error_ok=True)
382 return out, err, code
383
384 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
385 # Do not remove the osd.N keyring if we failed to deploy the OSD: we could
386 # not recover from that, since the OSD keys are created by ceph-volume and
387 # not by us.
388 if not is_failed_deploy:
389 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
390
391
392 class OsdIdClaims(object):
393 """
394 Retrieve and provide osd ids that can be reused in the cluster
395 """
396
397 def __init__(self, mgr: "CephadmOrchestrator") -> None:
398 self.mgr: "CephadmOrchestrator" = mgr
399 self.osd_host_map: Dict[str, List[str]] = dict()
400 self.refresh()
401
402 def refresh(self) -> None:
403 try:
404 ret, out, err = self.mgr.check_mon_command({
405 'prefix': 'osd tree',
406 'states': ['destroyed'],
407 'format': 'json'
408 })
409 except MonCommandFailed as e:
410 logger.exception('osd tree failed')
411 raise OrchestratorError(str(e))
412 try:
413 tree = json.loads(out)
414 except ValueError:
415 logger.exception(f'Cannot decode JSON: \'{out}\'')
416 return
417
418 nodes = tree.get('nodes', {})
419 for node in nodes:
420 if node.get('type') == 'host':
421 self.osd_host_map.update(
422 {node.get('name'): [str(_id) for _id in node.get('children', list())]}
423 )
424 if self.osd_host_map:
425 self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
426
427 def get(self) -> Dict[str, List[str]]:
428 return self.osd_host_map
429
430 def filtered_by_host(self, host: str) -> List[str]:
431 """
432 Return the list of osd ids that can be reused in a host
433
434 OSD id claims in the CRUSH map are linked to the bare name of
435 the host. For FQDN hostnames, the lookup therefore uses the
436 bare (short) name.
437 """
438 return self.osd_host_map.get(host.split(".")[0], [])
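# e.g. filtered_by_host('node1.example.com') looks up the claims recorded for
# the CRUSH host bucket 'node1' (an illustrative hostname).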
439
440
441 class RemoveUtil(object):
442 def __init__(self, mgr: "CephadmOrchestrator") -> None:
443 self.mgr: "CephadmOrchestrator" = mgr
444
445 def get_osds_in_cluster(self) -> List[str]:
446 osd_map = self.mgr.get_osdmap()
447 return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
448
449 def osd_df(self) -> dict:
450 base_cmd = 'osd df'
451 ret, out, err = self.mgr.mon_command({
452 'prefix': base_cmd,
453 'format': 'json'
454 })
455 try:
456 return json.loads(out)
457 except ValueError:
458 logger.exception(f'Cannot decode JSON: \'{out}\'')
459 return {}
460
461 def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
462 if not osd_df:
463 osd_df = self.osd_df()
464 osd_nodes = osd_df.get('nodes', [])
465 for osd_node in osd_nodes:
466 if osd_node.get('id') == int(osd_id):
467 return osd_node.get('pgs', -1)
468 return -1
469
470 def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
471 """
472 Cut the list of OSDs in half until it is ok-to-stop
473
474 :param osds: list of OSDs to check
475 :return: list of OSDs that can be stopped at once
476 """
477 if not osds:
478 return []
479 while not self.ok_to_stop(osds):
480 if len(osds) <= 1:
481 # can't even stop one OSD, aborting
482 self.mgr.log.debug(
483 "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
484 return []
485
486 # This potentially prolongs the global wait time.
487 self.mgr.event.wait(1)
488 # splitting osd_ids in half until ok_to_stop yields success
489 # maybe popping ids off one by one would be better here; it depends on the cluster size
490 # There's a lot of room for micro adjustments here
491 osds = osds[len(osds) // 2:]
492 return osds
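# e.g. with 8 candidate OSDs the checked batches shrink 8 -> 4 -> 2 -> 1; the
# kept half is the second half of the list (osds[len(osds) // 2:]), and an
# empty list is returned if even a single OSD is not ok-to-stop.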
493
494 # todo start draining
495 # return all([osd.start_draining() for osd in osds])
496
497 def ok_to_stop(self, osds: List["OSD"]) -> bool:
498 cmd_args = {
499 'prefix': "osd ok-to-stop",
500 'ids': [str(osd.osd_id) for osd in osds]
501 }
502 return self._run_mon_cmd(cmd_args, error_ok=True)
503
504 def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
505 base_cmd = f"osd {flag}"
506 self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
507 ret, out, err = self.mgr.mon_command({
508 'prefix': base_cmd,
509 'ids': [str(osd.osd_id) for osd in osds]
510 })
511 if ret != 0:
512 self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
513 return False
514 self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
515 return True
516
517 def get_weight(self, osd: "OSD") -> Optional[float]:
518 ret, out, err = self.mgr.mon_command({
519 'prefix': 'osd crush tree',
520 'format': 'json',
521 })
522 if ret != 0:
523 self.mgr.log.error(f"Could not dump crush weights. <{err}>")
524 return None
525 j = json.loads(out)
526 for n in j.get("nodes", []):
527 if n.get("name") == f"osd.{osd.osd_id}":
528 self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
529 return n.get("crush_weight")
530 return None
531
532 def reweight_osd(self, osd: "OSD", weight: float) -> bool:
533 self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
534 ret, out, err = self.mgr.mon_command({
535 'prefix': "osd crush reweight",
536 'name': f"osd.{osd.osd_id}",
537 'weight': weight,
538 })
539 if ret != 0:
540 self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
541 return False
542 self.mgr.log.info(f"{osd} weight is now {weight}")
543 return True
544
545 def zap_osd(self, osd: "OSD") -> str:
546 "Zaps all devices that are associated with an OSD"
547 if osd.hostname is not None:
548 cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
549 if not osd.no_destroy:
550 cmd.append('--destroy')
551 with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
552 out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
553 osd.hostname, 'osd', 'ceph-volume',
554 cmd,
555 error_ok=True))
556 self.mgr.cache.invalidate_host_devices(osd.hostname)
557 if code:
558 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
559 return '\n'.join(out + err)
560 raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
561
562 def safe_to_destroy(self, osd_ids: List[int]) -> bool:
563 """ Queries the safe-to-destroy flag for OSDs """
564 cmd_args = {'prefix': 'osd safe-to-destroy',
565 'ids': [str(x) for x in osd_ids]}
566 return self._run_mon_cmd(cmd_args, error_ok=True)
567
568 def destroy_osd(self, osd_id: int) -> bool:
569 """ Destroys an OSD (forcefully) """
570 cmd_args = {'prefix': 'osd destroy-actual',
571 'id': int(osd_id),
572 'yes_i_really_mean_it': True}
573 return self._run_mon_cmd(cmd_args)
574
575 def purge_osd(self, osd_id: int) -> bool:
576 """ Purges an OSD from the cluster (forcefully) """
577 cmd_args = {
578 'prefix': 'osd purge-actual',
579 'id': int(osd_id),
580 'yes_i_really_mean_it': True
581 }
582 return self._run_mon_cmd(cmd_args)
583
584 def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
585 """
586 Generic command to run mon_command and evaluate/log the results
587 """
588 ret, out, err = self.mgr.mon_command(cmd_args)
589 if ret != 0:
590 self.mgr.log.debug(f"ran {cmd_args} with mon_command")
591 if not error_ok:
592 self.mgr.log.error(
593 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
594 return False
595 self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
596 return True
597
598
599 class NotFoundError(Exception):
600 pass
601
602
603 class OSD:
604
605 def __init__(self,
606 osd_id: int,
607 remove_util: RemoveUtil,
608 drain_started_at: Optional[datetime] = None,
609 process_started_at: Optional[datetime] = None,
610 drain_stopped_at: Optional[datetime] = None,
611 drain_done_at: Optional[datetime] = None,
612 draining: bool = False,
613 started: bool = False,
614 stopped: bool = False,
615 replace: bool = False,
616 force: bool = False,
617 hostname: Optional[str] = None,
618 zap: bool = False,
619 no_destroy: bool = False):
620 # the ID of the OSD
621 self.osd_id = osd_id
622
623 # when did process (not the actual draining) start
624 self.process_started_at = process_started_at
625
626 # when did the drain start
627 self.drain_started_at = drain_started_at
628
629 # when did the drain stop
630 self.drain_stopped_at = drain_stopped_at
631
632 # when did the drain finish
633 self.drain_done_at = drain_done_at
634
635 # did the draining start
636 self.draining = draining
637
638 # was the operation started
639 self.started = started
640
641 # was the operation stopped
642 self.stopped = stopped
643
644 # If this is a replace or remove operation
645 self.replace = replace
646 # If we wait for the osd to be drained
647 self.force = force
648 # The name of the node
649 self.hostname = hostname
650
651 # mgr obj to make mgr/mon calls
652 self.rm_util: RemoveUtil = remove_util
653
654 self.original_weight: Optional[float] = None
655
656 # Whether devices associated with the OSD should be zapped (DATA ERASED)
657 self.zap = zap
658 # Whether all associated LV devices should be destroyed.
659 self.no_destroy = no_destroy
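# Rough lifecycle of an OSD in the removal queue: start() marks the operation
# as started; start_draining() either marks the OSD 'out' (replace) or crush
# reweights it to 0 (remove); is_empty flips once the PG count reaches 0 and
# records drain_done_at; the queue then destroys or purges the OSD and
# optionally zaps its devices.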
660
661 def start(self) -> None:
662 if self.started:
663 logger.debug(f"Already started draining {self}")
664 return None
665 self.started = True
666 self.stopped = False
667
668 def start_draining(self) -> bool:
669 if self.stopped:
670 logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
671 return False
672 if self.replace:
673 self.rm_util.set_osd_flag([self], 'out')
674 else:
675 self.original_weight = self.rm_util.get_weight(self)
676 self.rm_util.reweight_osd(self, 0.0)
677 self.drain_started_at = datetime.utcnow()
678 self.draining = True
679 logger.debug(f"Started draining {self}.")
680 return True
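# Note on the branch above: for a replacement the OSD is only marked 'out',
# which leaves its CRUSH weight in place for the eventual replacement, whereas
# a plain removal reweights to 0.0 to actively drain the data away.
# stop_draining() below reverses whichever of the two was applied.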
681
682 def stop_draining(self) -> bool:
683 if self.replace:
684 self.rm_util.set_osd_flag([self], 'in')
685 else:
686 if self.original_weight:
687 self.rm_util.reweight_osd(self, self.original_weight)
688 self.drain_stopped_at = datetime.utcnow()
689 self.draining = False
690 logger.debug(f"Stopped draining {self}.")
691 return True
692
693 def stop(self) -> None:
694 if self.stopped:
695 logger.debug(f"Already stopped draining {self}")
696 return None
697 self.started = False
698 self.stopped = True
699 self.stop_draining()
700
701 @property
702 def is_draining(self) -> bool:
703 """
704 Consider an OSD draining when it is
705 actively draining but not yet empty
706 """
707 return self.draining and not self.is_empty
708
709 @property
710 def is_ok_to_stop(self) -> bool:
711 return self.rm_util.ok_to_stop([self])
712
713 @property
714 def is_empty(self) -> bool:
715 if self.get_pg_count() == 0:
716 if not self.drain_done_at:
717 self.drain_done_at = datetime.utcnow()
718 self.draining = False
719 return True
720 return False
721
722 def safe_to_destroy(self) -> bool:
723 return self.rm_util.safe_to_destroy([self.osd_id])
724
725 def down(self) -> bool:
726 return self.rm_util.set_osd_flag([self], 'down')
727
728 def destroy(self) -> bool:
729 return self.rm_util.destroy_osd(self.osd_id)
730
731 def do_zap(self) -> str:
732 return self.rm_util.zap_osd(self)
733
734 def purge(self) -> bool:
735 return self.rm_util.purge_osd(self.osd_id)
736
737 def get_pg_count(self) -> int:
738 return self.rm_util.get_pg_count(self.osd_id)
739
740 @property
741 def exists(self) -> bool:
742 return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
743
744 def drain_status_human(self) -> str:
745 default_status = 'not started'
746 status = 'started' if self.started and not self.draining else default_status
747 status = 'draining' if self.draining else status
748 status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
749 return status
750
751 def pg_count_str(self) -> str:
752 return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
753
754 def to_json(self) -> dict:
755 out: Dict[str, Any] = dict()
756 out['osd_id'] = self.osd_id
757 out['started'] = self.started
758 out['draining'] = self.draining
759 out['stopped'] = self.stopped
760 out['replace'] = self.replace
761 out['force'] = self.force
762 out['zap'] = self.zap
763 out['hostname'] = self.hostname # type: ignore
764
765 for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
766 if getattr(self, k):
767 out[k] = datetime_to_str(getattr(self, k))
768 else:
769 out[k] = getattr(self, k)
770 return out
771
772 @classmethod
773 def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
774 if not inp:
775 return None
776 for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
777 if inp.get(date_field):
778 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
779 inp.update({'remove_util': rm_util})
780 if 'nodename' in inp:
781 hostname = inp.pop('nodename')
782 inp['hostname'] = hostname
783 return cls(**inp)
784
785 def __hash__(self) -> int:
786 return hash(self.osd_id)
787
788 def __eq__(self, other: object) -> bool:
789 if not isinstance(other, OSD):
790 return NotImplemented
791 return self.osd_id == other.osd_id
792
793 def __repr__(self) -> str:
794 return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
795
796
797 class OSDRemovalQueue(object):
798
799 def __init__(self, mgr: "CephadmOrchestrator") -> None:
800 self.mgr: "CephadmOrchestrator" = mgr
801 self.osds: Set[OSD] = set()
802 self.rm_util = RemoveUtil(mgr)
803
804 # locks multithreaded access to self.osds. Please avoid holding the lock
805 # across network calls, like mon commands.
806 self.lock = Lock()
807
808 def process_removal_queue(self) -> None:
809 """
810 Performs actions in the _serve() loop to remove an OSD
811 when the removal criteria are met.
812
813 We can't hold self.lock here, as _remove_daemon is called inside the loop.
814 """
815
816 # make sure that we don't run on OSDs that are not in the cluster anymore.
817 self.cleanup()
818
819 # find osds that are ok-to-stop and not yet draining
820 ready_to_drain_osds = self._ready_to_drain_osds()
821 if ready_to_drain_osds:
822 # start draining those
823 _ = [osd.start_draining() for osd in ready_to_drain_osds]
824
825 all_osds = self.all_osds()
826
827 logger.debug(
828 f"{self.queue_size()} OSDs are scheduled "
829 f"for removal: {all_osds}")
830
831 # Check all osds for their state and take action (remove, purge etc)
832 new_queue: Set[OSD] = set()
833 for osd in all_osds: # type: OSD
834 if not osd.force:
835 # skip criteria
836 if not osd.is_empty:
837 logger.debug(f"{osd} is not empty yet. Waiting a bit more")
838 new_queue.add(osd)
839 continue
840
841 if not osd.safe_to_destroy():
842 logger.debug(
843 f"{osd} is not safe-to-destroy yet. Waiting a bit more")
844 new_queue.add(osd)
845 continue
846
847 # abort criteria
848 if not osd.down():
849 # also remove it from the remove_osd list and set a health_check warning?
850 raise orchestrator.OrchestratorError(
851 f"Could not mark {osd} down")
852
853 # stop and remove daemon
854 assert osd.hostname is not None
855
856 if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
857 CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
858 logger.info(f"Successfully removed {osd} on {osd.hostname}")
859 else:
860 logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
861
862 if osd.replace:
863 # mark destroyed in osdmap
864 if not osd.destroy():
865 raise orchestrator.OrchestratorError(
866 f"Could not destroy {osd}")
867 logger.info(
868 f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
869 else:
870 # purge from osdmap
871 if not osd.purge():
872 raise orchestrator.OrchestratorError(f"Could not purge {osd}")
873 logger.info(f"Successfully purged {osd} on {osd.hostname}")
874
875 if osd.zap:
876 # throws an exception if the zap fails
877 logger.info(f"Zapping devices for {osd} on {osd.hostname}")
878 osd.do_zap()
879 logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
880
881 logger.debug(f"Removing {osd} from the queue.")
882
883 # self could change while this is processing (osds get added from the CLI)
884 # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
885 # osds that were added while this method was executed'
886 with self.lock:
887 self.osds.intersection_update(new_queue)
888 self._save_to_store()
889
890 def cleanup(self) -> None:
891 # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
892 with self.lock:
893 for osd in self._not_in_cluster():
894 self.osds.remove(osd)
895
896 def _ready_to_drain_osds(self) -> List["OSD"]:
897 """
898 Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
899 be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
900 that are already draining.
901 """
902 draining_limit = max(1, self.mgr.max_osd_draining_count)
903 num_already_draining = len(self.draining_osds())
904 num_to_start_draining = max(0, draining_limit - num_already_draining)
905 stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
906 return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
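# Worked example (illustrative numbers): with max_osd_draining_count=10 and 4
# OSDs already draining, at most 6 of the ok-to-stop idle OSDs are started;
# the limit never drops below 1 because of max(1, ...).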
907
908 def _save_to_store(self) -> None:
909 osd_queue = [osd.to_json() for osd in self.osds]
910 logger.debug(f"Saving {osd_queue} to store")
911 self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
912
913 def load_from_store(self) -> None:
914 with self.lock:
915 for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
916 for osd in json.loads(v):
917 logger.debug(f"Loading osd ->{osd} from store")
918 osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
919 if osd_obj is not None:
920 self.osds.add(osd_obj)
921
922 def as_osd_ids(self) -> List[int]:
923 with self.lock:
924 return [osd.osd_id for osd in self.osds]
925
926 def queue_size(self) -> int:
927 with self.lock:
928 return len(self.osds)
929
930 def draining_osds(self) -> List["OSD"]:
931 with self.lock:
932 return [osd for osd in self.osds if osd.is_draining]
933
934 def idling_osds(self) -> List["OSD"]:
935 with self.lock:
936 return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
937
938 def empty_osds(self) -> List["OSD"]:
939 with self.lock:
940 return [osd for osd in self.osds if osd.is_empty]
941
942 def all_osds(self) -> List["OSD"]:
943 with self.lock:
944 return [osd for osd in self.osds]
945
946 def _not_in_cluster(self) -> List["OSD"]:
947 return [osd for osd in self.osds if not osd.exists]
948
949 def enqueue(self, osd: "OSD") -> None:
950 if not osd.exists:
951 raise NotFoundError()
952 with self.lock:
953 self.osds.add(osd)
954 osd.start()
955
956 def rm(self, osd: "OSD") -> None:
957 if not osd.exists:
958 raise NotFoundError()
959 osd.stop()
960 with self.lock:
961 try:
962 logger.debug(f'Removing {osd} from the queue.')
963 self.osds.remove(osd)
964 except KeyError:
965 logger.debug(f"Could not find {osd} in queue.")
966 raise KeyError
967
968 def __eq__(self, other: Any) -> bool:
969 if not isinstance(other, OSDRemovalQueue):
970 return False
971 with self.lock:
972 return self.osds == other.osds