]>
Commit | Line | Data |
---|---|---|
9f95a23c | 1 | import datetime |
11fdf7f2 TL |
2 | import threading |
3 | import functools | |
4 | import os | |
1911f103 | 5 | import json |
9f95a23c TL |
6 | |
7 | from ceph.deployment import inventory | |
8 | from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec | |
9 | ||
11fdf7f2 | 10 | try: |
9f95a23c TL |
11 | from typing import List, Dict, Optional, Callable, Any |
12 | from ceph.deployment.drive_group import DriveGroupSpec | |
11fdf7f2 TL |
13 | except ImportError: |
14 | pass # just for type checking | |
15 | ||
16 | try: | |
17 | from kubernetes import client, config | |
18 | from kubernetes.client.rest import ApiException | |
19 | ||
20 | kubernetes_imported = True | |
9f95a23c TL |
21 | |
22 | # https://github.com/kubernetes-client/python/issues/895 | |
23 | from kubernetes.client.models.v1_container_image import V1ContainerImage | |
24 | def names(self, names): | |
25 | self._names = names | |
26 | V1ContainerImage.names = V1ContainerImage.names.setter(names) | |
27 | ||
11fdf7f2 TL |
28 | except ImportError: |
29 | kubernetes_imported = False | |
30 | client = None | |
31 | config = None | |
32 | ||
33 | from mgr_module import MgrModule | |
34 | import orchestrator | |
35 | ||
36 | from .rook_cluster import RookCluster | |
37 | ||
38 | ||
9f95a23c TL |
class RookCompletion(orchestrator.Completion):
    """Completion type handed out by the Rook orchestrator backend."""

    def evaluate(self):
        """Drive this completion by calling ``finalize`` with ``None``."""
        self.finalize(None)
11fdf7f2 TL |
42 | |
43 | ||
def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator for RookOrchestrator read methods.

    Calling the decorated method does not run it.  Instead a
    RookCompletion is returned whose ``on_complete`` callback invokes
    the original method with the arguments captured at call time.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        def run(_):
            # The callback receives one positional argument, which is ignored.
            return f(*args, **kwargs)

        return RookCompletion(on_complete=run)

    return wrapper
56 | ||
57 | ||
9f95a23c TL |
def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    """
    Build a progress-tracked RookCompletion around a zero-argument callable.

    :param on_complete: callable invoked without arguments when the
        completion runs
    :param message: progress message passed to ``with_progress``
    :param mgr: manager module instance passed to ``with_progress``
    :param calc_percent: optional callable passed through unchanged
    """
    def run(_):
        # Drop the single argument supplied to the callback.
        return on_complete()

    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=run,
        calc_percent=calc_percent,
    )
70 | ||
71 | ||
11fdf7f2 TL |
class RookEnv(object):
    """Rook deployment settings, read from the process environment."""

    def __init__(self):
        env = os.environ

        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = env.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = env.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        # Both of these fall back to the pod namespace / 'v1' when unset.
        self.operator_namespace = env.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = env.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        """Return True when the configured CRD version is 'v1'."""
        supported = 'v1'
        return self.crd_version == supported

    def has_namespace(self):
        """Return True when POD_NAMESPACE is set in the environment."""
        return 'POD_NAMESPACE' in os.environ
89 | ||
11fdf7f2 TL |
90 | |
91 | class RookOrchestrator(MgrModule, orchestrator.Orchestrator): | |
9f95a23c TL |
92 | """ |
93 | Writes are a two-phase thing, firstly sending | |
94 | the write to the k8s API (fast) and then waiting | |
95 | for the corresponding change to appear in the | |
96 | Ceph cluster (slow) | |
11fdf7f2 | 97 | |
801d1391 | 98 | Right now, we are calling the k8s API synchronously. |
9f95a23c | 99 | """ |
11fdf7f2 | 100 | |
9f95a23c TL |
101 | MODULE_OPTIONS = [ |
102 | # TODO: configure k8s API addr instead of assuming local | |
103 | ] # type: List[Dict[str, Any]] | |
11fdf7f2 | 104 | |
9f95a23c TL |
def process(self, completions):
    # type: (List[RookCompletion]) -> None
    """Log the given completions (when non-empty) and evaluate each in order."""
    if completions:
        summary = orchestrator.pretty_print(completions)
        self.log.info("process: completions={0}".format(summary))

    for completion in completions:
        completion.evaluate()
11fdf7f2 TL |
113 | |
@staticmethod
def can_run():
    """
    Report whether this module can run at all.

    :return: ``(ok, reason)``; ``reason`` is empty when ``ok`` is True
    """
    if not kubernetes_imported:
        return False, "`kubernetes` python module not found"
    if RookEnv().api_version_match():
        return True, ''
    return False, "Rook version unsupported."
121 | ||
def available(self):
    """
    Report whether the Rook backend is usable right now.

    :return: ``(ok, reason)``; ``reason`` is empty when ``ok`` is True
    """
    if not kubernetes_imported:
        return False, "`kubernetes` python module not found"
    if not self._rook_env.has_namespace():
        return False, "ceph-mgr not running in Rook cluster"

    # Probe the Kubernetes API with a pod listing.
    try:
        self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
    except ApiException as e:
        return False, "Cannot reach Kubernetes API: {}".format(e)

    return True, ""
134 | ||
def __init__(self, *args, **kwargs):
    """Initialise module state; the k8s clients are created later in serve()."""
    super(RookOrchestrator, self).__init__(*args, **kwargs)

    # Set by serve() once the API clients and RookCluster exist.
    self._initialized = threading.Event()

    # Populated in serve().
    self._k8s_CoreV1_api = None
    self._k8s_BatchV1_api = None
    self._rook_cluster = None

    self._rook_env = RookEnv()

    # Set by shutdown() to stop the serve() loop.
    self._shutdown = threading.Event()

    self.all_progress_references = []  # type: List[orchestrator.ProgressReference]
147 | ||
11fdf7f2 TL |
def shutdown(self):
    """Set the shutdown event, which the serve() loop checks on each pass."""
    stop_event = self._shutdown
    stop_event.set()
150 | ||
@property
def k8s(self):
    # type: () -> client.CoreV1Api
    """Return the CoreV1 API client, blocking until serve() has initialised it."""
    self._initialized.wait()
    api = self._k8s_CoreV1_api
    assert api is not None
    return api
11fdf7f2 TL |
157 | |
@property
def rook_cluster(self):
    # type: () -> RookCluster
    """Return the RookCluster helper, blocking until serve() has created it."""
    self._initialized.wait()
    cluster = self._rook_cluster
    assert cluster is not None
    return cluster
164 | ||
def serve(self):
    """
    Module main loop.

    Loads the Kubernetes client configuration, creates the API clients and
    the RookCluster helper, signals initialisation, and then refreshes the
    outstanding progress references every five seconds until shutdown.
    """
    # For deployed clusters, we should always be running inside
    # a Rook cluster.  For development convenience, also support
    # running outside (reading ~/.kube config)
    if not self._rook_env.has_namespace():
        self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
        config.load_kube_config()

        cluster_name = "rook-ceph"

        # So that I can do port forwarding from my workstation - jcsp
        from kubernetes.client import configuration
        configuration.verify_ssl = False
    else:
        config.load_incluster_config()
        cluster_name = self._rook_env.cluster_name

    self._k8s_CoreV1_api = client.CoreV1Api()
    self._k8s_BatchV1_api = client.BatchV1Api()

    try:
        # XXX mystery hack -- I need to do an API call from
        # this context, or subsequent API usage from handle_command
        # fails with SSLError('bad handshake').  Suspect some kind of
        # thread context setup in SSL lib?
        self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
    except ApiException:
        # Ignore here to make self.available() fail with a proper error message
        pass

    self._rook_cluster = RookCluster(
        self._k8s_CoreV1_api,
        self._k8s_BatchV1_api,
        self._rook_env,
    )

    # Unblocks the k8s / rook_cluster properties.
    self._initialized.set()

    while not self._shutdown.is_set():
        # XXX hack (or is it?) to kick all completions periodically,
        # in case we had a caller that wait()'ed on them long enough
        # to get persistence but not long enough to get completion

        # Drop references that are already effective, update the rest.
        self.all_progress_references = [
            ref for ref in self.all_progress_references if not ref.effective
        ]
        for ref in self.all_progress_references:
            ref.update()

        self._shutdown.wait(5)
213 | ||
9f95a23c TL |
def cancel_completions(self):
    """Fail every tracked progress reference, then empty the list in place."""
    refs = self.all_progress_references
    for ref in refs:
        ref.fail()
    del refs[:]
11fdf7f2 TL |
218 | |
@deferred_read
def get_inventory(self, host_filter=None, refresh=False):
    """
    Build one ``orchestrator.InventoryHost`` per host from the devices
    reported by ``RookCluster.get_discovered_devices``.

    :param host_filter: optional filter; only an explicit ``hosts`` list is
        supported, a ``labels`` filter raises NotImplementedError
    :param refresh: accepted but not used by this implementation
    """
    host_list = None
    if host_filter and host_filter.hosts:
        # Explicit host list
        host_list = host_filter.hosts
    elif host_filter and host_filter.labels:
        # TODO: query k8s API to resolve to host list, and pass
        # it into RookCluster.get_discovered_devices
        raise NotImplementedError()

    discovered = self.rook_cluster.get_discovered_devices(host_list)

    def to_device(raw):
        # Prefer the ceph-volume report when present and non-empty.
        if 'cephVolumeData' in raw and raw['cephVolumeData']:
            return inventory.Device.from_json(json.loads(raw['cephVolumeData']))
        # Otherwise build a minimal entry, marked as unavailable.
        return inventory.Device(
            path='/dev/' + raw['name'],
            sys_api=dict(
                rotational='1' if raw['rotational'] else '0',
                size=raw['size'],
            ),
            available=False,
            rejected_reasons=['device data coming from ceph-volume not provided'],
        )

    inventory_hosts = []
    for host_name, host_devs in discovered.items():
        devices = [to_device(raw) for raw in host_devs]
        inventory_hosts.append(
            orchestrator.InventoryHost(host_name, inventory.Devices(devices)))

    return inventory_hosts
252 | ||
@deferred_read
def get_hosts(self):
    # type: () -> List[orchestrator.HostSpec]
    """Return one HostSpec per node name reported by the Rook cluster."""
    node_names = self.rook_cluster.get_node_names()
    return [orchestrator.HostSpec(name) for name in node_names]
11fdf7f2 | 257 | |
9f95a23c TL |
@deferred_read
def describe_service(self, service_type=None, service_name=None,
                     refresh=False):
    """
    Describe the services declared in the Rook custom resources.

    Reads the CephCluster, CephFilesystem and CephObjectStore resources,
    builds one ``orchestrator.ServiceDescription`` per service (mon, mgr,
    optionally crash, one mds per filesystem, one rgw per object store) and
    counts running daemons per service from ``_list_daemons()``.

    The ``service_type``, ``service_name`` and ``refresh`` arguments are
    accepted but not used by this implementation.
    """
    now = datetime.datetime.utcnow()
    cluster = self.rook_cluster

    # CephCluster
    cl = cluster.rook_api_get(
        "cephclusters/{0}".format(cluster.rook_env.cluster_name))
    self.log.debug('CephCluster %s' % cl)
    cl_spec = cl['spec']
    image_name = cl_spec.get('cephVersion', {}).get('image', None)
    num_nodes = len(cluster.get_node_names())

    def describe(svc_spec, size):
        # Every description shares the cluster image and the same timestamp.
        return orchestrator.ServiceDescription(
            spec=svc_spec,
            size=size,
            container_image_name=image_name,
            last_refresh=now,
        )

    services = {}

    mon_count = cl_spec.get('mon', {}).get('count', 1)
    services['mon'] = describe(
        ServiceSpec('mon', placement=PlacementSpec(count=mon_count)),
        mon_count,
    )
    services['mgr'] = describe(
        ServiceSpec('mgr', placement=PlacementSpec.from_string('count:1')),
        1,
    )
    crash_disabled = cl_spec.get('crashCollector', {}).get('disable', False)
    if not crash_disabled:
        services['crash'] = describe(
            ServiceSpec('crash', placement=PlacementSpec.from_string('*')),
            num_nodes,
        )

    # CephFilesystems
    all_fs = cluster.rook_api_get("cephfilesystems/")
    self.log.debug('CephFilesystems %s' % all_fs)
    for fs in all_fs.get('items', []):
        fs_name = fs['metadata']['name']
        svc = 'mds.' + fs_name
        if svc in services:
            continue
        # FIXME: we are conflating active (+ standby) with count
        mds_conf = fs['spec'].get('metadataServer', {})
        active = mds_conf.get('activeCount', 1)
        if mds_conf.get('activeStandby', False):
            total_mds = active * 2
        else:
            total_mds = active
        services[svc] = describe(
            ServiceSpec(
                service_type='mds',
                service_id=fs_name,
                placement=PlacementSpec(count=active),
            ),
            total_mds,
        )

    # CephObjectstores
    all_zones = cluster.rook_api_get("cephobjectstores/")
    self.log.debug('CephObjectstores %s' % all_zones)
    for zone in all_zones.get('items', []):
        rgw_realm = zone['metadata']['name']
        rgw_zone = rgw_realm
        svc = 'rgw.' + rgw_realm + '.' + rgw_zone
        if svc in services:
            continue
        gateway = zone['spec']['gateway']
        active = gateway['instances']
        ssl = 'securePort' in gateway
        if ssl:
            port = gateway['securePort']
        else:
            port = gateway['port'] or 80
        services[svc] = describe(
            RGWSpec(
                service_id=rgw_realm + '.' + rgw_zone,
                rgw_realm=rgw_realm,
                rgw_zone=rgw_zone,
                ssl=ssl,
                rgw_frontend_port=port,
                placement=PlacementSpec(count=active),
            ),
            active,
        )

    # Count running daemons for the services described above.
    for dd in self._list_daemons():
        name = dd.service_name()
        if name in services:
            services[name].running += 1

    return list(services.values())
11fdf7f2 | 362 | |
@deferred_read
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                 refresh=False):
    """
    Return the daemon descriptions matching the given filters.

    :param service_name: only daemons belonging to this service
    :param daemon_type: passed on to the pod lookup
    :param daemon_id: passed on to the pod lookup
    :param host: passed on to the pod lookup
    :param refresh: forwarded to ``_list_daemons``
    """
    # Forward by keyword: _list_daemons takes service_name as its first
    # parameter, so forwarding positionally from daemon_type onwards shifts
    # every filter by one slot and drops service_name altogether.
    return self._list_daemons(service_name=service_name,
                              daemon_type=daemon_type,
                              daemon_id=daemon_id,
                              host=host,
                              refresh=refresh)
367 | ||
def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                  refresh=False):
    """
    Turn the pods reported by ``RookCluster.describe_pods`` into
    ``orchestrator.DaemonDescription`` objects.

    Pods without a ``ceph_daemon_id`` or ``ceph-osd-id`` label are skipped,
    as are daemons whose service name differs from ``service_name`` when one
    is given.  ``refresh`` is accepted but not used here.
    """
    # Pod phase -> numeric daemon status.
    phase_to_status = {
        'Pending': -1,
        'Running': 1,
        'Succeeded': 0,
        'Failed': -1,
        'Unknown': -1,
    }

    pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
    self.log.debug('pods %s' % pods)

    daemons = []
    for pod in pods:
        sd = orchestrator.DaemonDescription()
        sd.hostname = pod['hostname']
        sd.container_id = pod['name']
        labels = pod['labels']
        sd.daemon_type = labels['app'].replace('rook-ceph-', '')
        phase = pod['phase']
        sd.status = phase_to_status[phase]
        sd.status_desc = phase

        if 'ceph_daemon_id' in labels:
            sd.daemon_id = labels['ceph_daemon_id']
        elif 'ceph-osd-id' in labels:
            sd.daemon_id = labels['ceph-osd-id']
        else:
            # Unknown type -- skip it
            continue

        if service_name is not None and service_name != sd.service_name():
            continue

        created = pod['created']
        sd.container_image_name = pod['container_image_name']
        sd.created = created
        sd.last_configured = created
        sd.last_deployed = created
        sd.started = pod['started']
        sd.last_refresh = pod['refreshed']
        daemons.append(sd)

    return daemons
407 | ||
def _service_add_decorate(self, typename, spec, func):
    """
    Wrap ``func(spec)`` in a progress-tracked write completion.

    :param typename: service type label used in the progress message
    :param spec: service spec; its ``service_id`` appears in the message
    :param func: callable invoked with ``spec`` when the completion runs
    """
    def create():
        return func(spec)

    message = "Creating {} services for {}".format(typename, spec.service_id)
    return write_completion(on_complete=create, message=message, mgr=self)
414 | ||
def add_nfs(self, spec):
    # type: (NFSServiceSpec) -> RookCompletion
    """Return a completion that creates the NFS service via ``add_nfsgw``."""
    create_nfs = self.rook_cluster.add_nfsgw
    return self._service_add_decorate("NFS", spec, create_nfs)
419 | ||
def _service_rm_decorate(self, typename, name, func):
    """
    Wrap ``func(name)`` in a progress-tracked write completion.

    :param typename: service type label used in the progress message
    :param name: service name; passed to ``func`` and shown in the message
    :param func: callable invoked with ``name`` when the completion runs
    """
    def remove():
        return func(name)

    message = "Removing {} services for {}".format(typename, name)
    return write_completion(on_complete=remove, message=message, mgr=self)
426 | ||
def remove_service(self, service_type, service_name):
    """
    Return a completion that deletes the Rook resource backing a service.

    :param service_type: one of 'mds', 'rgw' or 'nfs'
    :param service_name: name of the resource to delete
    :return: the completion from ``_service_rm_decorate``, or None when
        ``service_type`` is not one of the supported types
    """
    # service type -> (label for the progress message, Rook resource kind)
    resources = {
        'mds': ('MDS', 'cephfilesystems'),
        'rgw': ('RGW', 'cephobjectstores'),
        'nfs': ('NFS', 'cephnfses'),
    }
    entry = resources.get(service_type)
    if entry is None:
        # Unsupported types keep falling through to None, as before.
        return None
    typename, resource = entry

    # _service_rm_decorate calls the callback with the service name, so a
    # zero-argument lambda raises TypeError when the completion runs.  The
    # default keeps the callback usable with or without that argument.
    return self._service_rm_decorate(
        typename,
        service_name,
        lambda name=service_name: self.rook_cluster.rm_service(resource, name),
    )
441 | ||
def apply_mon(self, spec):
    # type: (ServiceSpec) -> RookCompletion
    """
    Return a completion that sets the mon count to ``spec.placement.count``.

    :raises RuntimeError: when the placement names hosts or a label
    """
    placement = spec.placement
    if placement.hosts or placement.label:
        raise RuntimeError("Host list or label is not supported by rook.")

    def update():
        # Read the count when the completion runs, not when it is built.
        return self.rook_cluster.update_mon_count(spec.placement.count)

    message = "Updating mon count to {0}".format(spec.placement.count)
    return write_completion(update, message, mgr=self)
452 | ||
def apply_mds(self, spec):
    # type: (ServiceSpec) -> RookCompletion
    """Return a completion that applies the MDS spec via ``apply_filesystem``."""
    apply_fs = self.rook_cluster.apply_filesystem
    return self._service_add_decorate('MDS', spec, apply_fs)
457 | ||
def apply_rgw(self, spec):
    # type: (RGWSpec) -> RookCompletion
    """Return a completion that applies the RGW spec via ``apply_objectstore``."""
    apply_store = self.rook_cluster.apply_objectstore
    return self._service_add_decorate('RGW', spec, apply_store)
462 | ||
9f95a23c TL |
def apply_nfs(self, spec):
    # type: (NFSServiceSpec) -> RookCompletion
    """Return a completion that sets the NFS server count for ``spec.service_id``."""
    num = spec.placement.count

    def update():
        return self.rook_cluster.update_nfs_count(spec.service_id, num)

    message = "Updating NFS server count in {0} to {1}".format(spec.service_id, num)
    return write_completion(update, message, mgr=self)
471 | ||
def remove_daemons(self, names):
    """Return a completion that removes the pods for the named daemons."""
    def remove():
        return self.rook_cluster.remove_pods(names)

    message = "Removing daemons {}".format(','.join(names))
    return write_completion(remove, message, mgr=self)
478 | ||
def create_osds(self, drive_group):
    # type: (DriveGroupSpec) -> RookCompletion
    """ Creates OSDs from a drive group specification.

    $: ceph orch osd create -i <dg.file>

    The drivegroup file must only contain one spec at a time.

    The returned completion first resolves the host list, then validates
    the single matching host and asks the Rook cluster to add the OSDs.
    """

    # Device paths and data directories the new OSDs are expected to use.
    targets = []  # type: List[str]
    if drive_group.data_devices and drive_group.data_devices.paths:
        targets += [d.path for d in drive_group.data_devices.paths]
    if drive_group.data_directories:
        targets += drive_group.data_directories

    def execute(all_hosts_):
        # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
        matching_hosts = drive_group.placement.filter_matching_hosts(
            lambda label=None, as_hostspec=None: all_hosts_)

        assert len(matching_hosts) == 1
        host = matching_hosts[0]

        if not self.rook_cluster.node_exists(host):
            # Report the host itself, not the repr of the one-element list.
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(host))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return orchestrator.Completion.with_progress(
            message="Creating OSD on {0}:{1}".format(
                matching_hosts,
                targets),
            mgr=self,
            on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
            calc_percent=lambda: has_osds(matching_hosts)
        )

    @deferred_read
    def has_osds(matching_hosts):
        """Return True once an OSD on the host uses one of the targets."""

        # Find OSD pods on this host
        pod_osd_ids = set()
        pods = self.k8s.list_namespaced_pod(
            self._rook_env.namespace,
            label_selector="rook_cluster={},app=rook-ceph-osd".format(
                self._rook_env.cluster_name),
            field_selector="spec.nodeName={0}".format(
                matching_hosts[0]
            )).items
        for p in pods:
            pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

        self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

        found = []
        osdmap = self.get("osd_map")
        for osd in osdmap['osds']:
            osd_id = osd['osd']
            if osd_id not in pod_osd_ids:
                continue

            metadata = self.get_metadata('osd', "%s" % osd_id)
            # Metadata may be missing for a new OSD; do not index into it
            # unconditionally.
            devices = metadata.get('devices') if metadata else None
            # NOTE(review): compares the raw 'devices' value against the
            # target paths -- confirm both sides use the same format.
            if devices in targets:
                found.append(osd_id)
            else:
                self.log.info("ignoring osd {0} {1}".format(
                    osd_id, devices
                ))

        # `found` is always a list, so `found is not None` was always True;
        # report success only when a matching OSD was actually seen.
        return len(found) > 0

    c = self.get_hosts().then(execute)
    return c
801d1391 TL |
553 | |
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
    """
    Return a completion that switches a device light via ``blink_light``.

    :param ident_fault: passed through to ``RookCluster.blink_light``
    :param on: requested state; also shown in the progress message
    :param locs: device locations, shown as ``host:dev`` in the message
    """
    where = ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])

    def blink():
        return self.rook_cluster.blink_light(ident_fault, on, locs)

    return write_completion(
        on_complete=blink,
        message="Switching <{}> identification light in {}".format(on, where),
        mgr=self
    )