# ceph/src/pybind/mgr/cephadm/services/osd.py
# (git.proxmox.com mirror, blob 234d7a057c17a6884ee153e3ca9bf1af0281afad)
import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime

import orchestrator
from cephadm.serve import CephadmServe
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, '_no_schedule'):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims_for_host)
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmd,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))
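
    # Note: the per-host work is done concurrently; create_from_spec_one() is a coroutine,
    # all_hosts() gathers one future per (host, DriveSelection) pair from prepare_drivegroup(),
    # and mgr.wait_async() drives them to completion before the results are joined.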

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmd: str, replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
        out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
            assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # if it exists but is part of the replacement operation, don't skip
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           },
          ...
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []

        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.filtered_by_host(host),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                if out:
                    try:
                        concat_out: Dict[str, Any] = json.loads(' '.join(out))
                    except ValueError:
                        logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                        concat_out = {}
                    notes = []
                    if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
                        found = len(concat_out)
                        limit = osdspec.data_devices.limit
                        notes.append(
                            f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host,
                                    'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if specs is None:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get the bootstrap-osd keyring for the config-json payload
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            error_ok=True,
            stdin=j)
        return out, err, code

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
        # we cannot recover from it. The OSD keys are created by ceph-volume and not by
        # us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused in a host

        OSD id claims in CRUSH map are linked to the bare name of
        the hostname. In case of FQDN hostnames the host is searched by the
        bare name.
        """
        return self.osd_host_map.get(host.split(".")[0], [])


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd df',
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of osd_ids that can be stopped at once
        """
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        # return all([osd.start_draining() for osd in osds])
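
    # Illustrative walk-through (assumed numbers): with eight candidate OSDs on a busy
    # cluster, the list shrinks 8 -> 4 -> 2 -> 1 between 'osd ok-to-stop' checks; once a
    # subset passes it is returned, and if even a single OSD cannot be stopped the attempt
    # is postponed with an empty result.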

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                osd.hostname, 'osd', 'ceph-volume',
                ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
                error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        self.mgr.log.debug(f"ran {cmd_args} with mon_command")
        if ret != 0:
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False):
        # the id of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
            self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)
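
    # Illustrative round-trip (assumed values): to_json() of a queued OSD might produce
    # {'osd_id': 3, 'started': True, 'draining': False, 'stopped': False, 'replace': False,
    #  'force': False, 'zap': False, 'hostname': 'node1', 'drain_started_at': None, ...};
    # from_json() converts the date fields back, injects the RemoveUtil instance and feeds
    # the dict into __init__() via cls(**inp).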

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()
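
    # Taken together, the loop above is a small per-OSD state machine: wait until the OSD
    # is empty (unless force is set), confirm it is safe-to-destroy, mark it down, remove
    # the cephadm daemon, then destroy it (replace=True) or purge it, and optionally zap
    # its devices before it is dropped from the queue.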

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
        that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
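
    # Example of the limit arithmetic (assumed numbers): with max_osd_draining_count = 10
    # and 4 OSDs already draining, at most 6 of the ok-to-stop idling OSDs are returned.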

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError(f"Could not find {osd} in queue.")

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds