# Source: git.proxmox.com Git - ceph.git - ceph/src/pybind/mgr/rook/module.py
# (quincy beta 17.1.0 import)
1 import datetime
2 import logging
3 import re
4 import threading
5 import functools
6 import os
7 import json
8
9 from ceph.deployment import inventory
10 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
11 from ceph.utils import datetime_now
12
13 from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
14
try:
    # DriveGroupSpec is only needed for annotations in some environments.
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    # Flag consumed by can_run()/available() to report a missing dependency.
    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    # Work around the generated V1ContainerImage model: replace its `names`
    # setter with one that accepts any value (including None) so node/pod
    # listings do not raise during deserialization — see linked issue.
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self: Any, names: Any) -> None:
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)

except ImportError:
    # Keep the module importable without the kubernetes client; the
    # orchestrator will simply report itself unavailable.
    kubernetes_imported = False
    client = None
    config = None
36
37 from mgr_module import MgrModule, Option, NFS_POOL_NAME
38 import orchestrator
39 from orchestrator import handle_orch_error, OrchResult, raise_if_exception
40
41 from .rook_cluster import RookCluster
42
43 T = TypeVar('T')
44 FuncT = TypeVar('FuncT', bound=Callable)
45 ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
46
47
class RookEnv(object):
    """Settings describing the Rook cluster, derived from the pod environment."""

    def __init__(self) -> None:
        env = os.environ
        # POD_NAMESPACE already exist for Rook 0.9
        self.namespace = env.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0
        self.cluster_name = env.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = env.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = env.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self) -> bool:
        """True when the CRD version is the (only supported) 'v1'."""
        return self.crd_version == 'v1'

    def has_namespace(self) -> bool:
        """True when running inside a pod (POD_NAMESPACE is set)."""
        return 'POD_NAMESPACE' in os.environ
65
66
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Orchestrator backend driving a Rook-managed Ceph cluster through the
    Kubernetes API.

    Writes are a two-phase thing, firstly sending
    the write to the k8s API (fast) and then waiting
    for the corresponding change to appear in the
    Ceph cluster (slow)

    Right now, we are calling the k8s API synchronously.
    """

    # Module options are mirrored onto attributes of the same name by
    # config_notify().
    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
        Option(
            'storage_class',
            type='str',
            default='local',
            desc='storage class name for LSO-discovered PVs',
        ),
        Option(
            'drive_group_interval',
            type='float',
            default=300.0,
            desc='interval in seconds between re-application of applied drive_groups',
        ),
    ]
92
93 @staticmethod
94 def can_run() -> Tuple[bool, str]:
95 if not kubernetes_imported:
96 return False, "`kubernetes` python module not found"
97 if not RookEnv().api_version_match():
98 return False, "Rook version unsupported."
99 return True, ''
100
101 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
102 if not kubernetes_imported:
103 return False, "`kubernetes` python module not found", {}
104 elif not self._rook_env.has_namespace():
105 return False, "ceph-mgr not running in Rook cluster", {}
106
107 try:
108 self.k8s.list_namespaced_pod(self._rook_env.namespace)
109 except ApiException as e:
110 return False, "Cannot reach Kubernetes API: {}".format(e), {}
111 else:
112 return True, "", {}
113
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Set up placeholders; real k8s clients are built later in serve()."""
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        # Set by serve() once the clients and RookCluster exist; the k8s and
        # rook_cluster properties block on it.
        self._initialized = threading.Event()
        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
        self._k8s_CustomObjects_api: Optional[client.CustomObjectsApi] = None
        self._k8s_StorageV1_api: Optional[client.StorageV1Api] = None
        self._rook_cluster: Optional[RookCluster] = None
        self._rook_env = RookEnv()
        self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None

        # Populate option attributes (storage_class, drive_group_interval).
        self.config_notify()
        if TYPE_CHECKING:
            # Dummy assignments so mypy knows the option attributes' types.
            self.storage_class = 'foo'
            self.drive_group_interval = 10.0

        self._load_drive_groups()
        # Signalled by shutdown() to stop the serve() loop.
        self._shutdown = threading.Event()
133
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        # Mirror each declared module option onto an attribute of self.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        assert isinstance(self.storage_class, str)
        assert isinstance(self.drive_group_interval, float)

        # Propagate a changed storage class to the live cluster handle, if any.
        if self._rook_cluster:
            self._rook_cluster.storage_class = self.storage_class
151
    def shutdown(self) -> None:
        """Wake the serve() loop so the module can exit."""
        self._shutdown.set()
154
    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        """CoreV1 API client; blocks until serve() has initialized it."""
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api
161
    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        """RookCluster handle; blocks until serve() has initialized it."""
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster
168
    def serve(self) -> None:
        """Module main loop: build the k8s clients and the RookCluster handle,
        then periodically re-apply stored drive groups until shutdown()."""
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()
        self._k8s_CustomObjects_api = client.CustomObjectsApi()
        self._k8s_StorageV1_api = client.StorageV1Api()
        self._k8s_AppsV1_api = client.AppsV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        assert isinstance(self.storage_class, str)

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._k8s_CustomObjects_api,
            self._k8s_StorageV1_api,
            self._k8s_AppsV1_api,
            self._rook_env,
            self.storage_class)

        # Unblock the k8s/rook_cluster properties, then re-read options now
        # that the cluster handle exists.
        self._initialized.set()
        self.config_notify()

        # Re-apply persisted drive groups every drive_group_interval seconds.
        while not self._shutdown.is_set():
            self._apply_drivegroups(list(self._drive_group_map.values()))
            self._shutdown.wait(self.drive_group_interval)
217
218 @handle_orch_error
219 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
220 host_list = None
221 if host_filter and host_filter.hosts:
222 # Explicit host list
223 host_list = host_filter.hosts
224 elif host_filter and host_filter.labels:
225 # TODO: query k8s API to resolve to host list, and pass
226 # it into RookCluster.get_discovered_devices
227 raise NotImplementedError()
228
229 discovered_devs = self.rook_cluster.get_discovered_devices(host_list)
230
231 result = []
232 for host_name, host_devs in discovered_devs.items():
233 devs = []
234 for d in host_devs:
235 devs.append(d)
236
237 result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))
238
239 return result
240
    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """Return the hosts (k8s nodes) known to the Rook cluster."""
        return self.rook_cluster.get_hosts()
245
246 @handle_orch_error
247 def describe_service(self,
248 service_type: Optional[str] = None,
249 service_name: Optional[str] = None,
250 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
251 now = datetime_now()
252
253 # CephCluster
254 cl = self.rook_cluster.rook_api_get(
255 "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
256 self.log.debug('CephCluster %s' % cl)
257 image_name = cl['spec'].get('cephVersion', {}).get('image', None)
258 num_nodes = len(self.rook_cluster.get_node_names())
259
260 spec = {}
261 if service_type == 'mon' or service_type is None:
262 spec['mon'] = orchestrator.ServiceDescription(
263 spec=ServiceSpec(
264 'mon',
265 placement=PlacementSpec(
266 count=cl['spec'].get('mon', {}).get('count', 1),
267 ),
268 ),
269 size=cl['spec'].get('mon', {}).get('count', 1),
270 container_image_name=image_name,
271 last_refresh=now,
272 )
273 if service_type == 'mgr' or service_type is None:
274 spec['mgr'] = orchestrator.ServiceDescription(
275 spec=ServiceSpec(
276 'mgr',
277 placement=PlacementSpec.from_string('count:1'),
278 ),
279 size=1,
280 container_image_name=image_name,
281 last_refresh=now,
282 )
283
284 if (
285 service_type == 'crash' or service_type is None
286 and not cl['spec'].get('crashCollector', {}).get('disable', False)
287 ):
288 spec['crash'] = orchestrator.ServiceDescription(
289 spec=ServiceSpec(
290 'crash',
291 placement=PlacementSpec.from_string('*'),
292 ),
293 size=num_nodes,
294 container_image_name=image_name,
295 last_refresh=now,
296 )
297
298 if service_type == 'mds' or service_type is None:
299 # CephFilesystems
300 all_fs = self.rook_cluster.get_resource("cephfilesystems")
301 for fs in all_fs:
302 svc = 'mds.' + fs['metadata']['name']
303 if svc in spec:
304 continue
305 # FIXME: we are conflating active (+ standby) with count
306 active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
307 total_mds = active
308 if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
309 total_mds = active * 2
310 spec[svc] = orchestrator.ServiceDescription(
311 spec=ServiceSpec(
312 service_type='mds',
313 service_id=fs['metadata']['name'],
314 placement=PlacementSpec(count=active),
315 ),
316 size=total_mds,
317 container_image_name=image_name,
318 last_refresh=now,
319 )
320
321 if service_type == 'rgw' or service_type is None:
322 # CephObjectstores
323 all_zones = self.rook_cluster.get_resource("cephobjectstores")
324 for zone in all_zones:
325 svc = 'rgw.' + zone['metadata']['name']
326 if svc in spec:
327 continue
328 active = zone['spec']['gateway']['instances'];
329 if 'securePort' in zone['spec']['gateway']:
330 ssl = True
331 port = zone['spec']['gateway']['securePort']
332 else:
333 ssl = False
334 port = zone['spec']['gateway']['port'] or 80
335 rgw_zone = zone['spec'].get('zone', {}).get('name') or None
336 spec[svc] = orchestrator.ServiceDescription(
337 spec=RGWSpec(
338 service_id=zone['metadata']['name'],
339 rgw_zone=rgw_zone,
340 ssl=ssl,
341 rgw_frontend_port=port,
342 placement=PlacementSpec(count=active),
343 ),
344 size=active,
345 container_image_name=image_name,
346 last_refresh=now,
347 )
348
349 if service_type == 'nfs' or service_type is None:
350 # CephNFSes
351 all_nfs = self.rook_cluster.get_resource("cephnfses")
352 nfs_pods = self.rook_cluster.describe_pods('nfs', None, None)
353 for nfs in all_nfs:
354 if nfs['spec']['rados']['pool'] != NFS_POOL_NAME:
355 continue
356 nfs_name = nfs['metadata']['name']
357 svc = 'nfs.' + nfs_name
358 if svc in spec:
359 continue
360 active = nfs['spec'].get('server', {}).get('active')
361 creation_timestamp = datetime.datetime.strptime(nfs['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ')
362 spec[svc] = orchestrator.ServiceDescription(
363 spec=NFSServiceSpec(
364 service_id=nfs_name,
365 placement=PlacementSpec(count=active),
366 ),
367 size=active,
368 last_refresh=now,
369 running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]),
370 created=creation_timestamp.astimezone(tz=datetime.timezone.utc)
371 )
372 if service_type == 'osd' or service_type is None:
373 # OSDs
374 # FIXME: map running OSDs back to their respective services...
375
376 # the catch-all unmanaged
377 all_osds = self.rook_cluster.get_osds()
378 svc = 'osd'
379 spec[svc] = orchestrator.ServiceDescription(
380 spec=DriveGroupSpec(
381 unmanaged=True,
382 service_type='osd',
383 ),
384 size=len(all_osds),
385 last_refresh=now,
386 running=sum(osd.status.phase == 'Running' for osd in all_osds)
387 )
388
389 # drivegroups
390 for name, dg in self._drive_group_map.items():
391 spec[f'osd.{name}'] = orchestrator.ServiceDescription(
392 spec=dg,
393 last_refresh=now,
394 size=0,
395 running=0,
396 )
397
398 if service_type == 'rbd-mirror' or service_type is None:
399 # rbd-mirrors
400 all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
401 for mirror in all_mirrors:
402 logging.warn(mirror)
403 mirror_name = mirror['metadata']['name']
404 svc = 'rbd-mirror.' + mirror_name
405 if svc in spec:
406 continue
407 spec[svc] = orchestrator.ServiceDescription(
408 spec=ServiceSpec(
409 service_id=mirror_name,
410 service_type="rbd-mirror",
411 placement=PlacementSpec(count=1),
412 ),
413 size=1,
414 last_refresh=now,
415 )
416
417 for dd in self._list_daemons():
418 if dd.service_name() not in spec:
419 continue
420 service = spec[dd.service_name()]
421 service.running += 1
422 if not service.container_image_id:
423 service.container_image_id = dd.container_image_id
424 if not service.container_image_name:
425 service.container_image_name = dd.container_image_name
426 if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
427 service.last_refresh = dd.last_refresh
428 if service.created is None or dd.created is None or dd.created < service.created:
429 service.created = dd.created
430
431 return [v for k, v in spec.items()]
432
    @handle_orch_error
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        """List daemons (pods); thin @handle_orch_error wrapper over _list_daemons()."""
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)
445
446 def _list_daemons(self,
447 service_name: Optional[str] = None,
448 daemon_type: Optional[str] = None,
449 daemon_id: Optional[str] = None,
450 host: Optional[str] = None,
451 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
452 pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
453 self.log.debug('pods %s' % pods)
454 result = []
455 for p in pods:
456 sd = orchestrator.DaemonDescription()
457 sd.hostname = p['hostname']
458 sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
459 status = {
460 'Pending': orchestrator.DaemonDescriptionStatus.starting,
461 'Running': orchestrator.DaemonDescriptionStatus.running,
462 'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
463 'Failed': orchestrator.DaemonDescriptionStatus.error,
464 'Unknown': orchestrator.DaemonDescriptionStatus.unknown,
465 }[p['phase']]
466 sd.status = status
467
468 if 'ceph_daemon_id' in p['labels']:
469 sd.daemon_id = p['labels']['ceph_daemon_id']
470 elif 'ceph-osd-id' in p['labels']:
471 sd.daemon_id = p['labels']['ceph-osd-id']
472 else:
473 # Unknown type -- skip it
474 continue
475
476 if service_name is not None and service_name != sd.service_name():
477 continue
478 sd.container_image_name = p['container_image_name']
479 sd.container_image_id = p['container_image_id']
480 sd.created = p['created']
481 sd.last_configured = p['created']
482 sd.last_deployed = p['created']
483 sd.started = p['started']
484 sd.last_refresh = p['refreshed']
485 result.append(sd)
486
487 return result
488
489 def _get_pool_params(self) -> Tuple[int, str]:
490 num_replicas = self.get_ceph_option('osd_pool_default_size')
491 assert type(num_replicas) is int
492
493 leaf_type_id = self.get_ceph_option('osd_crush_chooseleaf_type')
494 assert type(leaf_type_id) is int
495 crush = self.get('osd_map_crush')
496 leaf_type = 'host'
497 for t in crush['types']:
498 if t['type_id'] == leaf_type_id:
499 leaf_type = t['name']
500 break
501 return num_replicas, leaf_type
502
503 @handle_orch_error
504 def remove_service(self, service_name: str, force: bool = False) -> str:
505 if service_name == 'rbd-mirror':
506 return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
507 service_type, service_id = service_name.split('.', 1)
508 if service_type == 'mds':
509 return self.rook_cluster.rm_service('cephfilesystems', service_id)
510 elif service_type == 'rgw':
511 return self.rook_cluster.rm_service('cephobjectstores', service_id)
512 elif service_type == 'nfs':
513 ret, out, err = self.mon_command({
514 'prefix': 'auth ls'
515 })
516 matches = re.findall(rf'client\.nfs-ganesha\.{service_id}\..*', out)
517 for match in matches:
518 self.check_mon_command({
519 'prefix': 'auth rm',
520 'entity': match
521 })
522 return self.rook_cluster.rm_service('cephnfses', service_id)
523 elif service_type == 'rbd-mirror':
524 return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
525 elif service_type == 'osd':
526 if service_id in self._drive_group_map:
527 del self._drive_group_map[service_id]
528 self._save_drive_groups()
529 return f'Removed {service_name}'
530 elif service_type == 'ingress':
531 self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
532 return 'The Rook orchestrator does not currently support ingress'
533 else:
534 raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
535
536 def zap_device(self, host: str, path: str) -> OrchResult[str]:
537 try:
538 self.rook_cluster.create_zap_job(host, path)
539 except Exception as e:
540 logging.error(e)
541 return OrchResult(None, Exception("Unable to zap device: " + str(e.with_traceback(None))))
542 return OrchResult(f'{path} on {host} zapped')
543
544 @handle_orch_error
545 def apply_mon(self, spec):
546 # type: (ServiceSpec) -> str
547 if spec.placement.hosts or spec.placement.label:
548 raise RuntimeError("Host list or label is not supported by rook.")
549
550 return self.rook_cluster.update_mon_count(spec.placement.count)
551
552 def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
553 try:
554 self.rook_cluster.rbd_mirror(spec)
555 return OrchResult("Success")
556 except Exception as e:
557 return OrchResult(None, e)
558
    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        """Create/update a CephFilesystem matching *spec*, using the cluster's
        default pool replication parameters."""
        num_replicas, leaf_type = self._get_pool_params()
        return self.rook_cluster.apply_filesystem(spec, num_replicas, leaf_type)
564
    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        """Create/update a CephObjectStore matching *spec*, using the cluster's
        default pool replication parameters."""
        num_replicas, leaf_type = self._get_pool_params()
        return self.rook_cluster.apply_objectstore(spec, num_replicas, leaf_type)
570
571 @handle_orch_error
572 def apply_nfs(self, spec):
573 # type: (NFSServiceSpec) -> str
574 try:
575 return self.rook_cluster.apply_nfsgw(spec, self)
576 except Exception as e:
577 logging.error(e)
578 return "Unable to create NFS daemon, check logs for more traceback\n" + str(e.with_traceback(None))
579
    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        """Delete the pods backing the named daemons."""
        return self.rook_cluster.remove_pods(names)
583
584 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
585 for drive_group in specs:
586 self._drive_group_map[str(drive_group.service_id)] = drive_group
587 self._save_drive_groups()
588 return OrchResult(self._apply_drivegroups(specs))
589
590 def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
591 all_hosts = raise_if_exception(self.get_hosts())
592 result_list: List[str] = []
593 for drive_group in ls:
594 matching_hosts = drive_group.placement.filter_matching_hosts(
595 lambda label=None, as_hostspec=None: all_hosts
596 )
597
598 if not self.rook_cluster.node_exists(matching_hosts[0]):
599 raise RuntimeError("Node '{0}' is not in the Kubernetes "
600 "cluster".format(matching_hosts))
601
602 # Validate whether cluster CRD can accept individual OSD
603 # creations (i.e. not useAllDevices)
604 if not self.rook_cluster.can_create_osd():
605 raise RuntimeError("Rook cluster configuration does not "
606 "support OSD creation.")
607 result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
608 return result_list
609
610 def _load_drive_groups(self) -> None:
611 stored_drive_group = self.get_store("drive_group_map")
612 self._drive_group_map: Dict[str, DriveGroupSpec] = {}
613 if stored_drive_group:
614 for name, dg in json.loads(stored_drive_group).items():
615 try:
616 self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
617 except ValueError as e:
618 self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
619
620 def _save_drive_groups(self) -> None:
621 json_drive_group_map = {
622 name: dg.to_json() for name, dg in self._drive_group_map.items()
623 }
624 self.set_store("drive_group_map", json.dumps(json_drive_group_map))
625
    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
        """Remove the given OSDs via the Rook cluster; zapping during removal
        is not supported and raises RuntimeError."""
        assert self._rook_cluster is not None
        if zap:
            raise RuntimeError("Rook does not support zapping devices during OSD removal.")
        res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
        return OrchResult(res)
632
    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """Add *label* to the k8s node backing *host*."""
        return self.rook_cluster.add_host_label(host, label)
635
    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """Remove *label* from the k8s node backing *host*."""
        return self.rook_cluster.remove_host_label(host, label)
638 """
639 @handle_orch_error
640 def create_osds(self, drive_group):
641 # type: (DriveGroupSpec) -> str
642 # Creates OSDs from a drive group specification.
643
644 # $: ceph orch osd create -i <dg.file>
645
646 # The drivegroup file must only contain one spec at a time.
647 #
648
649 targets = [] # type: List[str]
650 if drive_group.data_devices and drive_group.data_devices.paths:
651 targets += [d.path for d in drive_group.data_devices.paths]
652 if drive_group.data_directories:
653 targets += drive_group.data_directories
654
655 all_hosts = raise_if_exception(self.get_hosts())
656
657 matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
658
659 assert len(matching_hosts) == 1
660
661 if not self.rook_cluster.node_exists(matching_hosts[0]):
662 raise RuntimeError("Node '{0}' is not in the Kubernetes "
663 "cluster".format(matching_hosts))
664
665 # Validate whether cluster CRD can accept individual OSD
666 # creations (i.e. not useAllDevices)
667 if not self.rook_cluster.can_create_osd():
668 raise RuntimeError("Rook cluster configuration does not "
669 "support OSD creation.")
670
671 return self.rook_cluster.add_osds(drive_group, matching_hosts)
672
673 # TODO: this was the code to update the progress reference:
674
675 @handle_orch_error
676 def has_osds(matching_hosts: List[str]) -> bool:
677
678 # Find OSD pods on this host
679 pod_osd_ids = set()
680 pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
681 label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
682 field_selector="spec.nodeName={0}".format(
683 matching_hosts[0]
684 )).items
685 for p in pods:
686 pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
687
688 self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
689
690 found = []
691 osdmap = self.get("osd_map")
692 for osd in osdmap['osds']:
693 osd_id = osd['osd']
694 if osd_id not in pod_osd_ids:
695 continue
696
697 metadata = self.get_metadata('osd', "%s" % osd_id)
698 if metadata and metadata['devices'] in targets:
699 found.append(osd_id)
700 else:
701 self.log.info("ignoring osd {0} {1}".format(
702 osd_id, metadata['devices'] if metadata else 'DNE'
703 ))
704
705 return found is not None
706 """
707
    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """Turn the ident/fault light on or off for each device location."""
        return self.rook_cluster.blink_light(ident_fault, on, locs)