# ceph/src/pybind/mgr/cephadm/services/osd.py (Ceph Pacific 16.2.2)
import json
import logging
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import forall_hosts
from ceph.utils import datetime_now
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
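        # Illustrative only: a drive group spec such as
        #
        #   service_type: osd
        #   service_id: example_dg        # hypothetical name
        #   placement:
        #     host_pattern: '*'
        #   data_devices:
        #     all: true
        #
        # is expanded below into one ceph-volume invocation per matching host.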
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = self.find_destroyed_osds()
        if osd_id_claims:
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")

        @forall_hosts
        def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None

            cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                     osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.info('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = self.create_single_host(
                drive_group, host, cmd,
                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
        return ", ".join(filter(None, ret))

    def create_single_host(self,
                           drive_group: DriveGroupSpec,
                           host: str, cmd: str, replace_osd_ids: List[str],
                           env_vars: Optional[List[str]] = None) -> str:
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))
        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                         replace_osd_ids)

    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                             replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = self.find_destroyed_osds().get(host, [])
            assert replace_osd_ids is not None
        # check result
        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
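        # Illustrative only: osds_elems maps OSD ids to the LVs backing them,
        # roughly {'0': [{'tags': {'ceph.cluster_fsid': '...',
        #                          'ceph.osd_fsid': '...'}, ...}], ...};
        # only the 'tags' entries are inspected below.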
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=osd_id,
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.inventory.all_specs())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
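        # Illustrative only: for a spec that selects /dev/sdb and /dev/sdc as
        # data devices, the translated command typically has the shape
        # "lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd"
        # (the exact flags depend on the DriveGroup options and ceph-volume version).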
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

           {'data': ...,
            'osdspec': ..,
            'host': ..
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(ds,
                                                         osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    try:
                        concat_out: Dict[str, Any] = json.loads(' '.join(out))
                    except ValueError:
                        logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                        concat_out = {}

                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
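        # Illustrative only: for cmd "lvm batch --no-auto /dev/sdb" this builds
        # ['--config-json', '-', '--', 'lvm', 'batch', '--no-auto', '/dev/sdb'],
        # with the minimal ceph.conf and bootstrap keyring passed as JSON on stdin.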
        out, err, code = CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
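        # Illustrative only: queries `osd tree --states destroyed` and returns a
        # host -> destroyed-OSD-id map, e.g. {'node1': ['4', '7']}, so the ids can
        # be claimed by replacement OSDs.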
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
        return osd_host_map


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut the osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of osd_ids that can be stopped at once
        """
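        # Illustrative only: starting with eight queued OSDs on a busy cluster,
        # ok-to-stop may fail for all eight and again for the last four, then
        # succeed for the last two, so only those two are drained in this round.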
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.info(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        #  return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 ):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = None

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.replace:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.original_weight = self.rm_util.get_weight(self)
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.replace:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['force'] = self.force
        out['hostname'] = self.hostname  # type: ignore

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out
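
    # Illustrative only: to_json()/from_json() round-trip a dict roughly like
    # {'osd_id': 2, 'started': True, 'draining': False, 'stopped': False,
    #  'replace': False, 'force': False, 'hostname': 'node1',
    #  'drain_started_at': '<timestamp string or None>', ...}; this is what
    # OSDRemovalQueue persists under the 'osd_remove_queue' store key.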

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when its removal criteria are met.

        We can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        if ok_to_stop_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ok_to_stop_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None
            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
            logger.info(f"Successfully removed {osd} on {osd.hostname}")

            if osd.replace:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds