"""
This module wraps Rook + Kubernetes APIs to expose the calls
needed to implement an orchestrator module.  While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
"""
import datetime
import threading
import logging
from contextlib import contextmanager
from time import sleep
import re
from orchestrator import OrchResult

import jsonpatch
from urllib.parse import urljoin

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
from ceph.utils import datetime_now
from ceph.deployment.drive_selection.matchers import SizeMatcher
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
from mgr_module import NFS_POOL_NAME
from mgr_util import merge_dicts

from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass

import orchestrator

try:
    from rook.module import RookEnv, RookOrchestrator
except ImportError:
    pass  # just used for type checking.


T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)

CrdClassT = TypeVar('CrdClassT', bound=CrdClass)


log = logging.getLogger(__name__)


def __urllib3_supports_read_chunked() -> bool:
    # There is a bug in CentOS 7 as it ships a urllib3 which is lower
    # than required by kubernetes-client
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)
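

# A minimal usage sketch for ``threaded`` (not called anywhere in this module;
# the names ``_example_threaded_usage`` and ``_task`` are illustrative only):
# the decorated callable returns the already-started Thread, which the caller
# may join.
def _example_threaded_usage() -> None:
    @threaded
    def _task(msg: str) -> None:
        log.info('running in a background thread: %s', msg)

    _task('hello').join()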


class DefaultFetcher():
    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
        self.storage_class = storage_class
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]

    def convert_size(self, size_str: str) -> int:
        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
        assert coeff_and_unit is not None
        coeff = int(coeff_and_unit[1])
        unit = coeff_and_unit[2]
        try:
            factor = units.index(unit) % 7
        except ValueError:
            log.error("PV size format invalid")
            raise
        size = coeff * (2 ** (10 * factor))
        return size
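
    # For example (illustrative values only): ``convert_size('10Gi')`` finds
    # 'Gi' at index 3 in ``units``, so the result is 10 * 2**30. The ``% 7``
    # folds the decimal-style suffixes onto the same exponents: 'G' sits at
    # index 10, and 10 % 7 == 3 maps it to the identical factor.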

    def devices(self) -> Dict[str, List[Device]]:
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
        node = 'N/A'
        if i.spec.node_affinity:
            terms = i.spec.node_affinity.required.node_selector_terms
            if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
                node = terms[0].match_expressions[0].values[0]
        size = self.convert_size(i.spec.capacity['storage'])
        path = (
            i.spec.host_path.path if i.spec.host_path
            else i.spec.local.path if i.spec.local
            else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name'])
            if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations
            else ''
        )
        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
        pv_name = i.metadata.name
        device = Device(
            path=path,
            sys_api=dict(
                size=size,
                node=node,
                pv_name=pv_name
            ),
            available=state,
        )
        return (node, device)


class LSOFetcher(DefaultFetcher):
    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
        super().__init__(storage_class, coreV1_api)
        self.customObjects_api = customObjects_api
        self.nodenames = nodenames

    def fetch(self) -> None:
        super().fetch()
        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
                                                                            group="local.storage.openshift.io",
                                                                            version="v1alpha1",
                                                                            plural="localvolumediscoveryresults")

    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
        if self.nodenames is not None:
            return item['spec']['nodeName'] in self.nodenames
        else:
            return True

    def devices(self) -> Dict[str, List[Device]]:
        try:
            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
        except ApiException as dummy_e:
            log.error("Failed to fetch device metadata")
            raise
        self.lso_devices = {}
        for i in lso_discovery_results:
            drives = i['status']['discoveredDevices']
            for drive in drives:
                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = (None, None)
            if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
                node, device = super().device(i)
            else:
                node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: Any) -> Tuple[str, Device]:
        node = i.metadata.labels['kubernetes.io/hostname']
        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
        pv_name = i.metadata.name
        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
        device = Device(
            path=device_discovery['path'],
            sys_api=dict(
                size=device_discovery['size'],
                rotational='1' if device_discovery['property'] == 'Rotational' else '0',
                node=node,
                pv_name=pv_name,
                model=model,
                vendor=vendor
            ),
            available=device_discovery['status']['state'] == 'Available',
            device_id=device_discovery['deviceID'].split('/')[-1],
            lsm_data=dict(
                serialNum=device_discovery['serial']
            )
        )
        return (node, device)


class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types,

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        metadata = response.metadata
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    def get_item_name(self, item: Any) -> Any:
        try:
            return item.metadata.name
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))

    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                item = event['object']
                name = self.get_item_name(item)

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise
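

# A usage sketch for KubernetesResource (illustrative only, not called
# anywhere in this module), assuming an already-configured kubernetes client:
# the first read of ``items`` performs a full fetch, after which a background
# watch thread keeps the cached dict current.
def _example_kubernetes_resource_usage(core_api: 'client.CoreV1Api') -> List[str]:
    nodes: KubernetesResource = KubernetesResource(core_api.list_node)
    return [node.metadata.name for node in nodes.items]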


class KubernetesCustomResource(KubernetesResource):
    def _fetch(self) -> str:
        response = self.api_func(**self.kwargs)
        metadata = response['metadata']
        self._items = {item['metadata']['name']: item for item in response['items']}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata['resourceVersion']

    def get_item_name(self, item: Any) -> Any:
        try:
            return item['metadata']['name']
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))


class DefaultCreator():
    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
        self.coreV1_api = coreV1_api
        self.storage_class = storage_class
        self.inventory = inventory

    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
        device_set = ccl.StorageClassDeviceSetsItem(
            name=d.sys_api['pv_name'],
            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
            count=1,
            encrypted=drive_group.encrypted,
            portable=False
        )
        device_set.volumeClaimTemplates.append(
            ccl.VolumeClaimTemplatesItem(
                metadata=ccl.Metadata(
                    name="data"
                ),
                spec=ccl.Spec(
                    storageClassName=self.storage_class,
                    volumeMode="Block",
                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
                    resources={
                        "requests": {
                            "storage": 1
                        }
                    },
                    volumeName=d.sys_api['pv_name']
                )
            )
        )
        return device_set

    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        count = 0
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or (device.path in paths)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list

    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
        assert drive_group.data_devices is not None
        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
                new_cluster.spec.storage = ccl.Storage()

            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()

            existing_scds = [
                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
            ]
            for device in to_create:
                new_scds = self.device_to_device_set(drive_group, device)
                if new_scds.name not in existing_scds:
                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
            return new_cluster
        return _add_osds


class LSOCreator(DefaultCreator):
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        vendor = getattr(drive_group.data_devices, 'vendor', None)
        model = getattr(drive_group.data_devices, 'model', None)
        count = 0
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or device.path in paths
                                )
                                and (
                                    not vendor
                                    or device.sys_api['vendor'] == vendor
                                )
                                and (
                                    not model
                                    or device.sys_api['model'].startswith(model)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list


class DefaultRemover():
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        appsV1_api: 'client.AppsV1Api',
        osd_ids: List[str],
        replace_flag: bool,
        force_flag: bool,
        mon_command: Callable,
        patch: Callable,
        rook_env: 'RookEnv',
        inventory: Dict[str, List[Device]]
    ):
        self.batchV1_api = batchV1_api
        self.appsV1_api = appsV1_api
        self.coreV1_api = coreV1_api

        self.osd_ids = osd_ids
        self.replace_flag = replace_flag
        self.force_flag = force_flag

        self.mon_command = mon_command

        self.patch = patch
        self.rook_env = rook_env

        self.inventory = inventory
        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')

    def remove_device_sets(self) -> str:
        self.to_remove: Dict[str, int] = {}
        self.pvc_to_remove: List[str] = []
        for pod in self.osd_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and pod.metadata.labels['osd'] in self.osd_ids
            ):
                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
                else:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
        def _remove_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
            for _set in new_cluster.spec.storage.storageClassDeviceSets:
                if _set.name in self.to_remove:
                    if _set.count == self.to_remove[_set.name]:
                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
                    else:
                        _set.count = _set.count - self.to_remove[_set.name]
            return new_cluster
        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)

    def check_force(self) -> None:
        if not self.force_flag:
            safe_args = {'prefix': 'osd safe-to-destroy',
                         'ids': [str(x) for x in self.osd_ids]}
            ret, out, err = self.mon_command(safe_args)
            if ret != 0:
                raise RuntimeError(err)

    def set_osds_down(self) -> None:
        down_flag_args = {
            'prefix': 'osd down',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(down_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def scale_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
                spec=client.V1ScaleSpec(
                    replicas=0
                )
            ))

    def set_osds_out(self) -> None:
        out_flag_args = {
            'prefix': 'osd out',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(out_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def delete_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')

    def clean_up_prepare_jobs_and_pvc(self) -> None:
        for job in self.jobs.items:
            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')

    def purge_osds(self) -> None:
        for id in self.osd_ids:
            purge_args = {
                'prefix': 'osd purge-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(purge_args)
            if ret != 0:
                raise RuntimeError(err)

    def destroy_osds(self) -> None:
        for id in self.osd_ids:
            destroy_args = {
                'prefix': 'osd destroy-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(destroy_args)
            if ret != 0:
                raise RuntimeError(err)

    def remove(self) -> str:
        try:
            self.check_force()
        except Exception as e:
            log.exception("Error checking if OSDs are safe to destroy")
            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
        try:
            remove_result = self.remove_device_sets()
        except Exception as e:
            log.exception("Error patching ceph cluster CRD")
            return f"Not possible to modify Ceph cluster CRD: {e}"
        try:
            self.scale_deployments()
            self.delete_deployments()
            self.clean_up_prepare_jobs_and_pvc()
        except Exception as e:
            log.exception("Ceph cluster CRD patched, but error cleaning environment")
            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
        try:
            self.set_osds_down()
            self.set_osds_out()
            if self.replace_flag:
                self.destroy_osds()
            else:
                self.purge_osds()
        except Exception as e:
            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
            return f"Error removing OSDs from Ceph cluster: {e}"
        return remove_result


class RookCluster(object):
    # import of client.CoreV1Api must be optional at import time.
    # Instead allow mgr/rook to be imported anyway.
    def __init__(
            self,
            coreV1_api: 'client.CoreV1Api',
            batchV1_api: 'client.BatchV1Api',
            customObjects_api: 'client.CustomObjectsApi',
            storageV1_api: 'client.StorageV1Api',
            appsV1_api: 'client.AppsV1Api',
            rook_env: 'RookEnv',
            storage_class: 'str'
    ):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api
        self.customObjects_api = customObjects_api
        self.storageV1_api = storageV1_api  # client.StorageV1Api
        self.appsV1_api = appsV1_api  # client.AppsV1Api
        self.storage_class = storage_class  # type: str

        #  TODO: replace direct k8s calls with Rook API calls
        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                                              namespace=self.rook_env.namespace,
                                                                              label_selector="rook_cluster={0}".format(
                                                                                  self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)
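
    # For example (illustrative values): with crd_version 'v1' and namespace
    # 'rook-ceph', rook_url('cephnfses/mycluster') yields
    # '/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mycluster'.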

    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)

    def get_storage_class(self) -> 'client.V1StorageClass':
        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
        if len(matching_sc) == 0:
            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
        return matching_sc[0]

    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
        storage_class = self.get_storage_class()
        self.fetcher: Optional[DefaultFetcher] = None
        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
            self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
        else:
            self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
        self.fetcher.fetch()
        return self.fetcher.devices()

    def get_osds(self) -> List:
        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        return list(osd_pods.items)

    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url
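
    # For example (illustrative values): pool 'nfs-ganesha', cluster 'mynfs',
    # instance 'a' with no RADOS namespace yields
    # 'rados://nfs-ganesha/conf-mynfs.a'; with namespace 'ns' it yields
    # 'rados://nfs-ganesha/ns/conf-mynfs.a'.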

    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Go query the k8s API about deployment, containers related to this
        filesystem

        Example Rook Pod labels for a mgr daemon:
        Labels:         app=rook-ceph-mgr
                        pod-template-hash=2171958073
                        rook_cluster=rook
        And MDS containers additionally have `rook_filesystem` label

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

                if service_id is not None:
                    try:
                        k, v = {
                            "mds": ("rook_file_system", service_id),
                            "osd": ("ceph-osd-id", service_id),
                            "mon": ("mon", service_id),
                            "mgr": ("mgr", service_id),
                            "nfs": ("nfs", service_id),
                            "rgw": ("ceph_rgw", service_id),
                        }[service_type]
                    except KeyError:
                        raise orchestrator.OrchestratorValidationError(
                            '{} not supported'.format(service_type))
                    if metadata.labels[k] != v:
                        return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False

            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            ls = d['status'].get('container_statuses')
            if not ls:
                # ignore pods with no containers
                continue
            image_id = ls[0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]

    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise
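
    # A usage sketch (illustrative only): creation calls that may race with
    # another actor can be wrapped so that HTTP 409 Conflict is treated as
    # success, e.g.
    #
    #     with self.ignore_409("CephFilesystem 'myfs' already exists"):
    #         self.rook_api_post("cephfilesystems/", body=new.to_json())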

    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
                         leaf_type: str) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        all_hosts = self.get_hosts()
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            new.spec.metadataServer.placement = cfs.Placement(
                nodeAffinity=cfs.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
                        )
                    )
                )
            )
            return new

        def _create_fs() -> cfs.CephFilesystem:
            fs = cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=cfs.Metadata(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    dataPools=cfs.DataPoolsList(
                        [
                            cfs.DataPoolsItem(
                                failureDomain=leaf_type,
                                replicated=cfs.Replicated(
                                    size=num_replicas
                                )
                            )
                        ]
                    ),
                    metadataPool=cfs.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cfs.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True,
                        placement=cfs.Placement(
                            nodeAffinity=cfs.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
                                    )
                                )
                            )
                        )
                    )
                )
            )
            return fs
        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def get_matching_node(self, host: str) -> Any:
        matching_node = None
        for node in self.nodes.items:
            if node.metadata.labels['kubernetes.io/hostname'] == host:
                matching_node = node
        return matching_node

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
        matching_node.metadata.labels['ceph-label/' + label] = ""
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Added {label} label to {host}')

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
        matching_node.metadata.labels.pop('ceph-label/' + label, None)
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Removed {label} label from {host}')

    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like . in the name. This could be there because
            # it is a legacy rgw spec that was named like $realm.$zone,
            # except that I doubt there were any users of this code.
            # Instead, focus on future users and translate . to -
            # (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        all_hosts = self.get_hosts()
        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            object_store = cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=cos.Metadata(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                        placement=cos.Placement(
                            nodeAffinity=cos.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cos.NodeSelectorTermsList(
                                        [
                                            placement_spec_to_node_selector(spec.placement, all_hosts)
                                        ]
                                    )
                                )
                            )
                        )
                    ),
                    dataPool=cos.DataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataPool=cos.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    )
                )
            )
            if spec.rgw_zone:
                object_store.spec.zone = cos.Zone(
                    name=spec.rgw_zone
                )
            return object_store

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            if new.spec.gateway:
                new.spec.gateway.instances = spec.placement.count or 1
            else:
                new.spec.gateway = cos.Gateway(
                    instances=spec.placement.count or 1
                )
            return new
        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        # TODO Number of pods should be based on the list of hosts in the
        #      PlacementSpec.
        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None "  # for mypy typing
        service_id = spec.service_id
        mgr_module = cast(Module, mgr)
        count = spec.placement.count or 1
        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=cnfs.Metadata(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        namespace=service_id,
                        pool=NFS_POOL_NAME,
                    ),
                    server=cnfs.Server(
                        active=count,
                    ),
                ),
            )
            return rook_nfsgw

        create_ganesha_pool(mgr)
        NFSRados(mgr_module, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
                                     _update_nfs, _create_nfs)

    def rm_service(self, rooktype: str, service_id: str) -> str:
        self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
        objpath = "{0}/{1}".format(rooktype, service_id)
        return f'Removed {objpath}'

    def get_resource(self, resource_type: str) -> Iterable:
        custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
        return custom_objects.items

    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            if not new.spec.mon:
                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        assert drive_group.objectstore in ("bluestore", "filestore")
        assert drive_group.service_id
        storage_class = self.get_storage_class()
        inventory = self.get_discovered_devices()
        creator: Optional[DefaultCreator] = None
        if (
            storage_class.metadata.labels
            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
        ):
            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
        else:
            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
        return self._patch(
            ccl.CephCluster,
            'cephclusters',
            self.rook_env.cluster_name,
            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
        )

    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
        inventory = self.get_discovered_devices()
        self.remover = DefaultRemover(
            self.coreV1_api,
            self.batchV1_api,
            self.appsV1_api,
            osd_ids,
            replace,
            force,
            mon_command,
            self._patch,
            self.rook_env,
            inventory
        )
        return self.remover.remove()

    def get_hosts(self) -> List[orchestrator.HostSpec]:
        ret = []
        for node in self.nodes.items:
            spec = orchestrator.HostSpec(
                node.metadata.name,
                addr='/'.join([addr.address for addr in node.status.addresses]),
                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
            )
            ret.append(spec)
        return ret

    def create_zap_job(self, host: str, path: str) -> None:
        body = client.V1Job(
            api_version="batch/v1",
            metadata=client.V1ObjectMeta(
                name="rook-ceph-device-zap",
                namespace="rook-ceph"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="device-zap",
                                image="rook/ceph:master",
                                command=["bash"],
                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
                                env=[
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_USERNAME",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-username",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_SECRET",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-secret",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    )
                                ],
                                security_context=client.V1SecurityContext(
                                    privileged=True
                                ),
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        mount_path="/etc/ceph",
                                        name="ceph-conf-emptydir"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/etc/rook",
                                        name="rook-config"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/dev",
                                        name="devices"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="ceph-conf-emptydir",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="rook-config",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="devices",
                                host_path=client.V1HostPathVolumeSource(
                                    path="/dev"
                                )
                            )
                        ],
                        node_selector={
                            "kubernetes.io/hostname": host
                        },
                        restart_policy="Never"
                    )
                )
            )
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)

    def rbd_mirror(self, spec: ServiceSpec) -> None:
        service_id = spec.service_id or "default-rbd-mirror"
        all_hosts = self.get_hosts()
        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
            return crbdm.CephRBDMirror(
                apiVersion=self.rook_env.api_name,
                metadata=crbdm.Metadata(
                    name=service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=crbdm.Spec(
                    count=spec.placement.count or 1,
                    placement=crbdm.Placement(
                        nodeAffinity=crbdm.NodeAffinity(
                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                                    [
                                        placement_spec_to_node_selector(spec.placement, all_hosts)
                                    ]
                                )
                            )
                        )
                    )
                )
            )

        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
            new.spec.count = spec.placement.count or 1
            new.spec.placement = crbdm.Placement(
                nodeAffinity=crbdm.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                            [
                                placement_spec_to_node_selector(spec.placement, all_hosts)
                            ]
                        )
                    )
                )
            )
            return new

        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)

    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli", ],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev, ],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job uses <ttl_seconds_after_finished>, so the TTL controller will
        # eventually delete it automatically. Since that feature is still in
        # alpha, an extra explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]


def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
    all_hostnames = [hs.hostname for hs in all_hosts]
    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
    if spec.host_pattern and spec.host_pattern != "*":
        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
    if spec.label:
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="ceph-label/" + spec.label,
                operator="Exists"
            )
        )
    if spec.hosts:
        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="In",
                values=ccl.CrdObjectList(host_list)
            )
        )
    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="Exists",
            )
        )
    return res


def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
    res = PlacementSpec()
    for expression in node_selector.matchExpressions:
        if expression.key.startswith("ceph-label/"):
            res.label = expression.key.split('/')[1]
        elif expression.key == "kubernetes.io/hostname":
            if expression.operator == "Exists":
                res.host_pattern = "*"
            elif expression.operator == "In":
                res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
    return res
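

# A round-trip sketch for the two helpers above (illustrative only, not called
# anywhere in this module): a label-based PlacementSpec becomes a node selector
# requiring the matching ``ceph-label/...`` key, and converting back recovers
# the label.
def _example_placement_round_trip() -> PlacementSpec:
    spec = PlacementSpec(label='storage')
    selector = placement_spec_to_node_selector(spec, all_hosts=[])
    return node_selector_to_placement_spec(selector)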