]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/services/osd.py
31771fb5fcebe4cdf404dac344d5a630ee51da58
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / osd.py
1 import json
2 import logging
3 from asyncio import gather
4 from threading import Lock
5 from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
6
7 from ceph.deployment import translate
8 from ceph.deployment.drive_group import DriveGroupSpec
9 from ceph.deployment.drive_selection import DriveSelection
10 from ceph.deployment.inventory import Device
11 from ceph.utils import datetime_to_str, str_to_datetime
12
13 from datetime import datetime
14 import orchestrator
15 from cephadm.serve import CephadmServe
16 from ceph.utils import datetime_now
17 from orchestrator import OrchestratorError, DaemonDescription
18 from mgr_module import MonCommandFailed
19
20 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
21
22 if TYPE_CHECKING:
23 from cephadm.module import CephadmOrchestrator
24
25 logger = logging.getLogger(__name__)
26
27
28 class OSDService(CephService):
29 TYPE = 'osd'
30
31 def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
32 logger.debug(f"Processing DriveGroup {drive_group}")
33 osd_id_claims = OsdIdClaims(self.mgr)
34 if osd_id_claims.get():
35 logger.info(
36 f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
37
38 async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
39 # skip this host if there has been no change in inventory
40 if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
41 self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
42 host, drive_group))
43 return None
44 # skip this host if we cannot schedule here
45 if self.mgr.inventory.has_label(host, '_no_schedule'):
46 return None
47
48 osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
49
50 cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
51 osd_id_claims_for_host)
52 if not cmds:
53 logger.debug("No data_devices, skipping DriveGroup: {}".format(
54 drive_group.service_id))
55 return None
56
57 logger.debug('Applying service osd.%s on host %s...' % (
58 drive_group.service_id, host
59 ))
60 start_ts = datetime_now()
61 env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
62 ret_msg = await self.create_single_host(
63 drive_group, host, cmds,
64 replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
65 )
66 self.mgr.cache.update_osdspec_last_applied(
67 host, drive_group.service_name(), start_ts
68 )
69 self.mgr.cache.save_host(host)
70 return ret_msg
71
72 async def all_hosts() -> List[Optional[str]]:
73 futures = [create_from_spec_one(h, ds)
74 for h, ds in self.prepare_drivegroup(drive_group)]
75 return await gather(*futures)
76
77 ret = self.mgr.wait_async(all_hosts())
78 return ", ".join(filter(None, ret))
79
80 async def create_single_host(self,
81 drive_group: DriveGroupSpec,
82 host: str, cmds: List[str], replace_osd_ids: List[str],
83 env_vars: Optional[List[str]] = None) -> str:
84 for cmd in cmds:
85 out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
86 if code == 1 and ', it is already prepared' in '\n'.join(err):
87 # HACK: when we create against an existing LV, ceph-volume
88 # returns an error and the above message. To make this
89 # command idempotent, tolerate this "error" and continue.
90 logger.debug('the device was already prepared; continuing')
91 code = 0
92 if code:
93 raise RuntimeError(
94 'cephadm exited with an error code: %d, stderr:%s' % (
95 code, '\n'.join(err)))
96 return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
97 replace_osd_ids)
98
99 async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
100 replace_osd_ids: Optional[List[str]] = None) -> str:
101
102 if replace_osd_ids is None:
103 replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
104 assert replace_osd_ids is not None
105
106 # check result: lvm
107 osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
108 host, 'osd', 'ceph-volume',
109 [
110 '--',
111 'lvm', 'list',
112 '--format', 'json',
113 ])
114 before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
115 fsid = self.mgr._cluster_fsid
116 osd_uuid_map = self.mgr.get_osd_uuid_map()
117 created = []
118 for osd_id, osds in osds_elems.items():
119 for osd in osds:
120 if osd['type'] == 'db':
121 continue
122 if osd['tags']['ceph.cluster_fsid'] != fsid:
123 logger.debug('mismatched fsid, skipping %s' % osd)
124 continue
125 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
126 # if it exists but is part of the replacement operation, don't skip
127 continue
128 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
129 # cephadm daemon instance already exists
130 logger.debug(f'osd id {osd_id} daemon already exists')
131 continue
132 if osd_id not in osd_uuid_map:
133 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
134 continue
135 if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
136 logger.debug('mismatched osd uuid (cluster has %s, osd '
137 'has %s)' % (
138 osd_uuid_map.get(osd_id),
139 osd['tags']['ceph.osd_fsid']))
140 continue
141
142 created.append(osd_id)
143 daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
144 service_name=service_name,
145 daemon_id=str(osd_id),
146 host=host,
147 daemon_type='osd',
148 )
149 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
150 await CephadmServe(self.mgr)._create_daemon(
151 daemon_spec,
152 osd_uuid_map=osd_uuid_map)
153
154 # check result: raw
155 raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
156 host, 'osd', 'ceph-volume',
157 [
158 '--',
159 'raw', 'list',
160 '--format', 'json',
161 ])
162 for osd_uuid, osd in raw_elems.items():
163 if osd.get('ceph_fsid') != fsid:
164 continue
165 osd_id = str(osd.get('osd_id', '-1'))
166 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
167 # if it exists but is part of the replacement operation, don't skip
168 continue
169 if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
170 # cephadm daemon instance already exists
171 logger.debug(f'osd id {osd_id} daemon already exists')
172 continue
173 if osd_id not in osd_uuid_map:
174 logger.debug('osd id {} does not exist in cluster'.format(osd_id))
175 continue
176 if osd_uuid_map.get(osd_id) != osd_uuid:
177 logger.debug('mismatched osd uuid (cluster has %s, osd '
178 'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
179 continue
180 if osd_id in created:
181 continue
182
183 created.append(osd_id)
184 daemon_spec = CephadmDaemonDeploySpec(
185 service_name=service_name,
186 daemon_id=osd_id,
187 host=host,
188 daemon_type='osd',
189 )
190 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
191 await CephadmServe(self.mgr)._create_daemon(
192 daemon_spec,
193 osd_uuid_map=osd_uuid_map)
194
195 if created:
196 self.mgr.cache.invalidate_host_devices(host)
197 self.mgr.cache.invalidate_autotune(host)
198 return "Created osd(s) %s on host '%s'" % (','.join(created), host)
199 else:
200 return "Created no osd(s) on host %s; already created?" % host
201
202 def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
203 # 1) use fn_filter to determine matching_hosts
204 matching_hosts = drive_group.placement.filter_matching_hostspecs(
205 self.mgr.cache.get_schedulable_hosts())
206 # 2) Map the inventory to the InventoryHost object
207 host_ds_map = []
208
209 # set osd_id_claims
210
211 def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
212 # This is stupid and needs to be loaded with the host
213 for _host, _inventory in inventory_dict.items():
214 if _host == hostname:
215 return _inventory
216 raise OrchestratorError("No inventory found for host: {}".format(hostname))
217
218 # 3) iterate over matching_host and call DriveSelection
219 logger.debug(f"Checking matching hosts -> {matching_hosts}")
220 for host in matching_hosts:
221 inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
222 logger.debug(f"Found inventory for host {inventory_for_host}")
223
224 # List of Daemons on that host
225 dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
226 dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]
227
228 drive_selection = DriveSelection(drive_group, inventory_for_host,
229 existing_daemons=len(dd_for_spec_and_host))
230 logger.debug(f"Found drive selection {drive_selection}")
231 if drive_group.method and drive_group.method == 'raw':
232 # ceph-volume can currently only handle a 1:1 mapping
233 # of data/db/wal devices for raw mode osds. If db/wal devices
234 # are defined and the number does not match the number of data
235 # devices, we need to bail out
236 if drive_selection.data_devices() and drive_selection.db_devices():
237 if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
238 raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
239 f'{len(drive_selection.data_devices())} potential data device(s) and '
240 f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
241 if drive_selection.data_devices() and drive_selection.wal_devices():
242 if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
243 raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
244 f'{len(drive_selection.data_devices())} potential data device(s) and '
245 f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
246 host_ds_map.append((host, drive_selection))
247 return host_ds_map
248
249 @staticmethod
250 def driveselection_to_ceph_volume(drive_selection: DriveSelection,
251 osd_id_claims: Optional[List[str]] = None,
252 preview: bool = False) -> List[str]:
253 logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
254 cmds: List[str] = translate.to_ceph_volume(drive_selection,
255 osd_id_claims, preview=preview).run()
256 logger.debug(f"Resulting ceph-volume cmds: {cmds}")
257 return cmds
258
259 def get_previews(self, host: str) -> List[Dict[str, Any]]:
260 # Find OSDSpecs that match host.
261 osdspecs = self.resolve_osdspecs_for_host(host)
262 return self.generate_previews(osdspecs, host)
263
264 def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
265 """
266
267 The return should look like this:
268
269 [
270 {'data': {<metadata>},
271 'osdspec': <name of osdspec>,
272 'host': <name of host>,
273 'notes': <notes>
274 },
275
276 {'data': ...,
277 'osdspec': ..,
278 'host': ...,
279 'notes': ...
280 }
281 ]
282
283 Note: One host can have multiple previews based on its assigned OSDSpecs.
284 """
285 self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
286 ret_all: List[Dict[str, Any]] = []
287 if not osdspecs:
288 return ret_all
289 for osdspec in osdspecs:
290
291 # populate osd_id_claims
292 osd_id_claims = OsdIdClaims(self.mgr)
293
294 # prepare driveselection
295 for host, ds in self.prepare_drivegroup(osdspec):
296 if host != for_host:
297 continue
298
299 # driveselection for host
300 cmds: List[str] = self.driveselection_to_ceph_volume(ds,
301 osd_id_claims.filtered_by_host(
302 host),
303 preview=True)
304 if not cmds:
305 logger.debug("No data_devices, skipping DriveGroup: {}".format(
306 osdspec.service_name()))
307 continue
308
309 # get preview data from ceph-volume
310 for cmd in cmds:
311 out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
312 if out:
313 try:
314 concat_out: Dict[str, Any] = json.loads(' '.join(out))
315 except ValueError:
316 logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
317 concat_out = {}
318 notes = []
319 if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
320 found = len(concat_out)
321 limit = osdspec.data_devices.limit
322 notes.append(
323 f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
324 ret_all.append({'data': concat_out,
325 'osdspec': osdspec.service_id,
326 'host': host,
327 'notes': notes})
328 return ret_all
329
330 def resolve_hosts_for_osdspecs(self,
331 specs: Optional[List[DriveGroupSpec]] = None
332 ) -> List[str]:
333 osdspecs = []
334 if specs:
335 osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
336 if not osdspecs:
337 self.mgr.log.debug("No OSDSpecs found")
338 return []
339 return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])
340
341 def resolve_osdspecs_for_host(self, host: str,
342 specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
343 matching_specs = []
344 self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
345 if not specs:
346 specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
347 if spec.service_type == 'osd']
348 for spec in specs:
349 if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
350 self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
351 matching_specs.append(spec)
352 return matching_specs
353
354 async def _run_ceph_volume_command(self, host: str,
355 cmd: str, env_vars: Optional[List[str]] = None
356 ) -> Tuple[List[str], List[str], int]:
357 self.mgr.inventory.assert_host(host)
358
359 # get bootstrap key
360 ret, keyring, err = self.mgr.check_mon_command({
361 'prefix': 'auth get',
362 'entity': 'client.bootstrap-osd',
363 })
364
365 j = json.dumps({
366 'config': self.mgr.get_minimal_ceph_conf(),
367 'keyring': keyring,
368 })
369
370 split_cmd = cmd.split(' ')
371 _cmd = ['--config-json', '-', '--']
372 _cmd.extend(split_cmd)
373 out, err, code = await CephadmServe(self.mgr)._run_cephadm(
374 host, 'osd', 'ceph-volume',
375 _cmd,
376 env_vars=env_vars,
377 stdin=j,
378 error_ok=True)
379 return out, err, code
380
381 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
382 # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
383 # we cannot recover from it. The OSD keys are created by ceph-volume and not by
384 # us.
385 if not is_failed_deploy:
386 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
387
388
389 class OsdIdClaims(object):
390 """
391 Retrieve and provide osd ids that can be reused in the cluster
392 """
393
394 def __init__(self, mgr: "CephadmOrchestrator") -> None:
395 self.mgr: "CephadmOrchestrator" = mgr
396 self.osd_host_map: Dict[str, List[str]] = dict()
397 self.refresh()
398
399 def refresh(self) -> None:
400 try:
401 ret, out, err = self.mgr.check_mon_command({
402 'prefix': 'osd tree',
403 'states': ['destroyed'],
404 'format': 'json'
405 })
406 except MonCommandFailed as e:
407 logger.exception('osd tree failed')
408 raise OrchestratorError(str(e))
409 try:
410 tree = json.loads(out)
411 except ValueError:
412 logger.exception(f'Cannot decode JSON: \'{out}\'')
413 return
414
415 nodes = tree.get('nodes', {})
416 for node in nodes:
417 if node.get('type') == 'host':
418 self.osd_host_map.update(
419 {node.get('name'): [str(_id) for _id in node.get('children', list())]}
420 )
421 if self.osd_host_map:
422 self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
423
424 def get(self) -> Dict[str, List[str]]:
425 return self.osd_host_map
426
427 def filtered_by_host(self, host: str) -> List[str]:
428 """
429 Return the list of osd ids that can be reused in a host
430
431 OSD id claims in CRUSH map are linked to the bare name of
432 the hostname. In case of FQDN hostnames the host is searched by the
433 bare name
434 """
435 return self.osd_host_map.get(host.split(".")[0], [])
436
437
438 class RemoveUtil(object):
439 def __init__(self, mgr: "CephadmOrchestrator") -> None:
440 self.mgr: "CephadmOrchestrator" = mgr
441
442 def get_osds_in_cluster(self) -> List[str]:
443 osd_map = self.mgr.get_osdmap()
444 return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
445
446 def osd_df(self) -> dict:
447 base_cmd = 'osd df'
448 ret, out, err = self.mgr.mon_command({
449 'prefix': base_cmd,
450 'format': 'json'
451 })
452 try:
453 return json.loads(out)
454 except ValueError:
455 logger.exception(f'Cannot decode JSON: \'{out}\'')
456 return {}
457
458 def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
459 if not osd_df:
460 osd_df = self.osd_df()
461 osd_nodes = osd_df.get('nodes', [])
462 for osd_node in osd_nodes:
463 if osd_node.get('id') == int(osd_id):
464 return osd_node.get('pgs', -1)
465 return -1
466
467 def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
468 """
469 Cut osd_id list in half until it's ok-to-stop
470
471 :param osds: list of osd_ids
472 :return: list of ods_ids that can be stopped at once
473 """
474 if not osds:
475 return []
476 while not self.ok_to_stop(osds):
477 if len(osds) <= 1:
478 # can't even stop one OSD, aborting
479 self.mgr.log.debug(
480 "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
481 return []
482
483 # This potentially prolongs the global wait time.
484 self.mgr.event.wait(1)
485 # splitting osd_ids in half until ok_to_stop yields success
486 # maybe popping ids off one by one is better here..depends on the cluster size I guess..
487 # There's a lot of room for micro adjustments here
488 osds = osds[len(osds) // 2:]
489 return osds
490
491 # todo start draining
492 # return all([osd.start_draining() for osd in osds])
493
494 def ok_to_stop(self, osds: List["OSD"]) -> bool:
495 cmd_args = {
496 'prefix': "osd ok-to-stop",
497 'ids': [str(osd.osd_id) for osd in osds]
498 }
499 return self._run_mon_cmd(cmd_args, error_ok=True)
500
501 def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
502 base_cmd = f"osd {flag}"
503 self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
504 ret, out, err = self.mgr.mon_command({
505 'prefix': base_cmd,
506 'ids': [str(osd.osd_id) for osd in osds]
507 })
508 if ret != 0:
509 self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
510 return False
511 self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
512 return True
513
514 def get_weight(self, osd: "OSD") -> Optional[float]:
515 ret, out, err = self.mgr.mon_command({
516 'prefix': 'osd crush tree',
517 'format': 'json',
518 })
519 if ret != 0:
520 self.mgr.log.error(f"Could not dump crush weights. <{err}>")
521 return None
522 j = json.loads(out)
523 for n in j.get("nodes", []):
524 if n.get("name") == f"osd.{osd.osd_id}":
525 self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
526 return n.get("crush_weight")
527 return None
528
529 def reweight_osd(self, osd: "OSD", weight: float) -> bool:
530 self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
531 ret, out, err = self.mgr.mon_command({
532 'prefix': "osd crush reweight",
533 'name': f"osd.{osd.osd_id}",
534 'weight': weight,
535 })
536 if ret != 0:
537 self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
538 return False
539 self.mgr.log.info(f"{osd} weight is now {weight}")
540 return True
541
542 def zap_osd(self, osd: "OSD") -> str:
543 "Zaps all devices that are associated with an OSD"
544 if osd.hostname is not None:
545 out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
546 osd.hostname, 'osd', 'ceph-volume',
547 ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
548 error_ok=True))
549 self.mgr.cache.invalidate_host_devices(osd.hostname)
550 if code:
551 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
552 return '\n'.join(out + err)
553 raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
554
555 def safe_to_destroy(self, osd_ids: List[int]) -> bool:
556 """ Queries the safe-to-destroy flag for OSDs """
557 cmd_args = {'prefix': 'osd safe-to-destroy',
558 'ids': [str(x) for x in osd_ids]}
559 return self._run_mon_cmd(cmd_args, error_ok=True)
560
561 def destroy_osd(self, osd_id: int) -> bool:
562 """ Destroys an OSD (forcefully) """
563 cmd_args = {'prefix': 'osd destroy-actual',
564 'id': int(osd_id),
565 'yes_i_really_mean_it': True}
566 return self._run_mon_cmd(cmd_args)
567
568 def purge_osd(self, osd_id: int) -> bool:
569 """ Purges an OSD from the cluster (forcefully) """
570 cmd_args = {
571 'prefix': 'osd purge-actual',
572 'id': int(osd_id),
573 'yes_i_really_mean_it': True
574 }
575 return self._run_mon_cmd(cmd_args)
576
577 def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
578 """
579 Generic command to run mon_command and evaluate/log the results
580 """
581 ret, out, err = self.mgr.mon_command(cmd_args)
582 if ret != 0:
583 self.mgr.log.debug(f"ran {cmd_args} with mon_command")
584 if not error_ok:
585 self.mgr.log.error(
586 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
587 return False
588 self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
589 return True
590
591
592 class NotFoundError(Exception):
593 pass
594
595
596 class OSD:
597
598 def __init__(self,
599 osd_id: int,
600 remove_util: RemoveUtil,
601 drain_started_at: Optional[datetime] = None,
602 process_started_at: Optional[datetime] = None,
603 drain_stopped_at: Optional[datetime] = None,
604 drain_done_at: Optional[datetime] = None,
605 draining: bool = False,
606 started: bool = False,
607 stopped: bool = False,
608 replace: bool = False,
609 force: bool = False,
610 hostname: Optional[str] = None,
611 zap: bool = False):
612 # the ID of the OSD
613 self.osd_id = osd_id
614
615 # when did process (not the actual draining) start
616 self.process_started_at = process_started_at
617
618 # when did the drain start
619 self.drain_started_at = drain_started_at
620
621 # when did the drain stop
622 self.drain_stopped_at = drain_stopped_at
623
624 # when did the drain finish
625 self.drain_done_at = drain_done_at
626
627 # did the draining start
628 self.draining = draining
629
630 # was the operation started
631 self.started = started
632
633 # was the operation stopped
634 self.stopped = stopped
635
636 # If this is a replace or remove operation
637 self.replace = replace
638 # If we wait for the osd to be drained
639 self.force = force
640 # The name of the node
641 self.hostname = hostname
642
643 # mgr obj to make mgr/mon calls
644 self.rm_util: RemoveUtil = remove_util
645
646 self.original_weight: Optional[float] = None
647
648 # Whether devices associated with the OSD should be zapped (DATA ERASED)
649 self.zap = zap
650
651 def start(self) -> None:
652 if self.started:
653 logger.debug(f"Already started draining {self}")
654 return None
655 self.started = True
656 self.stopped = False
657
658 def start_draining(self) -> bool:
659 if self.stopped:
660 logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
661 return False
662 if self.replace:
663 self.rm_util.set_osd_flag([self], 'out')
664 else:
665 self.original_weight = self.rm_util.get_weight(self)
666 self.rm_util.reweight_osd(self, 0.0)
667 self.drain_started_at = datetime.utcnow()
668 self.draining = True
669 logger.debug(f"Started draining {self}.")
670 return True
671
672 def stop_draining(self) -> bool:
673 if self.replace:
674 self.rm_util.set_osd_flag([self], 'in')
675 else:
676 if self.original_weight:
677 self.rm_util.reweight_osd(self, self.original_weight)
678 self.drain_stopped_at = datetime.utcnow()
679 self.draining = False
680 logger.debug(f"Stopped draining {self}.")
681 return True
682
683 def stop(self) -> None:
684 if self.stopped:
685 logger.debug(f"Already stopped draining {self}")
686 return None
687 self.started = False
688 self.stopped = True
689 self.stop_draining()
690
691 @property
692 def is_draining(self) -> bool:
693 """
694 Consider an OSD draining when it is
695 actively draining but not yet empty
696 """
697 return self.draining and not self.is_empty
698
699 @property
700 def is_ok_to_stop(self) -> bool:
701 return self.rm_util.ok_to_stop([self])
702
703 @property
704 def is_empty(self) -> bool:
705 if self.get_pg_count() == 0:
706 if not self.drain_done_at:
707 self.drain_done_at = datetime.utcnow()
708 self.draining = False
709 return True
710 return False
711
712 def safe_to_destroy(self) -> bool:
713 return self.rm_util.safe_to_destroy([self.osd_id])
714
715 def down(self) -> bool:
716 return self.rm_util.set_osd_flag([self], 'down')
717
718 def destroy(self) -> bool:
719 return self.rm_util.destroy_osd(self.osd_id)
720
721 def do_zap(self) -> str:
722 return self.rm_util.zap_osd(self)
723
724 def purge(self) -> bool:
725 return self.rm_util.purge_osd(self.osd_id)
726
727 def get_pg_count(self) -> int:
728 return self.rm_util.get_pg_count(self.osd_id)
729
730 @property
731 def exists(self) -> bool:
732 return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
733
734 def drain_status_human(self) -> str:
735 default_status = 'not started'
736 status = 'started' if self.started and not self.draining else default_status
737 status = 'draining' if self.draining else status
738 status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
739 return status
740
741 def pg_count_str(self) -> str:
742 return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
743
744 def to_json(self) -> dict:
745 out: Dict[str, Any] = dict()
746 out['osd_id'] = self.osd_id
747 out['started'] = self.started
748 out['draining'] = self.draining
749 out['stopped'] = self.stopped
750 out['replace'] = self.replace
751 out['force'] = self.force
752 out['zap'] = self.zap
753 out['hostname'] = self.hostname # type: ignore
754
755 for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
756 if getattr(self, k):
757 out[k] = datetime_to_str(getattr(self, k))
758 else:
759 out[k] = getattr(self, k)
760 return out
761
762 @classmethod
763 def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
764 if not inp:
765 return None
766 for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
767 if inp.get(date_field):
768 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
769 inp.update({'remove_util': rm_util})
770 if 'nodename' in inp:
771 hostname = inp.pop('nodename')
772 inp['hostname'] = hostname
773 return cls(**inp)
774
775 def __hash__(self) -> int:
776 return hash(self.osd_id)
777
778 def __eq__(self, other: object) -> bool:
779 if not isinstance(other, OSD):
780 return NotImplemented
781 return self.osd_id == other.osd_id
782
783 def __repr__(self) -> str:
784 return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
785
786
787 class OSDRemovalQueue(object):
788
789 def __init__(self, mgr: "CephadmOrchestrator") -> None:
790 self.mgr: "CephadmOrchestrator" = mgr
791 self.osds: Set[OSD] = set()
792 self.rm_util = RemoveUtil(mgr)
793
794 # locks multithreaded access to self.osds. Please avoid locking
795 # network calls, like mon commands.
796 self.lock = Lock()
797
798 def process_removal_queue(self) -> None:
799 """
800 Performs actions in the _serve() loop to remove an OSD
801 when criteria is met.
802
803 we can't hold self.lock, as we're calling _remove_daemon in the loop
804 """
805
806 # make sure that we don't run on OSDs that are not in the cluster anymore.
807 self.cleanup()
808
809 # find osds that are ok-to-stop and not yet draining
810 ready_to_drain_osds = self._ready_to_drain_osds()
811 if ready_to_drain_osds:
812 # start draining those
813 _ = [osd.start_draining() for osd in ready_to_drain_osds]
814
815 all_osds = self.all_osds()
816
817 logger.debug(
818 f"{self.queue_size()} OSDs are scheduled "
819 f"for removal: {all_osds}")
820
821 # Check all osds for their state and take action (remove, purge etc)
822 new_queue: Set[OSD] = set()
823 for osd in all_osds: # type: OSD
824 if not osd.force:
825 # skip criteria
826 if not osd.is_empty:
827 logger.debug(f"{osd} is not empty yet. Waiting a bit more")
828 new_queue.add(osd)
829 continue
830
831 if not osd.safe_to_destroy():
832 logger.debug(
833 f"{osd} is not safe-to-destroy yet. Waiting a bit more")
834 new_queue.add(osd)
835 continue
836
837 # abort criteria
838 if not osd.down():
839 # also remove it from the remove_osd list and set a health_check warning?
840 raise orchestrator.OrchestratorError(
841 f"Could not mark {osd} down")
842
843 # stop and remove daemon
844 assert osd.hostname is not None
845
846 if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
847 CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
848 logger.info(f"Successfully removed {osd} on {osd.hostname}")
849 else:
850 logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
851
852 if osd.replace:
853 # mark destroyed in osdmap
854 if not osd.destroy():
855 raise orchestrator.OrchestratorError(
856 f"Could not destroy {osd}")
857 logger.info(
858 f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
859 else:
860 # purge from osdmap
861 if not osd.purge():
862 raise orchestrator.OrchestratorError(f"Could not purge {osd}")
863 logger.info(f"Successfully purged {osd} on {osd.hostname}")
864
865 if osd.zap:
866 # throws an exception if the zap fails
867 logger.info(f"Zapping devices for {osd} on {osd.hostname}")
868 osd.do_zap()
869 logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
870
871 logger.debug(f"Removing {osd} from the queue.")
872
873 # self could change while this is processing (osds get added from the CLI)
874 # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
875 # osds that were added while this method was executed'
876 with self.lock:
877 self.osds.intersection_update(new_queue)
878 self._save_to_store()
879
880 def cleanup(self) -> None:
881 # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
882 with self.lock:
883 for osd in self._not_in_cluster():
884 self.osds.remove(osd)
885
886 def _ready_to_drain_osds(self) -> List["OSD"]:
887 """
888 Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
889 be accomodated by the 'max_osd_draining_count' config value, considering the number of OSDs
890 that are already draining.
891 """
892 draining_limit = max(1, self.mgr.max_osd_draining_count)
893 num_already_draining = len(self.draining_osds())
894 num_to_start_draining = max(0, draining_limit - num_already_draining)
895 stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
896 return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
897
898 def _save_to_store(self) -> None:
899 osd_queue = [osd.to_json() for osd in self.osds]
900 logger.debug(f"Saving {osd_queue} to store")
901 self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
902
903 def load_from_store(self) -> None:
904 with self.lock:
905 for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
906 for osd in json.loads(v):
907 logger.debug(f"Loading osd ->{osd} from store")
908 osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
909 if osd_obj is not None:
910 self.osds.add(osd_obj)
911
912 def as_osd_ids(self) -> List[int]:
913 with self.lock:
914 return [osd.osd_id for osd in self.osds]
915
916 def queue_size(self) -> int:
917 with self.lock:
918 return len(self.osds)
919
920 def draining_osds(self) -> List["OSD"]:
921 with self.lock:
922 return [osd for osd in self.osds if osd.is_draining]
923
924 def idling_osds(self) -> List["OSD"]:
925 with self.lock:
926 return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
927
928 def empty_osds(self) -> List["OSD"]:
929 with self.lock:
930 return [osd for osd in self.osds if osd.is_empty]
931
932 def all_osds(self) -> List["OSD"]:
933 with self.lock:
934 return [osd for osd in self.osds]
935
936 def _not_in_cluster(self) -> List["OSD"]:
937 return [osd for osd in self.osds if not osd.exists]
938
939 def enqueue(self, osd: "OSD") -> None:
940 if not osd.exists:
941 raise NotFoundError()
942 with self.lock:
943 self.osds.add(osd)
944 osd.start()
945
946 def rm(self, osd: "OSD") -> None:
947 if not osd.exists:
948 raise NotFoundError()
949 osd.stop()
950 with self.lock:
951 try:
952 logger.debug(f'Removing {osd} from the queue.')
953 self.osds.remove(osd)
954 except KeyError:
955 logger.debug(f"Could not find {osd} in queue.")
956 raise KeyError
957
958 def __eq__(self, other: Any) -> bool:
959 if not isinstance(other, OSDRemovalQueue):
960 return False
961 with self.lock:
962 return self.osds == other.osds