ceph.git: ceph/src/pybind/mgr/cephadm/services/osd.py (blob at commit 234d7a057c17a6884ee153e3ca9bf1af0281afad)
1 import json
2 import logging
3 from asyncio import gather
4 from threading import Lock
5 from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
6
7 from ceph.deployment import translate
8 from ceph.deployment.drive_group import DriveGroupSpec
9 from ceph.deployment.drive_selection import DriveSelection
10 from ceph.deployment.inventory import Device
11 from ceph.utils import datetime_to_str, str_to_datetime
12
13 from datetime import datetime
14 import orchestrator
15 from cephadm.serve import CephadmServe
16 from ceph.utils import datetime_now
17 from orchestrator import OrchestratorError, DaemonDescription
18 from mgr_module import MonCommandFailed
19
20 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
21
22 if TYPE_CHECKING:
23 from cephadm.module import CephadmOrchestrator
24
25 logger = logging.getLogger(__name__)
26
27
28 class OSDService(CephService):
29 TYPE = 'osd'
30
31 def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
32 logger.debug(f"Processing DriveGroup {drive_group}")
33 osd_id_claims = OsdIdClaims(self.mgr)
34 if osd_id_claims.get():
35 logger.info(
36 f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
37
38 async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
39 # skip this host if there has been no change in inventory
40 if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
41 self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
42 drive_group, host))
43 return None
44 # skip this host if we cannot schedule here
45 if self.mgr.inventory.has_label(host, '_no_schedule'):
46 return None
47
48 osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
49
50 cmd = self.driveselection_to_ceph_volume(drive_selection,
51 osd_id_claims_for_host)
52 if not cmd:
53 logger.debug("No data_devices, skipping DriveGroup: {}".format(
54 drive_group.service_id))
55 return None
56
57 logger.debug('Applying service osd.%s on host %s...' % (
58 drive_group.service_id, host
59 ))
60 start_ts = datetime_now()
61 env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
62 ret_msg = await self.create_single_host(
63 drive_group, host, cmd,
64 replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
65 )
66 self.mgr.cache.update_osdspec_last_applied(
67 host, drive_group.service_name(), start_ts
68 )
69 self.mgr.cache.save_host(host)
70 return ret_msg
71
72 async def all_hosts() -> List[Optional[str]]:
73 futures = [create_from_spec_one(h, ds)
74 for h, ds in self.prepare_drivegroup(drive_group)]
75 return await gather(*futures)
76
77 ret = self.mgr.wait_async(all_hosts())
78 return ", ".join(filter(None, ret))
79
80 async def create_single_host(self,
81 drive_group: DriveGroupSpec,
82 host: str, cmd: str, replace_osd_ids: List[str],
83 env_vars: Optional[List[str]] = None) -> str:
84 out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
85
86 if code == 1 and ', it is already prepared' in '\n'.join(err):
87 # HACK: when we create against an existing LV, ceph-volume
88 # returns an error and the above message. To make this
89 # command idempotent, tolerate this "error" and continue.
90 logger.debug('the device was already prepared; continuing')
91 code = 0
92 if code:
93 raise RuntimeError(
94 'cephadm exited with an error code: %d, stderr:%s' % (
95 code, '\n'.join(err)))
96 return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
97 replace_osd_ids)
98
99 async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
100 replace_osd_ids: Optional[List[str]] = None) -> str:
101
102 if replace_osd_ids is None:
103 replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
104 assert replace_osd_ids is not None
105
106 # check result: lvm
107 osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
108 host, 'osd', 'ceph-volume',
109 [
110 '--',
111 'lvm', 'list',
112 '--format', 'json',
113 ])
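# Rough shape of the 'ceph-volume lvm list --format json' output, inferred from the
# accesses in the loop below (illustrative only): a mapping of osd id to a list of
# LV records, e.g.
#   {"0": [{"type": "block",
#           "tags": {"ceph.cluster_fsid": "<cluster fsid>",
#                    "ceph.osd_fsid": "<osd uuid>", ...}}]}
# Only the 'type' field and the two fsid tags matter here.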
114 before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
115 fsid = self.mgr._cluster_fsid
116 osd_uuid_map = self.mgr.get_osd_uuid_map()
117 created = []
118 for osd_id, osds in osds_elems.items():
119 for osd in osds:
120 if osd['type'] == 'db':
121 continue
122 if osd['tags']['ceph.cluster_fsid'] != fsid:
123 logger.debug('mismatched fsid, skipping %s' % osd)
124 continue
125 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
126 # skip OSDs that are already up, unless they are being replaced
127 continue
128 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
129 # cephadm daemon instance already exists
130 logger.debug(f'osd id {osd_id} daemon already exists')
131 continue
132 if osd_id not in osd_uuid_map:
133 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
134 continue
135 if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
136 logger.debug('mismatched osd uuid (cluster has %s, osd '
137 'has %s)' % (
138 osd_uuid_map.get(osd_id),
139 osd['tags']['ceph.osd_fsid']))
140 continue
141
142 created.append(osd_id)
143 daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
144 service_name=service_name,
145 daemon_id=str(osd_id),
146 host=host,
147 daemon_type='osd',
148 )
149 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
150 await CephadmServe(self.mgr)._create_daemon(
151 daemon_spec,
152 osd_uuid_map=osd_uuid_map)
153
154 # check result: raw
155 raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
156 host, 'osd', 'ceph-volume',
157 [
158 '--',
159 'raw', 'list',
160 '--format', 'json',
161 ])
162 for osd_uuid, osd in raw_elems.items():
163 if osd.get('ceph_fsid') != fsid:
164 continue
165 osd_id = str(osd.get('osd_id', '-1'))
166 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
167 # skip OSDs that are already up, unless they are being replaced
168 continue
169 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
170 # cephadm daemon instance already exists
171 logger.debug(f'osd id {osd_id} daemon already exists')
172 continue
173 if osd_id not in osd_uuid_map:
174 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
175 continue
176 if osd_uuid_map.get(osd_id) != osd_uuid:
177 logger.debug('mismatched osd uuid (cluster has %s, osd '
178 'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
179 continue
180 if osd_id in created:
181 continue
182
183 created.append(osd_id)
184 daemon_spec = CephadmDaemonDeploySpec(
185 service_name=service_name,
186 daemon_id=osd_id,
187 host=host,
188 daemon_type='osd',
189 )
190 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
191 await CephadmServe(self.mgr)._create_daemon(
192 daemon_spec,
193 osd_uuid_map=osd_uuid_map)
194
195 if created:
196 self.mgr.cache.invalidate_host_devices(host)
197 self.mgr.cache.invalidate_autotune(host)
198 return "Created osd(s) %s on host '%s'" % (','.join(created), host)
199 else:
200 return "Created no osd(s) on host %s; already created?" % host
201
202 def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
203 # 1) use fn_filter to determine matching_hosts
204 matching_hosts = drive_group.placement.filter_matching_hostspecs(
205 self.mgr.cache.get_schedulable_hosts())
206 # 2) Map the inventory to the InventoryHost object
207 host_ds_map = []
208
209 # set osd_id_claims
210
211 def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
212 # This is stupid and needs to be loaded with the host
213 for _host, _inventory in inventory_dict.items():
214 if _host == hostname:
215 return _inventory
216 raise OrchestratorError("No inventory found for host: {}".format(hostname))
217
218 # 3) iterate over matching_host and call DriveSelection
219 logger.debug(f"Checking matching hosts -> {matching_hosts}")
220 for host in matching_hosts:
221 inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
222 logger.debug(f"Found inventory for host {inventory_for_host}")
223
224 # List of Daemons on that host
225 dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
226 dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]
227
228 drive_selection = DriveSelection(drive_group, inventory_for_host,
229 existing_daemons=len(dd_for_spec_and_host))
230 logger.debug(f"Found drive selection {drive_selection}")
231 host_ds_map.append((host, drive_selection))
232 return host_ds_map
233
234 @staticmethod
235 def driveselection_to_ceph_volume(drive_selection: DriveSelection,
236 osd_id_claims: Optional[List[str]] = None,
237 preview: bool = False) -> Optional[str]:
238 logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
239 cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
240 osd_id_claims, preview=preview).run()
241 logger.debug(f"Resulting ceph-volume cmd: {cmd}")
242 return cmd
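# Hedged example of the translation above: for a spec whose data_devices matched
# /dev/sdb and /dev/sdc (hypothetical device names), the resulting command is
# expected to look roughly like
#   lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd
# with the claimed osd ids and a report/preview variant of the command used when
# osd_id_claims or preview=True are passed; the exact flags are produced by
# ceph.deployment.translate and may differ between releases.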
243
244 def get_previews(self, host: str) -> List[Dict[str, Any]]:
245 # Find OSDSpecs that match host.
246 osdspecs = self.resolve_osdspecs_for_host(host)
247 return self.generate_previews(osdspecs, host)
248
249 def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
250 """
251
252 The return should look like this:
253
254 [
255 {'data': {<metadata>},
256 'osdspec': <name of osdspec>,
257 'host': <name of host>,
258 'notes': <notes>
259 },
260
261 {'data': ...,
262 'osdspec': ..,
263 'host': ...,
264 'notes': ...
265 }
266 ]
267
268 Note: One host can have multiple previews based on its assigned OSDSpecs.
269 """
270 self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
271 ret_all: List[Dict[str, Any]] = []
272 if not osdspecs:
273 return ret_all
274 for osdspec in osdspecs:
275
276 # populate osd_id_claims
277 osd_id_claims = OsdIdClaims(self.mgr)
278
279 # prepare driveselection
280 for host, ds in self.prepare_drivegroup(osdspec):
281 if host != for_host:
282 continue
283
284 # driveselection for host
285 cmd = self.driveselection_to_ceph_volume(ds,
286 osd_id_claims.filtered_by_host(host),
287 preview=True)
288 if not cmd:
289 logger.debug("No data_devices, skipping DriveGroup: {}".format(
290 osdspec.service_name()))
291 continue
292
293 # get preview data from ceph-volume
294 out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
295 if out:
296 try:
297 concat_out: Dict[str, Any] = json.loads(' '.join(out))
298 except ValueError:
299 logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
300 concat_out = {}
301 notes = []
302 if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
303 found = len(concat_out)
304 limit = osdspec.data_devices.limit
305 notes.append(
306 f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
307 ret_all.append({'data': concat_out,
308 'osdspec': osdspec.service_id,
309 'host': host,
310 'notes': notes})
311 return ret_all
312
313 def resolve_hosts_for_osdspecs(self,
314 specs: Optional[List[DriveGroupSpec]] = None
315 ) -> List[str]:
316 osdspecs = []
317 if specs:
318 osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
319 if not osdspecs:
320 self.mgr.log.debug("No OSDSpecs found")
321 return []
322 return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])
323
324 def resolve_osdspecs_for_host(self, host: str,
325 specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
326 matching_specs = []
327 self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
328 if not specs:
329 specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
330 if spec.service_type == 'osd']
331 for spec in specs:
332 if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
333 self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
334 matching_specs.append(spec)
335 return matching_specs
336
337 async def _run_ceph_volume_command(self, host: str,
338 cmd: str, env_vars: Optional[List[str]] = None
339 ) -> Tuple[List[str], List[str], int]:
340 self.mgr.inventory.assert_host(host)
341
342 # get bootstrap key
343 ret, keyring, err = self.mgr.check_mon_command({
344 'prefix': 'auth get',
345 'entity': 'client.bootstrap-osd',
346 })
347
348 j = json.dumps({
349 'config': self.mgr.get_minimal_ceph_conf(),
350 'keyring': keyring,
351 })
352
353 split_cmd = cmd.split(' ')
354 _cmd = ['--config-json', '-', '--']
355 _cmd.extend(split_cmd)
356 out, err, code = await CephadmServe(self.mgr)._run_cephadm(
357 host, 'osd', 'ceph-volume',
358 _cmd,
359 env_vars=env_vars,
360 stdin=j,
361 error_ok=True)
362 return out, err, code
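# Sketch of the resulting invocation (hypothetical cmd value): for
# cmd = 'lvm batch --no-auto /dev/sdb', the argument list handed to cephadm is
#   ['--config-json', '-', '--', 'lvm', 'batch', '--no-auto', '/dev/sdb']
# while the minimal ceph.conf and the client.bootstrap-osd keyring are fed to it
# on stdin as the JSON document built above.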
363
364 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
365 # Do not remove the osd.N keyring if we failed to deploy the OSD, because we
366 # would not be able to recreate it: the OSD keys are created by ceph-volume and
367 # not by us.
368 if not is_failed_deploy:
369 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
370
371
372 class OsdIdClaims(object):
373 """
374 Retrieve and provide osd ids that can be reused in the cluster
375 """
376
377 def __init__(self, mgr: "CephadmOrchestrator") -> None:
378 self.mgr: "CephadmOrchestrator" = mgr
379 self.osd_host_map: Dict[str, List[str]] = dict()
380 self.refresh()
381
382 def refresh(self) -> None:
383 try:
384 ret, out, err = self.mgr.check_mon_command({
385 'prefix': 'osd tree',
386 'states': ['destroyed'],
387 'format': 'json'
388 })
389 except MonCommandFailed as e:
390 logger.exception('osd tree failed')
391 raise OrchestratorError(str(e))
392 try:
393 tree = json.loads(out)
394 except ValueError:
395 logger.exception(f'Cannot decode JSON: \'{out}\'')
396 return
397
398 nodes = tree.get('nodes', {})
399 for node in nodes:
400 if node.get('type') == 'host':
401 self.osd_host_map.update(
402 {node.get('name'): [str(_id) for _id in node.get('children', list())]}
403 )
404 if self.osd_host_map:
405 self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
406
407 def get(self) -> Dict[str, List[str]]:
408 return self.osd_host_map
409
410 def filtered_by_host(self, host: str) -> List[str]:
411 """
412 Return the list of osd ids that can be reused in a host
413
414 OSD id claims in the CRUSH map are linked to the bare hostname.
415 For FQDN hostnames the lookup is therefore done with the bare
416 name as well.
417 """
418 return self.osd_host_map.get(host.split(".")[0], [])
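# Illustrative example of the claims map: if the 'osd tree' reply contains a node
# {'type': 'host', 'name': 'node1', 'children': [3, 7]} (hypothetical values),
# refresh() records {'node1': ['3', '7']} and filtered_by_host('node1.example.com')
# returns ['3', '7'], since only the bare hostname is used for the lookup.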
419
420
421 class RemoveUtil(object):
422 def __init__(self, mgr: "CephadmOrchestrator") -> None:
423 self.mgr: "CephadmOrchestrator" = mgr
424
425 def get_osds_in_cluster(self) -> List[str]:
426 osd_map = self.mgr.get_osdmap()
427 return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
428
429 def osd_df(self) -> dict:
430 base_cmd = 'osd df'
431 ret, out, err = self.mgr.mon_command({
432 'prefix': base_cmd,
433 'format': 'json'
434 })
435 try:
436 return json.loads(out)
437 except ValueError:
438 logger.exception(f'Cannot decode JSON: \'{out}\'')
439 return {}
440
441 def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
442 if not osd_df:
443 osd_df = self.osd_df()
444 osd_nodes = osd_df.get('nodes', [])
445 for osd_node in osd_nodes:
446 if osd_node.get('id') == int(osd_id):
447 return osd_node.get('pgs', -1)
448 return -1
449
450 def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
451 """
452 Cut the list of OSDs in half until the remainder is ok-to-stop
453
454 :param osds: list of OSD objects
455 :return: list of OSDs that can be stopped at once
456 """
457 if not osds:
458 return []
459 while not self.ok_to_stop(osds):
460 if len(osds) <= 1:
461 # can't even stop one OSD, aborting
462 self.mgr.log.debug(
463 "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
464 return []
465
466 # This potentially prolongs the global wait time.
467 self.mgr.event.wait(1)
468 # splitting osd_ids in half until ok_to_stop yields success
469 # maybe popping ids off one by one is better here; depends on the cluster size, I guess
470 # There's a lot of room for micro adjustments here
471 osds = osds[len(osds) // 2:]
472 return osds
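# Worked example (hypothetical cluster state): with osds = [osd.0, osd.1, osd.2,
# osd.3] and a cluster that can only tolerate stopping two of them, ok_to_stop()
# fails for the full list, the list is halved to [osd.2, osd.3], ok_to_stop()
# succeeds and that half is returned; if even a single OSD cannot be stopped the
# method gives up and returns [].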
473
474 # todo start draining
475 # return all([osd.start_draining() for osd in osds])
476
477 def ok_to_stop(self, osds: List["OSD"]) -> bool:
478 cmd_args = {
479 'prefix': "osd ok-to-stop",
480 'ids': [str(osd.osd_id) for osd in osds]
481 }
482 return self._run_mon_cmd(cmd_args, error_ok=True)
483
484 def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
485 base_cmd = f"osd {flag}"
486 self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
487 ret, out, err = self.mgr.mon_command({
488 'prefix': base_cmd,
489 'ids': [str(osd.osd_id) for osd in osds]
490 })
491 if ret != 0:
492 self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
493 return False
494 self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
495 return True
496
497 def get_weight(self, osd: "OSD") -> Optional[float]:
498 ret, out, err = self.mgr.mon_command({
499 'prefix': 'osd crush tree',
500 'format': 'json',
501 })
502 if ret != 0:
503 self.mgr.log.error(f"Could not dump crush weights. <{err}>")
504 return None
505 j = json.loads(out)
506 for n in j.get("nodes", []):
507 if n.get("name") == f"osd.{osd.osd_id}":
508 self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
509 return n.get("crush_weight")
510 return None
511
512 def reweight_osd(self, osd: "OSD", weight: float) -> bool:
513 self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
514 ret, out, err = self.mgr.mon_command({
515 'prefix': "osd crush reweight",
516 'name': f"osd.{osd.osd_id}",
517 'weight': weight,
518 })
519 if ret != 0:
520 self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
521 return False
522 self.mgr.log.info(f"{osd} weight is now {weight}")
523 return True
524
525 def zap_osd(self, osd: "OSD") -> str:
526 "Zaps all devices that are associated with an OSD"
527 if osd.hostname is not None:
528 out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
529 osd.hostname, 'osd', 'ceph-volume',
530 ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
531 error_ok=True))
532 self.mgr.cache.invalidate_host_devices(osd.hostname)
533 if code:
534 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
535 return '\n'.join(out + err)
536 raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
537
538 def safe_to_destroy(self, osd_ids: List[int]) -> bool:
539 """ Queries the safe-to-destroy flag for OSDs """
540 cmd_args = {'prefix': 'osd safe-to-destroy',
541 'ids': [str(x) for x in osd_ids]}
542 return self._run_mon_cmd(cmd_args, error_ok=True)
543
544 def destroy_osd(self, osd_id: int) -> bool:
545 """ Destroys an OSD (forcefully) """
546 cmd_args = {'prefix': 'osd destroy-actual',
547 'id': int(osd_id),
548 'yes_i_really_mean_it': True}
549 return self._run_mon_cmd(cmd_args)
550
551 def purge_osd(self, osd_id: int) -> bool:
552 """ Purges an OSD from the cluster (forcefully) """
553 cmd_args = {
554 'prefix': 'osd purge-actual',
555 'id': int(osd_id),
556 'yes_i_really_mean_it': True
557 }
558 return self._run_mon_cmd(cmd_args)
559
560 def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
561 """
562 Generic helper that runs a mon_command and evaluates/logs the result
563 """
564 ret, out, err = self.mgr.mon_command(cmd_args)
565 if ret != 0:
566 self.mgr.log.debug(f"ran {cmd_args} with mon_command")
567 if not error_ok:
568 self.mgr.log.error(
569 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
570 return False
571 self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
572 return True
573
574
575 class NotFoundError(Exception):
576 pass
577
578
579 class OSD:
580
581 def __init__(self,
582 osd_id: int,
583 remove_util: RemoveUtil,
584 drain_started_at: Optional[datetime] = None,
585 process_started_at: Optional[datetime] = None,
586 drain_stopped_at: Optional[datetime] = None,
587 drain_done_at: Optional[datetime] = None,
588 draining: bool = False,
589 started: bool = False,
590 stopped: bool = False,
591 replace: bool = False,
592 force: bool = False,
593 hostname: Optional[str] = None,
594 zap: bool = False):
595 # the ID of the OSD
596 self.osd_id = osd_id
597
598 # when did the removal process (not the actual draining) start
599 self.process_started_at = process_started_at
600
601 # when did the drain start
602 self.drain_started_at = drain_started_at
603
604 # when did the drain stop
605 self.drain_stopped_at = drain_stopped_at
606
607 # when did the drain finish
608 self.drain_done_at = drain_done_at
609
610 # did the draining start
611 self.draining = draining
612
613 # was the operation started
614 self.started = started
615
616 # was the operation stopped
617 self.stopped = stopped
618
619 # Whether this is a replace (rather than a plain remove) operation
620 self.replace = replace
621 # Whether to skip waiting for the OSD to be drained (forced removal)
622 self.force = force
623 # The name of the node
624 self.hostname = hostname
625
626 # mgr obj to make mgr/mon calls
627 self.rm_util: RemoveUtil = remove_util
628
629 self.original_weight: Optional[float] = None
630
631 # Whether devices associated with the OSD should be zapped (DATA ERASED)
632 self.zap = zap
633
634 def start(self) -> None:
635 if self.started:
636 logger.debug(f"Already started draining {self}")
637 return None
638 self.started = True
639 self.stopped = False
640
641 def start_draining(self) -> bool:
642 if self.stopped:
643 logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
644 return False
645 if self.replace:
646 self.rm_util.set_osd_flag([self], 'out')
647 else:
648 self.original_weight = self.rm_util.get_weight(self)
649 self.rm_util.reweight_osd(self, 0.0)
650 self.drain_started_at = datetime.utcnow()
651 self.draining = True
652 logger.debug(f"Started draining {self}.")
653 return True
654
655 def stop_draining(self) -> bool:
656 if self.replace:
657 self.rm_util.set_osd_flag([self], 'in')
658 else:
659 if self.original_weight:
660 self.rm_util.reweight_osd(self, self.original_weight)
661 self.drain_stopped_at = datetime.utcnow()
662 self.draining = False
663 logger.debug(f"Stopped draining {self}.")
664 return True
665
666 def stop(self) -> None:
667 if self.stopped:
668 logger.debug(f"Already stopped draining {self}")
669 return None
670 self.started = False
671 self.stopped = True
672 self.stop_draining()
673
674 @property
675 def is_draining(self) -> bool:
676 """
677 Consider an OSD draining when it is
678 actively draining but not yet empty
679 """
680 return self.draining and not self.is_empty
681
682 @property
683 def is_ok_to_stop(self) -> bool:
684 return self.rm_util.ok_to_stop([self])
685
686 @property
687 def is_empty(self) -> bool:
688 if self.get_pg_count() == 0:
689 if not self.drain_done_at:
690 self.drain_done_at = datetime.utcnow()
691 self.draining = False
692 return True
693 return False
694
695 def safe_to_destroy(self) -> bool:
696 return self.rm_util.safe_to_destroy([self.osd_id])
697
698 def down(self) -> bool:
699 return self.rm_util.set_osd_flag([self], 'down')
700
701 def destroy(self) -> bool:
702 return self.rm_util.destroy_osd(self.osd_id)
703
704 def do_zap(self) -> str:
705 return self.rm_util.zap_osd(self)
706
707 def purge(self) -> bool:
708 return self.rm_util.purge_osd(self.osd_id)
709
710 def get_pg_count(self) -> int:
711 return self.rm_util.get_pg_count(self.osd_id)
712
713 @property
714 def exists(self) -> bool:
715 return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
716
717 def drain_status_human(self) -> str:
718 default_status = 'not started'
719 status = 'started' if self.started and not self.draining else default_status
720 status = 'draining' if self.draining else status
721 status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
722 return status
723
724 def pg_count_str(self) -> str:
725 return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
726
727 def to_json(self) -> dict:
728 out: Dict[str, Any] = dict()
729 out['osd_id'] = self.osd_id
730 out['started'] = self.started
731 out['draining'] = self.draining
732 out['stopped'] = self.stopped
733 out['replace'] = self.replace
734 out['force'] = self.force
735 out['zap'] = self.zap
736 out['hostname'] = self.hostname # type: ignore
737
738 for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
739 if getattr(self, k):
740 out[k] = datetime_to_str(getattr(self, k))
741 else:
742 out[k] = getattr(self, k)
743 return out
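# Example of the serialized form (field values are made up; the timestamp format
# comes from ceph.utils.datetime_to_str):
#   {'osd_id': 3, 'started': True, 'draining': True, 'stopped': False,
#    'replace': False, 'force': False, 'zap': False, 'hostname': 'node1',
#    'drain_started_at': '2021-01-01T00:00:00.000000Z', 'drain_stopped_at': None,
#    'drain_done_at': None, 'process_started_at': '2021-01-01T00:00:00.000000Z'}
# from_json() below accepts the same shape and converts the date strings back
# with str_to_datetime.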
744
745 @classmethod
746 def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
747 if not inp:
748 return None
749 for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
750 if inp.get(date_field):
751 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
752 inp.update({'remove_util': rm_util})
753 if 'nodename' in inp:
754 hostname = inp.pop('nodename')
755 inp['hostname'] = hostname
756 return cls(**inp)
757
758 def __hash__(self) -> int:
759 return hash(self.osd_id)
760
761 def __eq__(self, other: object) -> bool:
762 if not isinstance(other, OSD):
763 return NotImplemented
764 return self.osd_id == other.osd_id
765
766 def __repr__(self) -> str:
767 return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
768
769
770 class OSDRemovalQueue(object):
771
772 def __init__(self, mgr: "CephadmOrchestrator") -> None:
773 self.mgr: "CephadmOrchestrator" = mgr
774 self.osds: Set[OSD] = set()
775 self.rm_util = RemoveUtil(mgr)
776
777 # locks multithreaded access to self.osds. Please avoid locking
778 # network calls, like mon commands.
779 self.lock = Lock()
780
781 def process_removal_queue(self) -> None:
782 """
783 Performs actions in the _serve() loop to remove an OSD
784 when the removal criteria are met.
785
786 We can't hold self.lock here, as _remove_daemon is called inside the loop.
787 """
788
789 # make sure that we don't run on OSDs that are not in the cluster anymore.
790 self.cleanup()
791
792 # find osds that are ok-to-stop and not yet draining
793 ready_to_drain_osds = self._ready_to_drain_osds()
794 if ready_to_drain_osds:
795 # start draining those
796 _ = [osd.start_draining() for osd in ready_to_drain_osds]
797
798 all_osds = self.all_osds()
799
800 logger.debug(
801 f"{self.queue_size()} OSDs are scheduled "
802 f"for removal: {all_osds}")
803
804 # Check all osds for their state and take action (remove, purge etc)
805 new_queue: Set[OSD] = set()
806 for osd in all_osds: # type: OSD
807 if not osd.force:
808 # skip criteria
809 if not osd.is_empty:
810 logger.debug(f"{osd} is not empty yet. Waiting a bit more")
811 new_queue.add(osd)
812 continue
813
814 if not osd.safe_to_destroy():
815 logger.debug(
816 f"{osd} is not safe-to-destroy yet. Waiting a bit more")
817 new_queue.add(osd)
818 continue
819
820 # abort criteria
821 if not osd.down():
822 # also remove it from the remove_osd list and set a health_check warning?
823 raise orchestrator.OrchestratorError(
824 f"Could not mark {osd} down")
825
826 # stop and remove daemon
827 assert osd.hostname is not None
828
829 if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
830 CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
831 logger.info(f"Successfully removed {osd} on {osd.hostname}")
832 else:
833 logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
834
835 if osd.replace:
836 # mark destroyed in osdmap
837 if not osd.destroy():
838 raise orchestrator.OrchestratorError(
839 f"Could not destroy {osd}")
840 logger.info(
841 f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
842 else:
843 # purge from osdmap
844 if not osd.purge():
845 raise orchestrator.OrchestratorError(f"Could not purge {osd}")
846 logger.info(f"Successfully purged {osd} on {osd.hostname}")
847
848 if osd.zap:
849 # throws an exception if the zap fails
850 logger.info(f"Zapping devices for {osd} on {osd.hostname}")
851 osd.do_zap()
852 logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
853
854 logger.debug(f"Removing {osd} from the queue.")
855
856 # self could change while this is processing (osds get added from the CLI)
857 # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
858 # osds that were added while this method was executed'
859 with self.lock:
860 self.osds.intersection_update(new_queue)
861 self._save_to_store()
862
863 def cleanup(self) -> None:
864 # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
865 with self.lock:
866 for osd in self._not_in_cluster():
867 self.osds.remove(osd)
868
869 def _ready_to_drain_osds(self) -> List["OSD"]:
870 """
871 Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
872 be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
873 that are already draining.
874 """
875 draining_limit = max(1, self.mgr.max_osd_draining_count)
876 num_already_draining = len(self.draining_osds())
877 num_to_start_draining = max(0, draining_limit - num_already_draining)
878 stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
879 return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
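# Arithmetic sketch (hypothetical numbers): with max_osd_draining_count = 10 and
# 4 OSDs already draining, num_to_start_draining is 6, so at most the first 6
# entries of whatever subset find_osd_stop_threshold() reports as ok-to-stop are
# returned for draining in this cycle.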
880
881 def _save_to_store(self) -> None:
882 osd_queue = [osd.to_json() for osd in self.osds]
883 logger.debug(f"Saving {osd_queue} to store")
884 self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
885
886 def load_from_store(self) -> None:
887 with self.lock:
888 for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
889 for osd in json.loads(v):
890 logger.debug(f"Loading osd ->{osd} from store")
891 osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
892 if osd_obj is not None:
893 self.osds.add(osd_obj)
894
895 def as_osd_ids(self) -> List[int]:
896 with self.lock:
897 return [osd.osd_id for osd in self.osds]
898
899 def queue_size(self) -> int:
900 with self.lock:
901 return len(self.osds)
902
903 def draining_osds(self) -> List["OSD"]:
904 with self.lock:
905 return [osd for osd in self.osds if osd.is_draining]
906
907 def idling_osds(self) -> List["OSD"]:
908 with self.lock:
909 return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
910
911 def empty_osds(self) -> List["OSD"]:
912 with self.lock:
913 return [osd for osd in self.osds if osd.is_empty]
914
915 def all_osds(self) -> List["OSD"]:
916 with self.lock:
917 return [osd for osd in self.osds]
918
919 def _not_in_cluster(self) -> List["OSD"]:
920 return [osd for osd in self.osds if not osd.exists]
921
922 def enqueue(self, osd: "OSD") -> None:
923 if not osd.exists:
924 raise NotFoundError()
925 with self.lock:
926 self.osds.add(osd)
927 osd.start()
928
929 def rm(self, osd: "OSD") -> None:
930 if not osd.exists:
931 raise NotFoundError()
932 osd.stop()
933 with self.lock:
934 try:
935 logger.debug(f'Removing {osd} from the queue.')
936 self.osds.remove(osd)
937 except KeyError:
938 logger.debug(f"Could not find {osd} in queue.")
939 raise KeyError
940
941 def __eq__(self, other: Any) -> bool:
942 if not isinstance(other, OSDRemovalQueue):
943 return False
944 with self.lock:
945 return self.osds == other.osds