# ceph/src/pybind/mgr/cephadm/services/osd.py
import json
import logging
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime

import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import forall_hosts
from ceph.utils import datetime_now
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = self.find_destroyed_osds()
        if osd_id_claims:
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    drive_group, host))
                return None

            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.info('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                drive_group, host, cmd,
                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))
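
    # Illustrative usage sketch (an assumption for clarity, not part of this module):
    # applying a spec roughly amounts to
    #
    #   spec = DriveGroupSpec.from_json({'service_type': 'osd',
    #                                    'service_id': 'default_drive_group',
    #                                    'placement': {'host_pattern': '*'},
    #                                    'data_devices': {'all': True}})
    #   osd_service.create_from_spec(spec)
    #
    # For each matching host this selects devices, translates the selection into a
    # ceph-volume command, and runs it with CEPH_VOLUME_OSDSPEC_AFFINITY set so the
    # resulting OSDs can later be mapped back to the spec via get_osdspec_affinity().
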
    def create_single_host(self,
                           drive_group: DriveGroupSpec,
                           host: str, cmd: str, replace_osd_ids: List[str],
                           env_vars: Optional[List[str]] = None) -> str:
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))
        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                         replace_osd_ids)

    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                             replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = self.find_destroyed_osds().get(host, [])
        assert replace_osd_ids is not None

        # list the existing OSD LVs on the host via ceph-volume
        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'list', '--format', 'json'])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host
    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.inventory.all_specs())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd
    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []

        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    try:
                        concat_out: Dict[str, Any] = json.loads(' '.join(out))
                    except ValueError:
                        logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                        concat_out = {}

                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs())
                    for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # fetch the bootstrap-osd keyring so ceph-volume can talk to the cluster
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
        return osd_host_map


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd df',
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of ods_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        # return all([osd.start_draining() for osd in osds])
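
    # Illustrative walk-through of the halving above: with eight busy OSDs,
    # 'osd ok-to-stop' may fail for the full list, so it is cut to the last four,
    # then two, then one; the first sub-list that is ok-to-stop is returned and
    # only those OSDs start draining in this round.
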
    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
            self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out
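
    # A serialized queue entry therefore looks roughly like:
    #   {'osd_id': 2, 'started': True, 'draining': True, 'stopped': False,
    #    'replace': False, 'force': False, 'hostname': 'node1',
    #    'drain_started_at': '2021-01-01T00:00:00.000000Z', ...}
    # with the datetime fields rendered by datetime_to_str (or None when unset).
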
    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None
            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
            logger.info(f"Successfully removed {osd} on {osd.hostname}")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError(f"Unable to find {osd} in the queue. Not deleting")

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds
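
# Rough usage sketch (names outside this module are assumptions): the orchestrator
# keeps a single OSDRemovalQueue, e.g. self.to_remove_osds = OSDRemovalQueue(self),
# restores pending entries with load_from_store() on startup, enqueue()s an OSD when
# an 'osd rm' request comes in, and lets the serve loop call process_removal_queue()
# periodically until drained OSDs have been purged (or destroyed, for replacements).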