"""
This module wraps Rook + Kubernetes APIs to expose the calls
needed to implement an orchestrator module. While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
"""
import datetime
import threading
import logging
import json
import re

import jsonpatch

from contextlib import contextmanager
from time import sleep
from orchestrator import OrchResult
from urllib.parse import urljoin

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    PlacementSpec, HostPlacementSpec
from ceph.utils import datetime_now
from ceph.deployment.drive_selection.matchers import SizeMatcher
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
from mgr_module import NFS_POOL_NAME
from mgr_util import merge_dicts

from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type
try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0
from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass

import orchestrator

try:
    from rook.module import RookEnv, RookOrchestrator
except ImportError:
    pass  # just used for type checking.
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)

CrdClassT = TypeVar('CrdClassT', bound=CrdClass)


log = logging.getLogger(__name__)
def __urllib3_supports_read_chunked() -> bool:
    # There is a bug in CentOS 7 as it ships a urllib3 which is lower
    # than required by kubernetes-client
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """
def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)
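
# Minimal usage sketch (comment only; names are illustrative, not part of this
# module's API): decorating a worker function returns the already-started
# Thread, so callers can join it if they need to wait for completion.
#
#     @threaded
#     def _worker(msg: str) -> None:
#         log.info(msg)
#
#     t = _worker('hello')   # returns a threading.Thread that is running
#     t.join()
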
class DefaultFetcher():
    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
        self.storage_class = storage_class
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
    def convert_size(self, size_str: str) -> int:
        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
        assert coeff_and_unit is not None
        coeff = int(coeff_and_unit[1])
        unit = coeff_and_unit[2]
        try:
            factor = units.index(unit) % 7
        except ValueError:
            log.error("PV size format invalid")
            raise
        size = coeff * (2 ** (10 * factor))
        return size
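
    # Worked example (comment only): "10Gi" parses to coeff=10, unit="Gi";
    # "Gi" sits at index 3 in the units tuple, so factor = 3 % 7 = 3 and the
    # result is 10 * 2**30 = 10737418240 bytes. Decimal suffixes such as "10G"
    # land at index 10, and 10 % 7 = 3 again, so they are treated the same as
    # their binary counterparts here.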
    def devices(self) -> Dict[str, List[Device]]:
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices
    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
        node = 'N/A'
        if i.spec.node_affinity:
            terms = i.spec.node_affinity.required.node_selector_terms
            if len(terms) == 1 and len(terms[0].match_expressions) == 1 \
                    and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' \
                    and len(terms[0].match_expressions[0].values) == 1:
                node = terms[0].match_expressions[0].values[0]
        size = self.convert_size(i.spec.capacity['storage'])
        path = i.spec.host_path.path if i.spec.host_path \
            else i.spec.local.path if i.spec.local \
            else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) \
            if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations \
            else ''
        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
        pv_name = i.metadata.name
        device = Device(
            path=path,
            sys_api=dict(
                size=size,
                node=node,
                pv_name=pv_name
            ),
            available=state,
        )
        return (node, device)
class LSOFetcher(DefaultFetcher):
    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
        super().__init__(storage_class, coreV1_api)
        self.customObjects_api = customObjects_api
        self.nodenames = nodenames
    def fetch(self) -> None:
        super().fetch()
        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
                                                                            group="local.storage.openshift.io",
                                                                            version="v1alpha1",
                                                                            plural="localvolumediscoveryresults")
    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
        if self.nodenames is not None:
            return item['spec']['nodeName'] in self.nodenames
        else:
            return True
    def devices(self) -> Dict[str, List[Device]]:
        try:
            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
        except ApiException as dummy_e:
            log.error("Failed to fetch device metadata")
            raise
        self.lso_devices = {}
        for i in lso_discovery_results:
            drives = i['status']['discoveredDevices']
            for drive in drives:
                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = (None, None)
            if (not i.metadata.annotations) \
                    or ('storage.openshift.com/device-id' not in i.metadata.annotations) \
                    or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
                node, device = super().device(i)
            else:
                node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices
    def device(self, i: Any) -> Tuple[str, Device]:
        node = i.metadata.labels['kubernetes.io/hostname']
        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
        pv_name = i.metadata.name
        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
        device = Device(
            path=device_discovery['path'],
            sys_api=dict(
                size=device_discovery['size'],
                rotational='1' if device_discovery['property'] == 'Rotational' else '0',
                node=node,
                pv_name=pv_name,
                model=model,
                vendor=vendor
            ),
            available=device_discovery['status']['state'] == 'Available',
            device_id=device_discovery['deviceID'].split('/')[-1],
            lsm_data=dict(
                serialNum=device_discovery['serial']
            )
        )
        return (node, device)
class PDFetcher(DefaultFetcher):
    """ Physical Devices Fetcher"""
    def __init__(self, coreV1_api: 'client.CoreV1Api'):
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        """ Collect the devices information from k8s configmaps"""
        self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                              namespace='rook-ceph',
                                                              label_selector='app=rook-discover')
    def devices(self) -> Dict[str, List[Device]]:
        """ Return the list of devices found"""
        node_devices: Dict[str, List[Device]] = {}
        for i in self.dev_cms.items:
            devices_list: List[Device] = []
            for d in json.loads(i.data['devices']):
                devices_list.append(self.device(d)[1])
            node_devices[i.metadata.labels['rook.io/node']] = devices_list
        return node_devices
    def device(self, devData: Dict[str, str]) -> Tuple[str, Device]:
        """ Build an orchestrator device """
        if 'cephVolumeData' in devData and devData['cephVolumeData']:
            return "", Device.from_json(json.loads(devData['cephVolumeData']))
        else:
            return "", Device(
                path='/dev/' + devData['name'],
                sys_api=dict(
                    rotational='1' if devData['rotational'] else '0',
                    size=devData['size']
                ),
                available=False,
                rejected_reasons=['device data coming from ceph-volume not provided'],
            )
class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types,

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')
    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        metadata = response.metadata
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version
    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()
    def get_item_name(self, item: Any) -> Any:
        try:
            return item.metadata.name
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))
    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                item = event['object']
                name = self.get_item_name(item)

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise
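
# Usage sketch (comment only; names are illustrative, not part of this module):
# a KubernetesResource caches the listed objects and, when urllib3 supports
# chunked reads, keeps the cache updated from a background watch thread.
#
#     nodes = KubernetesResource(core_v1_api.list_node)
#     names = [n.metadata.name for n in nodes.items]  # first access triggers fetch/watch
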
class KubernetesCustomResource(KubernetesResource):
    def _fetch(self) -> str:
        response = self.api_func(**self.kwargs)
        metadata = response['metadata']
        self._items = {item['metadata']['name']: item for item in response['items']}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata['resourceVersion']

    def get_item_name(self, item: Any) -> Any:
        try:
            return item['metadata']['name']
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))
class DefaultCreator():
    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
        self.coreV1_api = coreV1_api
        self.storage_class = storage_class
        self.inventory = inventory
    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
        device_set = ccl.StorageClassDeviceSetsItem(
            name=d.sys_api['pv_name'],
            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
            count=1,
            encrypted=drive_group.encrypted,
            portable=False
        )
        device_set.volumeClaimTemplates.append(
            ccl.VolumeClaimTemplatesItem(
                metadata=ccl.Metadata(
                    name="data"
                ),
                spec=ccl.Spec(
                    storageClassName=self.storage_class,
                    volumeMode="Block",
                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
                    resources={
                        "requests": {
                            "storage": 1
                        }
                    },
                    volumeName=d.sys_api['pv_name']
                )
            )
        )
        return device_set
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        count = 0
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and ((sizematcher is None) or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or (device.path in paths)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list
    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
        assert drive_group.data_devices is not None
        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
                new_cluster.spec.storage = ccl.Storage()

            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()

            existing_scds = [
                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
            ]
            for device in to_create:
                new_scds = self.device_to_device_set(drive_group, device)
                if new_scds.name not in existing_scds:
                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
            return new_cluster
        return _add_osds
class LSOCreator(DefaultCreator):
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        vendor = getattr(drive_group.data_devices, 'vendor', None)
        model = getattr(drive_group.data_devices, 'model', None)
        count = 0
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and ((sizematcher is None) or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or device.path in paths
                                )
                                and (
                                    not vendor
                                    or device.sys_api['vendor'] == vendor
                                )
                                and (
                                    not model
                                    or device.sys_api['model'].startswith(model)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list
class DefaultRemover():
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        appsV1_api: 'client.AppsV1Api',
        osd_ids: List[str],
        replace_flag: bool,
        force_flag: bool,
        mon_command: Callable,
        patch: Callable,
        rook_env: 'RookEnv',
        inventory: Dict[str, List[Device]]
    ):
        self.batchV1_api = batchV1_api
        self.appsV1_api = appsV1_api
        self.coreV1_api = coreV1_api

        self.osd_ids = osd_ids
        self.replace_flag = replace_flag
        self.force_flag = force_flag

        self.mon_command = mon_command

        self.patch = patch
        self.rook_env = rook_env

        self.inventory = inventory
        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                               namespace='rook-ceph',
                                                               label_selector='app=rook-ceph-osd')
        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
                                                           namespace='rook-ceph',
                                                           label_selector='app=rook-ceph-osd-prepare')
        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
                                                           namespace='rook-ceph')
    def remove_device_sets(self) -> str:
        self.to_remove: Dict[str, int] = {}
        self.pvc_to_remove: List[str] = []
        for pod in self.osd_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and pod.metadata.labels['osd'] in self.osd_ids
            ):
                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
                else:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
        def _remove_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
            for _set in new_cluster.spec.storage.storageClassDeviceSets:
                if _set.name in self.to_remove:
                    if _set.count == self.to_remove[_set.name]:
                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
                    else:
                        _set.count = _set.count - self.to_remove[_set.name]
            return new_cluster
        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
    def check_force(self) -> None:
        if not self.force_flag:
            safe_args = {'prefix': 'osd safe-to-destroy',
                         'ids': [str(x) for x in self.osd_ids]}
            ret, out, err = self.mon_command(safe_args)
            if ret != 0:
                raise RuntimeError(err)
    def set_osds_down(self) -> None:
        down_flag_args = {
            'prefix': 'osd down',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(down_flag_args)
        if ret != 0:
            raise RuntimeError(err)
    def scale_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph',
                                                              name='rook-ceph-osd-{}'.format(osd_id),
                                                              body=client.V1Scale(
                                                                  spec=client.V1ScaleSpec(
                                                                      replicas=0
                                                                  )
                                                              ))
    def set_osds_out(self) -> None:
        out_flag_args = {
            'prefix': 'osd out',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(out_flag_args)
        if ret != 0:
            raise RuntimeError(err)
    def delete_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph',
                                                         name='rook-ceph-osd-{}'.format(osd_id),
                                                         propagation_policy='Foreground')
    def clean_up_prepare_jobs_and_pvc(self) -> None:
        for job in self.jobs.items:
            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
                self.batchV1_api.delete_namespaced_job(name=job.metadata.name,
                                                       namespace='rook-ceph',
                                                       propagation_policy='Foreground')
                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
                                                                          namespace='rook-ceph',
                                                                          propagation_policy='Foreground')
    def purge_osds(self) -> None:
        for id in self.osd_ids:
            purge_args = {
                'prefix': 'osd purge-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(purge_args)
            if ret != 0:
                raise RuntimeError(err)
    def destroy_osds(self) -> None:
        for id in self.osd_ids:
            destroy_args = {
                'prefix': 'osd destroy-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(destroy_args)
            if ret != 0:
                raise RuntimeError(err)
    def remove(self) -> str:
        try:
            self.check_force()
        except Exception as e:
            log.exception("Error checking if OSDs are safe to destroy")
            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
        try:
            remove_result = self.remove_device_sets()
        except Exception as e:
            log.exception("Error patching ceph cluster CRD")
            return f"Not possible to modify Ceph cluster CRD: {e}"
        try:
            self.scale_deployments()
            self.delete_deployments()
            self.clean_up_prepare_jobs_and_pvc()
        except Exception as e:
            log.exception("Ceph cluster CRD patched, but error cleaning environment")
            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
        try:
            self.set_osds_down()
            self.set_osds_out()
            if self.replace_flag:
                self.destroy_osds()
            else:
                self.purge_osds()
        except Exception as e:
            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
            return f"Error removing OSDs from Ceph cluster: {e}"
        return remove_result
class RookCluster(object):
    # import of client.CoreV1Api must be optional at import time.
    # Instead allow mgr/rook to be imported anyway.
    def __init__(
            self,
            coreV1_api: 'client.CoreV1Api',
            batchV1_api: 'client.BatchV1Api',
            customObjects_api: 'client.CustomObjectsApi',
            storageV1_api: 'client.StorageV1Api',
            appsV1_api: 'client.AppsV1Api',
            rook_env: 'RookEnv',
            storage_class: str
    ):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api
        self.customObjects_api = customObjects_api
        self.storageV1_api = storageV1_api  # client.StorageV1Api
        self.appsV1_api = appsV1_api  # client.AppsV1Api
        self.storage_class = storage_class  # type: str
        # TODO: replace direct k8s calls with Rook API calls
        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                                              namespace=self.rook_env.namespace,
                                                                              label_selector="rook_cluster={0}".format(
                                                                                  self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)
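
    # Example (comment only): with crd_version "v1" and namespace "rook-ceph",
    # rook_url("cephnfses/mynfs") yields
    # "/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs".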
    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)
    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)
    def get_storage_class(self) -> 'client.V1StorageClass':
        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
        if len(matching_sc) == 0:
            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
        return matching_sc[0]
    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
        self.fetcher: Optional[DefaultFetcher] = None
        op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
        if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
            self.fetcher = PDFetcher(self.coreV1_api)
        else:
            storage_class = self.get_storage_class()
            if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
                self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
            else:
                self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)

        self.fetcher.fetch()
        return self.fetcher.devices()
    def get_osds(self) -> List:
        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                          namespace='rook-ceph',
                                                          label_selector='app=rook-ceph-osd')
        return list(osd_pods.items)
    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url
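
    # Example (comment only): for pool "nfs-ganesha", RADOS namespace "mynfs",
    # cluster "mynfs" and instance "a", the returned URL is
    # "rados://nfs-ganesha/mynfs/conf-mynfs.a"; without a namespace it is
    # "rados://nfs-ganesha/conf-mynfs.a".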
    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Go query the k8s API about deployment, containers related to this
        filesystem

        Example Rook Pod labels for a mgr daemon:
        Labels:         app=rook-ceph-mgr
                        pod-template-hash=2171958073
        And MDS containers additionally have `rook_filesystem` label

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

                if service_id is not None:
                    try:
                        k, v = {
                            "mds": ("rook_file_system", service_id),
                            "osd": ("ceph-osd-id", service_id),
                            "mon": ("mon", service_id),
                            "mgr": ("mgr", service_id),
                            "nfs": ("nfs", service_id),
                            "rgw": ("ceph_rgw", service_id),
                        }[service_type]
                    except KeyError:
                        raise orchestrator.OrchestratorValidationError(
                            '{} not supported'.format(service_type))
                    if metadata.labels[k] != v:
                        return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            ls = d['status'].get('container_statuses')
            if not ls:
                # ignore pods with no containers
                continue
            image_id = ls[0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC timestamps
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary
    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]
    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]
    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise
    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
                         leaf_type: str) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        all_hosts = self.get_hosts()
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            new.spec.metadataServer.placement = cfs.Placement(
                nodeAffinity=cfs.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
                        )
                    )
                )
            )
            return new

        def _create_fs() -> cfs.CephFilesystem:
            fs = cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    dataPools=cfs.DataPoolsList(
                        [
                            cfs.DataPoolsItem(
                                failureDomain=leaf_type,
                                replicated=cfs.Replicated(
                                    size=num_replicas
                                )
                            )
                        ]
                    ),
                    metadataPool=cfs.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cfs.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True,
                        placement=cfs.Placement(
                            nodeAffinity=cfs.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
                                    )
                                )
                            )
                        )
                    )
                )
            )
            return fs
        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)
    def get_matching_node(self, host: str) -> Any:
        matching_node = None
        for node in self.nodes.items:
            if node.metadata.labels['kubernetes.io/hostname'] == host:
                matching_node = node
        return matching_node
    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
        matching_node.metadata.labels['ceph-label/' + label] = ""
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Added {label} label to {host}')
    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
        matching_node.metadata.labels.pop('ceph-label/' + label, None)
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Removed {label} label from {host}')
    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like '.' in the name. This could be there because
            # it is a legacy rgw spec that was named like $realm.$zone, except
            # that I doubt there were any users of this code. Instead, focus
            # on future users and translate . to - (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        all_hosts = self.get_hosts()
        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            object_store = cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                        placement=cos.Placement(
                            nodeAffinity=cos.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cos.NodeSelectorTermsList(
                                        [
                                            placement_spec_to_node_selector(spec.placement, all_hosts)
                                        ]
                                    )
                                )
                            )
                        )
                    ),
                    dataPool=cos.DataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataPool=cos.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    )
                )
            )
            if spec.rgw_zone:
                object_store.spec.zone = cos.Zone(
                    name=spec.rgw_zone
                )
            return object_store

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            if new.spec.gateway:
                new.spec.gateway.instances = spec.placement.count or 1
            else:
                new.spec.gateway = cos.Gateway(
                    instances=spec.placement.count or 1
                )
            return new
        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)
    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        # TODO Number of pods should be based on the list of hosts in the
        #      PlacementSpec.
        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None "  # for mypy typing
        service_id = spec.service_id
        mgr_module = cast(Module, mgr)
        count = spec.placement.count or 1
        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        namespace=service_id,
                        pool=NFS_POOL_NAME,
                    ),
                    server=cnfs.Server(
                        active=count,
                    ),
                ),
            )
            return rook_nfsgw

        create_ganesha_pool(mgr)
        NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
                                     _update_nfs, _create_nfs)
    def rm_service(self, rooktype: str, service_id: str) -> str:
        self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1",
                                                               namespace="rook-ceph", plural=rooktype,
                                                               name=service_id)
        objpath = "{0}/{1}".format(rooktype, service_id)
        return f'Removed {objpath}'
    def get_resource(self, resource_type: str) -> Iterable:
        custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object,
                                                                            group="ceph.rook.io", version="v1",
                                                                            namespace="rook-ceph", plural=resource_type)
        return custom_objects.items
    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes
    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()
    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            if not new.spec.mon:
                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        assert drive_group.objectstore in ("bluestore", "filestore")
        assert drive_group.service_id
        storage_class = self.get_storage_class()
        inventory = self.get_discovered_devices()
        creator: Optional[DefaultCreator] = None
        if (
            storage_class.metadata.labels
            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
        ):
            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
        else:
            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
        return self._patch(
            ccl.CephCluster,
            'cephclusters',
            self.rook_env.cluster_name,
            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
        )
    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
        inventory = self.get_discovered_devices()
        self.remover = DefaultRemover(
            self.coreV1_api,
            self.batchV1_api,
            self.appsV1_api,
            osd_ids,
            replace,
            force,
            mon_command,
            self._patch,
            self.rook_env,
            inventory
        )
        return self.remover.remove()
    def get_hosts(self) -> List[orchestrator.HostSpec]:
        ret = []
        for node in self.nodes.items:
            spec = orchestrator.HostSpec(
                node.metadata.name,
                addr='/'.join([addr.address for addr in node.status.addresses]),
                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
            )
            ret.append(spec)
        return ret
    def create_zap_job(self, host: str, path: str) -> None:
        body = client.V1Job(
            api_version="batch/v1",
            metadata=client.V1ObjectMeta(
                name="rook-ceph-device-zap",
                namespace="rook-ceph"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="device-zap",
                                image="rook/ceph:master",
                                command=["bash"],
                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
                                env=[
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_USERNAME",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-username",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_SECRET",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-secret",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    )
                                ],
                                security_context=client.V1SecurityContext(
                                    run_as_user=0,
                                    privileged=True
                                ),
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        mount_path="/etc/ceph",
                                        name="ceph-conf-emptydir"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/etc/rook",
                                        name="rook-config"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/dev",
                                        name="devices"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="ceph-conf-emptydir",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="rook-config",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="devices",
                                host_path=client.V1HostPathVolumeSource(
                                    path="/dev"
                                )
                            )
                        ],
                        node_selector={
                            "kubernetes.io/hostname": host
                        },
                        restart_policy="Never"
                    )
                )
            )
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)
    def rbd_mirror(self, spec: ServiceSpec) -> None:
        service_id = spec.service_id or "default-rbd-mirror"
        all_hosts = self.get_hosts()
        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
            return crbdm.CephRBDMirror(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=crbdm.Spec(
                    count=spec.placement.count or 1,
                    placement=crbdm.Placement(
                        nodeAffinity=crbdm.NodeAffinity(
                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                                    [
                                        placement_spec_to_node_selector(spec.placement, all_hosts)
                                    ]
                                )
                            )
                        )
                    )
                )
            )
        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
            new.spec.count = spec.placement.count or 1
            new.spec.placement = crbdm.Placement(
                nodeAffinity=crbdm.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                            [
                                placement_spec_to_node_selector(spec.placement, all_hosts)
                            ]
                        )
                    )
                )
            )
            return new
        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"
    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"
    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli", ],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev, ],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # wait for the job result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job uses <ttl_seconds_after_finished>, so the TTL controller deletes it automatically.
        # That feature is still in Alpha state, so explicit delete operations are used here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message
    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
    all_hostnames = [hs.hostname for hs in all_hosts]
    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
    if spec.host_pattern and spec.host_pattern != "*":
        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
    if spec.label:
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="ceph-label/" + spec.label,
                operator="Exists"
            )
        )
    if spec.hosts:
        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="In",
                values=ccl.CrdObjectList(host_list)
            )
        )
    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="Exists"
            )
        )
    return res
def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
    res = PlacementSpec()
    for expression in node_selector.matchExpressions:
        if expression.key.startswith("ceph-label/"):
            res.label = expression.key.split('/')[1]
        elif expression.key == "kubernetes.io/hostname":
            if expression.operator == "Exists":
                res.host_pattern = "*"
            elif expression.operator == "In":
                res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
    return res