# ceph/src/pybind/mgr/cephadm/services/osd.py (Ceph Pacific 16.2.2)
import json
import logging
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import forall_hosts
from ceph.utils import datetime_now
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
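        """
        Turn a DriveGroupSpec into concrete OSD deployments.

        For every host matched by the spec this renders a ceph-volume
        command (claiming destroyed OSD ids where possible), runs it via
        cephadm, and records when the spec was last applied. Returns a
        comma-separated summary of the per-host results.

        A spec of roughly this shape is what typically arrives here
        (sketch only; see the drive group documentation for the full
        format)::

            service_type: osd
            service_id: default_drive_group
            placement:
              host_pattern: '*'
            data_devices:
              all: true
        """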
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = self.find_destroyed_osds()
        if osd_id_claims:
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None

            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.info('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                drive_group, host, cmd,
                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))

    def create_single_host(self,
                           drive_group: DriveGroupSpec,
                           host: str, cmd: str, replace_osd_ids: List[str],
                           env_vars: Optional[List[str]] = None) -> str:
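        """
        Run a prepared ceph-volume command on a single host and deploy
        OSD daemons for any OSDs it created. A 'device already prepared'
        error from ceph-volume is tolerated to keep the call idempotent.
        """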
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))
        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                         replace_osd_ids)

    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                             replace_osd_ids: Optional[List[str]] = None) -> str:
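        """
        Inspect 'ceph-volume lvm list' output on the host and create
        daemons for OSDs that belong to this cluster but are not up yet,
        or that are listed in replace_osd_ids. Returns a human-readable
        summary of what was created.
        """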
        if replace_osd_ids is None:
            replace_osd_ids = self.find_destroyed_osds().get(host, [])
            assert replace_osd_ids is not None
        # check result
        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # the OSD is already up and not part of a replacement; nothing to do
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=osd_id,
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
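        """
        Resolve the spec's placement to matching hosts and build a
        DriveSelection against each host's device inventory, taking the
        number of already deployed daemons into account.
        """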
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.inventory.all_specs())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # TODO: this lookup should be loaded together with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_hosts and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
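        """
        Render a DriveSelection into the ceph-volume command line that
        would create the selected OSDs, or None if nothing matched.
        """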
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

          {'data': ...,
           'osdspec': ..,
           'host': ..
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    try:
                        concat_out: Dict[str, Any] = json.loads(' '.join(out))
                    except ValueError:
                        logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                        concat_out = {}

                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
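        """
        Return all hostnames matched by the placements of the given
        OSDSpecs (empty if no specs are provided).
        """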
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
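        """
        Return the OSDSpecs whose placement matches the given host,
        defaulting to the osd specs in the spec store's preview.
        """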
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
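        """
        Execute a ceph-volume command on the host via cephadm, feeding it
        a minimal ceph.conf and the bootstrap-osd keyring on stdin.
        Returns (stdout, stderr, exit_code).
        """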
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
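        """
        Map hostnames to the ids of OSDs marked 'destroyed' in the osd
        tree; these ids can be reclaimed when replacing OSDs.
        """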
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
        return osd_host_map


class RemoveUtil(object):
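    """Thin wrapper around the mon commands used while draining and removing OSDs."""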
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut the OSD list in half until it's ok-to-stop.

        :param osds: list of OSDs
        :return: list of OSDs that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.info(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later...")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one would be better; it depends on the cluster size
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        # return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:
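    """State of a single OSD that is queued for removal or replacement."""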

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did the process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):
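    """Thread-safe queue of OSDs scheduled for removal, processed from the serve loop."""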

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs the actions in the _serve() loop to remove an OSD
        when its removal criteria are met.

        We can't hold self.lock, as we're calling _remove_daemon in the loop.
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

                if not osd.safe_to_destroy():
                    logger.debug(
                        f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None
            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
            logger.info(f"Successfully removed {osd} on {osd.hostname}")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds