# ceph/src/pybind/mgr/cephadm/services/osd.py

import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import SpecialHostLabels
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
                                                                 osd_id_claims_for_host)
            if not cmds:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmds,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'):
            ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmds: List[str], replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
        for cmd in cmds:
            out, err, code = await self._run_ceph_volume_command(
                host, cmd, env_vars=env_vars)
            if code == 1 and ', it is already prepared' in '\n'.join(err):
                # HACK: when we create against an existing LV, ceph-volume
                # returns an error and the above message. To make this
                # command idempotent, tolerate this "error" and continue.
                logger.debug('the device was already prepared; continuing')
                code = 0
            if code:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
            assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # if it exists but is part of the replacement operation, don't skip
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host
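
    # Sketch of the data the method above consumes (shape inferred from the
    # lookups in the loop; values are illustrative): 'ceph-volume lvm list
    # --format json' yields a mapping of osd_id -> list of LVs, roughly
    #
    #   {"0": [{"type": "block",
    #           "tags": {"ceph.cluster_fsid": "<fsid>",
    #                    "ceph.osd_fsid": "<osd uuid>"}}]}
    #
    # An osd_id is only deployed when its cluster fsid and osd uuid match the
    # cluster's maps and no cephadm daemon for it exists on the host yet.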

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            if drive_group.method and drive_group.method == 'raw':
                # ceph-volume can currently only handle a 1:1 mapping
                # of data/db/wal devices for raw mode osds. If db/wal devices
                # are defined and the number does not match the number of data
                # devices, we need to bail out
                if drive_selection.data_devices() and drive_selection.db_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
                if drive_selection.data_devices() and drive_selection.wal_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
            host_ds_map.append((host, drive_selection))
        return host_ds_map
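
    # Worked example of the raw-mode guard above (counts are illustrative): a
    # spec with method 'raw' that matches 3 data devices but only 2 db devices
    # on a host raises OrchestratorError, since raw mode requires a strict 1:1
    # data:db (and data:wal) device mapping.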

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> List[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmds: List[str] = translate.to_ceph_volume(drive_selection,
                                                   osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmds: {cmds}")
        return cmds

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           'notes': <notes>
           },
           ...
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmds: List[str] = self.driveselection_to_ceph_volume(ds,
                                                                     osd_id_claims.filtered_by_host(host),
                                                                     preview=True)
                if not cmds:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                for cmd in cmds:
                    with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
                        out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                    if out:
                        try:
                            concat_out: Dict[str, Any] = json.loads(' '.join(out))
                        except ValueError:
                            logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                            concat_out = {}
                        notes = []
                        if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
                            found = len(concat_out)
                            limit = osdspec.data_devices.limit
                            notes.append(
                                f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
                        ret_all.append({'data': concat_out,
                                        'osdspec': osdspec.service_id,
                                        'host': host,
                                        'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
        # we cannot recover from it. The OSD keys are created by ceph-volume and not by
        # us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused in a host

        OSD id claims in CRUSH map are linked to the bare name of
        the hostname. In case of FQDN hostnames the host is searched by the
        bare name
        """
        return self.osd_host_map.get(host.split(".")[0], [])
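
# Illustrative walk-through of OsdIdClaims (values are made up): 'osd tree'
# restricted to destroyed OSDs may report
#
#   {"nodes": [{"type": "host", "name": "node1", "children": [0, 4]}]}
#
# which refresh() turns into osd_host_map = {"node1": ["0", "4"]}.
# filtered_by_host("node1.example.com") still returns ["0", "4"] because the
# lookup uses the bare hostname ("node1.example.com".split(".")[0] == "node1").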


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd df',
            'format': 'json',
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of ods_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        #  return all([osd.start_draining() for osd in osds])
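
    # Illustrative trace of the halving above (osd ids are made up): with
    # eight candidate OSDs and a busy cluster, ok_to_stop() is retried on the
    # remaining second half each round, 8 -> 4 -> 2 -> 1 candidates; if even a
    # single OSD is not ok-to-stop, an empty list is returned and the caller
    # simply retries on a later serve-loop iteration.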

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
            if not osd.no_destroy:
                cmd.append('--destroy')
            with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
                out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    osd.hostname, 'osd', 'ceph-volume',
                    cmd,
                    error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False,
                 no_destroy: bool = False):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap
        # Whether all associated LV devices should be destroyed.
        self.no_destroy = no_destroy

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out
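
    # Example to_json() output (values are illustrative, abbreviated):
    #
    #   {'osd_id': 3, 'started': True, 'draining': True, 'stopped': False,
    #    'replace': False, 'force': False, 'zap': False, 'hostname': 'node1',
    #    'drain_started_at': '2023-01-01T00:00:00.000000Z', ...}
    #
    # Datetimes are serialized with datetime_to_str() and restored by
    # from_json() below via str_to_datetime().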

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
        that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
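
    # Worked example (config value is hypothetical): with max_osd_draining_count
    # set to 10 and 4 OSDs already draining, at most 6 of the stoppable idling
    # OSDs are returned, so no more than 10 OSDs ever drain at the same time.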

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds
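
# Minimal usage sketch (hypothetical driver code, assuming an initialized
# CephadmOrchestrator instance `mgr`): removal is queue-driven, with
# process_removal_queue() invoked periodically from the serve loop.
#
#   queue = OSDRemovalQueue(mgr)
#   queue.load_from_store()
#   osd = OSD(osd_id=3, remove_util=queue.rm_util, hostname='node1')
#   queue.enqueue(osd)             # raises NotFoundError if osd.3 is not in the osdmap
#   queue.process_removal_queue()  # drains, then purges/destroys and removes the daemon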