import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime

import orchestrator
from cephadm.serve import CephadmServe
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, '_no_schedule'):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
                                                                 osd_id_claims_for_host)
            if not cmds:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmds,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))
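
    # A minimal sketch of the concurrency pattern used above, with
    # hypothetical hosts and drive selections: the per-host coroutines are
    # independent, so all_hosts() dispatches them all at once via gather()
    # and wait_async() drives them from the synchronous caller.
    #
    #   async def all_hosts():
    #       return await gather(create_from_spec_one('node1', ds1),
    #                           create_from_spec_one('node2', ds2))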

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmds: List[str], replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
        for cmd in cmds:
            out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

            if code == 1 and ', it is already prepared' in '\n'.join(err):
                # HACK: when we create against an existing LV, ceph-volume
                # returns an error and the above message. To make this
                # command idempotent, tolerate this "error" and continue.
                logger.debug('the device was already prepared; continuing')
                code = 0
            if code:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)
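
    # Illustrative note on the idempotency tolerance above (hypothetical
    # stderr): rerunning the same ceph-volume command against an LV that was
    # already set up exits 1 with '... it is already prepared' on stderr;
    # that specific failure is downgraded to success instead of raising
    # RuntimeError, so re-applying an unchanged spec is safe.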

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
            assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # if it exists but is part of the replacement operation, don't skip
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory.devices
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            if drive_group.method and drive_group.method == 'raw':
                # ceph-volume can currently only handle a 1:1 mapping
                # of data/db/wal devices for raw mode osds. If db/wal devices
                # are defined and the number does not match the number of data
                # devices, we need to bail out
                if drive_selection.data_devices() and drive_selection.db_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
                if drive_selection.data_devices() and drive_selection.wal_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
            host_ds_map.append((host, drive_selection))
        return host_ds_map
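
    # Illustrative numbers for the raw-mode check above (hypothetical
    # devices): a spec with method 'raw', three data devices and two db
    # devices fails the 1:1 constraint (3 != 2) and raises OrchestratorError,
    # while three data and three db devices pass.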

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> List[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmds: List[str] = translate.to_ceph_volume(drive_selection,
                                                   osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmds: {cmds}")
        return cmds

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           'notes': <notes>
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []

        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmds: List[str] = self.driveselection_to_ceph_volume(ds,
                                                                     osd_id_claims.filtered_by_host(
                                                                         host),
                                                                     preview=True)
                if not cmds:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                for cmd in cmds:
                    out, err, code = self.mgr.wait_async(
                        self._run_ceph_volume_command(host, cmd))
                    if out:
                        try:
                            concat_out: Dict[str, Any] = json.loads(' '.join(out))
                        except ValueError:
                            logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                            concat_out = {}
                        notes = []
                        if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
                            found = len(concat_out)
                            limit = osdspec.data_devices.limit
                            notes.append(
                                f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
                        ret_all.append({'data': concat_out,
                                        'osdspec': osdspec.service_id,
                                        'host': host,
                                        'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
        # we cannot recover from it. The OSD keys are created by ceph-volume and not by
        # us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused in a host

        OSD id claims in CRUSH map are linked to the bare name of
        the hostname. In case of FQDN hostnames the host is searched by the
        bare name
        """
        return self.osd_host_map.get(host.split(".")[0], [])


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd df',
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of osd_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        #  return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)
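
    # Worked example of the halving above (hypothetical ids): starting from
    # [osd.0 .. osd.7], each failed ok-to-stop check keeps the upper half,
    # probing 8, then 4, then 2, then 1 OSDs, so at most O(log n) mon calls
    # are made before succeeding or giving up for this iteration.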

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                osd.hostname, 'osd', 'ceph-volume',
                ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
                error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.stopped = True
        self.started = False
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()
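
    # Illustrative sketch of the intersection above (hypothetical ids): if
    # the queue held {osd.1, osd.2, osd.3} and osd.2 was purged during this
    # pass, new_queue is {osd.1, osd.3}; intersection_update() then drops
    # osd.2 from self.osds in place, without replacing the set object that
    # other threads access through self.lock.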

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
        that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
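
    # Worked example of the limit arithmetic above (hypothetical values):
    # with max_osd_draining_count = 10 and 4 OSDs already draining, at most
    # max(0, 10 - 4) = 6 of the stoppable idling OSDs start draining on this
    # pass of the serve loop.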

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds