import json
import logging
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection

from datetime import datetime
import orchestrator
from cephadm.utils import forall_hosts
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec

logger = logging.getLogger(__name__)
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'


class OSDService(CephadmService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = self.find_destroyed_osds()
        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
                return None
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            return ret_msg

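        # Note: the ``forall_hosts`` decorator fans ``create_from_spec_one`` out
        # over the (host, drive_selection) tuples produced by
        # ``prepare_drivegroup`` below and gathers one result message per host.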
        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))

    def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
        # tolerate the None default: the membership checks below expect a list
        if replace_osd_ids is None:
            replace_osd_ids = []
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))

        # check result
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        osds_elems = json.loads('\n'.join(out))
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
                    daemon_id=osd_id,
                    host=host,
                    daemon_type='osd',
                )
                self.mgr._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hosts(self.mgr._get_hosts)
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict):
            # TODO: this lookup should be loaded together with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_hosts and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")
            drive_selection = DriveSelection(drive_group, inventory_for_host)
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    def driveselection_to_ceph_volume(self,
                                      drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd
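        # Illustrative only (hypothetical devices): for a selection that
        # resolves to /dev/vdb and /dev/vdc, the translated command string is
        # along the lines of "lvm batch --no-auto /dev/vdb /dev/vdc --yes".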

    def get_previews(self, host) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

          {'data': ...,
           'osdspec': ..,
           'host': ..
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    concat_out: Dict[str, Any] = json.loads(" ".join(out))
                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hosts(self.mgr._get_hosts) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str, specs: Optional[List[DriveGroupSpec]] = None):
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hosts(self.mgr._get_hosts):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        # generate config
        ret, config, err = self.mgr.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })

        j = json.dumps({
            'config': config,
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except json.decoder.JSONDecodeError:
            logger.exception(f"Could not decode json -> {out}")
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        self.mgr.log.info(
            f"Found osd claims -> {osd_host_map}")
        return osd_host_map
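        # For illustration (hypothetical values): the mapping built above maps
        # a hostname to the stringified ids of destroyed OSDs found on it,
        # e.g. {'node1': ['0', '3'], 'node2': ['7']}.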


class RemoveUtil(object):
    def __init__(self, mgr):
        self.mgr = mgr

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when the criteria are met.
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        logger.debug(
            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
            f"for removal: {self.mgr.to_remove_osds.all_osds()}")

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        # Check all osds for their state and take action (remove, purge etc)
        to_remove_osds = self.mgr.to_remove_osds.all_osds()
        new_queue = set()
        for osd in to_remove_osds:
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.info(
                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            if not osd.exists:
                continue
            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")

        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        self.mgr.to_remove_osds.intersection_update(new_queue)
        self.save_to_store()

    def cleanup(self):
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
        [self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds]

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        return json.loads(out)

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1
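        # The 'osd df' payload consulted above is expected to carry a 'nodes'
        # list whose entries include 'id' and 'pgs', e.g. (hypothetical
        # excerpt): {'nodes': [{'id': 2, 'pgs': 13, ...}, ...]}.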

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut the osd list in half until it's ok-to-stop

        :param osds: list of OSDs
        :return: list of OSDs that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds
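        # Illustration of the halving above (hypothetical ids): starting from
        # [0, 1, 2, 3], a failed ok-to-stop check narrows the candidates to
        # [2, 3], then to [3], before giving up if even one OSD can't stop.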

        # todo start draining
        # return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
            return False
        self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True

    def save_to_store(self):
        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self):
        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
            for osd in json.loads(v):
                logger.debug(f"Loading osd ->{osd} from store")
                # save_to_store() serializes a list of dicts, so each entry is
                # already a dict here; don't json.loads() it a second time
                osd_obj = OSD.from_json(osd, ctx=self)
                self.mgr.to_remove_osds.add(osd_obj)


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 fullname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did the process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.nodename = hostname
        # The full name of the osd
        self.fullname = fullname

        # mgr obj to make mgr/mon calls
        self.rm_util = remove_util

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        self.rm_util.set_osd_flag([self], 'out')
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        self.rm_util.set_osd_flag([self], 'in')
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self):
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self):
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['nodename'] = self.nodename  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
            else:
                out[k] = getattr(self, k)
        return out
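    # Sketch of the serialized form produced by to_json() (hypothetical
    # values): {'osd_id': 2, 'started': True, 'draining': False,
    # 'stopped': False, 'replace': False, 'force': False, 'nodename': 'node1',
    # 'drain_started_at': '2020-09-01T12:00:00.000000', ...}; from_json()
    # below reverses this, parsing the date fields with DATEFMT.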

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
        inp.update({'remove_util': ctx})
        if 'nodename' in inp:
            # to_json() stores the hostname under 'nodename'; map it back to
            # the 'hostname' keyword expected by __init__
            inp['hostname'] = inp.pop('nodename')
        return cls(**inp)

    def __hash__(self):
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"<OSD>(osd_id={self.osd_id}, is_draining={self.is_draining})"


class OSDQueue(Set):

    def __init__(self):
        super().__init__()

    def as_osd_ids(self) -> List[int]:
        return [osd.osd_id for osd in self]

    def queue_size(self) -> int:
        return len(self)

    def draining_osds(self) -> List["OSD"]:
        return [osd for osd in self if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        return [osd for osd in self if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        return [osd for osd in self if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        return [osd for osd in self]

    def not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        self.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        try:
            logger.debug(f'Removing {osd} from the queue.')
            self.remove(osd)
        except KeyError:
            logger.debug(f"Could not find {osd} in queue.")
            raise KeyError
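

# Minimal usage sketch (hypothetical objects, not part of this module's API
# surface): an OSD wrapping an existing cluster OSD is enqueued for removal
# with OSDQueue().enqueue(osd) and taken back out with .rm(osd); both raise
# NotFoundError when the OSD no longer exists in the cluster.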