1 | """ |
2 | This module wrap's Rook + Kubernetes APIs to expose the calls | |
3 | needed to implement an orchestrator module. While the orchestrator | |
4 | module exposes an async API, this module simply exposes blocking API | |
5 | call methods. | |
6 | ||
7 | This module is runnable outside of ceph-mgr, useful for testing. | |
8 | """ | |
9f95a23c TL |
9 | import datetime |
10 | import threading | |
11fdf7f2 | 11 | import logging |
11fdf7f2 | 12 | from contextlib import contextmanager |
801d1391 | 13 | from time import sleep |
20effc67 TL |
14 | import re |
15 | from orchestrator import OrchResult | |
11fdf7f2 | 16 | |
9f95a23c | 17 | import jsonpatch |
f67539c2 | 18 | from urllib.parse import urljoin |
11fdf7f2 TL |
19 | |
20 | # Optional kubernetes imports to enable MgrModule.can_run | |
21 | # to behave cleanly. | |
9f95a23c TL |
22 | from urllib3.exceptions import ProtocolError |
23 | ||
20effc67 | 24 | from ceph.deployment.inventory import Device |
9f95a23c | 25 | from ceph.deployment.drive_group import DriveGroupSpec |
20effc67 | 26 | from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec |
522d829b | 27 | from ceph.utils import datetime_now |
20effc67 TL |
28 | from ceph.deployment.drive_selection.matchers import SizeMatcher |
29 | from nfs.cluster import create_ganesha_pool | |
30 | from nfs.module import Module | |
31 | from nfs.export import NFSRados | |
a4b75251 | 32 | from mgr_module import NFS_POOL_NAME |
9f95a23c TL |
33 | from mgr_util import merge_dicts |
34 | ||
20effc67 | 35 | from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \ |
522d829b | 36 | Iterable, Dict, Iterator, Type |
9f95a23c | 37 | |
11fdf7f2 | 38 | try: |
801d1391 | 39 | from kubernetes import client, watch |
11fdf7f2 TL |
40 | from kubernetes.client.rest import ApiException |
41 | except ImportError: | |
9f95a23c TL |
42 | class ApiException(Exception): # type: ignore |
43 | status = 0 | |
44 | ||
45 | from .rook_client.ceph import cephfilesystem as cfs | |
46 | from .rook_client.ceph import cephnfs as cnfs | |
47 | from .rook_client.ceph import cephobjectstore as cos | |
48 | from .rook_client.ceph import cephcluster as ccl | |
20effc67 | 49 | from .rook_client.ceph import cephrbdmirror as crbdm |
522d829b | 50 | from .rook_client._helper import CrdClass |
9f95a23c | 51 | |
9f95a23c TL |
52 | import orchestrator |
53 | ||
11fdf7f2 | 54 | try: |
20effc67 | 55 | from rook.module import RookEnv, RookOrchestrator |
11fdf7f2 TL |
56 | except ImportError: |
57 | pass # just used for type checking. | |
58 | ||
522d829b TL |
59 | |
60 | T = TypeVar('T') | |
61 | FuncT = TypeVar('FuncT', bound=Callable) | |
62 | ||
63 | CrdClassT = TypeVar('CrdClassT', bound=CrdClass) | |
64 | ||
65 | ||
11fdf7f2 TL |
66 | log = logging.getLogger(__name__) |
67 | ||
68 | ||
522d829b | 69 | def __urllib3_supports_read_chunked() -> bool: |
9f95a23c TL |
70 | # There is a bug in CentOS 7 as it ships a urllib3 which is lower |
71 | # than required by kubernetes-client | |
72 | try: | |
73 | from urllib3.response import HTTPResponse | |
74 | return hasattr(HTTPResponse, 'read_chunked') | |
75 | except ImportError: | |
76 | return False | |
77 | ||
78 | ||
522d829b | 79 | _urllib3_supports_read_chunked = __urllib3_supports_read_chunked() |
9f95a23c TL |
80 | |
81 | class ApplyException(orchestrator.OrchestratorError): | |
11fdf7f2 TL |
82 | """ |
83 | For failures to update the Rook CRDs, usually indicating | |
84 | some kind of interference between our attempted update | |
85 | and other conflicting activity. | |
86 | """ | |
87 | ||
88 | ||
522d829b TL |
89 | def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]: |
90 | def wrapper(*args: Any, **kwargs: Any) -> threading.Thread: | |
9f95a23c TL |
91 | t = threading.Thread(target=f, args=args, kwargs=kwargs) |
92 | t.start() | |
93 | return t | |
94 | ||
522d829b | 95 | return cast(Callable[..., threading.Thread], wrapper) |
9f95a23c TL |
96 | |
97 | ||
20effc67 TL |
98 | class DefaultFetcher(): |
99 | def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'): | |
100 | self.storage_class = storage_class | |
101 | self.coreV1_api = coreV1_api | |
102 | ||
103 | def fetch(self) -> None: | |
104 | self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume) | |
105 | self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class] | |
106 | ||
107 | def convert_size(self, size_str: str) -> int: | |
108 | units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E") | |
109 | coeff_and_unit = re.search('(\d+)(\D+)', size_str) | |
110 | assert coeff_and_unit is not None | |
111 | coeff = int(coeff_and_unit[1]) | |
112 | unit = coeff_and_unit[2] | |
113 | try: | |
114 | factor = units.index(unit) % 7 | |
115 | except ValueError: | |
116 | log.error("PV size format invalid") | |
117 | raise | |
118 | size = coeff * (2 ** (10 * factor)) | |
119 | return size | |
120 | ||
121 | def devices(self) -> Dict[str, List[Device]]: | |
122 | nodename_to_devices: Dict[str, List[Device]] = {} | |
123 | for i in self.pvs_in_sc: | |
124 | node, device = self.device(i) | |
125 | if node not in nodename_to_devices: | |
126 | nodename_to_devices[node] = [] | |
127 | nodename_to_devices[node].append(device) | |
128 | return nodename_to_devices | |
129 | ||
130 | def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]: | |
131 | node = 'N/A' | |
132 | if i.spec.node_affinity: | |
133 | terms = i.spec.node_affinity.required.node_selector_terms | |
134 | if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1: | |
135 | node = terms[0].match_expressions[0].values[0] | |
136 | size = self.convert_size(i.spec.capacity['storage']) | |
137 | path = i.spec.host_path.path if i.spec.host_path else i.spec.local.path if i.spec.local else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations else '' | |
138 | state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available' | |
139 | pv_name = i.metadata.name | |
140 | device = Device( | |
141 | path = path, | |
142 | sys_api = dict( | |
143 | size = size, | |
144 | node = node, | |
145 | pv_name = pv_name | |
146 | ), | |
147 | available = state, | |
148 | ) | |
149 | return (node, device) | |
150 | ||
151 | ||
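

# Illustrative sketch only (not called from this module): the module docstring
# notes that this file is runnable outside of ceph-mgr for testing, and a
# DefaultFetcher can be driven directly against a kubeconfig in that case.
# The kubeconfig loading and the 'local-storage' class name below are
# assumptions for the example, not values used by Rook itself.
def _example_list_devices_for_storage_class() -> Dict[str, List[Device]]:
    from kubernetes import config  # local import: kubernetes is an optional dependency
    config.load_kube_config()
    fetcher = DefaultFetcher('local-storage', client.CoreV1Api())
    fetcher.fetch()
    # Note that convert_size() maps decimal and binary suffixes to the same
    # power of two, e.g. both '10Gi' and '10G' become 10 * 2**30 bytes.
    return fetcher.devices()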


class LSOFetcher(DefaultFetcher):
    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
        super().__init__(storage_class, coreV1_api)
        self.customObjects_api = customObjects_api
        self.nodenames = nodenames

    def fetch(self) -> None:
        super().fetch()
        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
                                                                            group="local.storage.openshift.io",
                                                                            version="v1alpha1",
                                                                            plural="localvolumediscoveryresults")

    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
        if self.nodenames is not None:
            return item['spec']['nodeName'] in self.nodenames
        else:
            return True

    def devices(self) -> Dict[str, List[Device]]:
        try:
            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
        except ApiException:
            log.error("Failed to fetch device metadata")
            raise
        self.lso_devices = {}
        for i in lso_discovery_results:
            drives = i['status']['discoveredDevices']
            for drive in drives:
                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = (None, None)
            if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
                node, device = super().device(i)
            else:
                node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: Any) -> Tuple[str, Device]:
        node = i.metadata.labels['kubernetes.io/hostname']
        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
        pv_name = i.metadata.name
        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
        device = Device(
            path=device_discovery['path'],
            sys_api=dict(
                size=device_discovery['size'],
                rotational='1' if device_discovery['property'] == 'Rotational' else '0',
                node=node,
                pv_name=pv_name,
                model=model,
                vendor=vendor
            ),
            available=device_discovery['status']['state'] == 'Available',
            device_id=device_discovery['deviceID'].split('/')[-1],
            lsm_data=dict(
                serialNum=device_discovery['serial']
            )
        )
        return (node, device)

class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types,

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        metadata = response.metadata
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    def get_item_name(self, item: Any) -> Any:
        try:
            return item.metadata.name
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))
    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                name = self.get_item_name(item)

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise
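

# Illustrative sketch only: KubernetesResource wraps a kubernetes list call,
# caches the result keyed by metadata.name, and keeps the cache fresh through a
# background watch thread when urllib3 supports chunked reads.  The namespace
# and label selector below are assumptions for the example (they mirror how
# RookCluster.get_osds() uses this class further down).
def _example_list_osd_pod_names(coreV1_api: 'client.CoreV1Api') -> List[str]:
    osd_pods: KubernetesResource = KubernetesResource(
        coreV1_api.list_namespaced_pod,
        namespace='rook-ceph',
        label_selector='app=rook-ceph-osd')
    # The first access of .items performs a full fetch (and starts the watcher
    # as a side effect); later accesses return the cached, watch-updated items.
    return [pod.metadata.name for pod in osd_pods.items]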


class KubernetesCustomResource(KubernetesResource):
    def _fetch(self) -> str:
        response = self.api_func(**self.kwargs)
        metadata = response['metadata']
        self._items = {item['metadata']['name']: item for item in response['items']}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata['resourceVersion']

    def get_item_name(self, item: Any) -> Any:
        try:
            return item['metadata']['name']
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))

class DefaultCreator():
    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
        self.coreV1_api = coreV1_api
        self.storage_class = storage_class
        self.inventory = inventory

    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
        device_set = ccl.StorageClassDeviceSetsItem(
            name=d.sys_api['pv_name'],
            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
            count=1,
            encrypted=drive_group.encrypted,
            portable=False
        )
        device_set.volumeClaimTemplates.append(
            ccl.VolumeClaimTemplatesItem(
                metadata=ccl.Metadata(
                    name="data"
                ),
                spec=ccl.Spec(
                    storageClassName=self.storage_class,
                    volumeMode="Block",
                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
                    resources={
                        "requests": {
                            "storage": 1
                        }
                    },
                    volumeName=d.sys_api['pv_name']
                )
            )
        )
        return device_set

    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        count = 0
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or (device.path in paths)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1

        return device_list

    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
        assert drive_group.data_devices is not None
        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
                new_cluster.spec.storage = ccl.Storage()

            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()

            existing_scds = [
                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
            ]
            for device in to_create:
                new_scds = self.device_to_device_set(drive_group, device)
                if new_scds.name not in existing_scds:
                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
            return new_cluster
        return _add_osds

class LSOCreator(DefaultCreator):
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        vendor = getattr(drive_group.data_devices, 'vendor', None)
        model = getattr(drive_group.data_devices, 'model', None)
        count = 0
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or device.path in paths
                                )
                                and (
                                    not vendor
                                    or device.sys_api['vendor'] == vendor
                                )
                                and (
                                    not model
                                    or device.sys_api['model'].startswith(model)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list

class DefaultRemover():
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        appsV1_api: 'client.AppsV1Api',
        osd_ids: List[str],
        replace_flag: bool,
        force_flag: bool,
        mon_command: Callable,
        patch: Callable,
        rook_env: 'RookEnv',
        inventory: Dict[str, List[Device]]
    ):
        self.batchV1_api = batchV1_api
        self.appsV1_api = appsV1_api
        self.coreV1_api = coreV1_api

        self.osd_ids = osd_ids
        self.replace_flag = replace_flag
        self.force_flag = force_flag

        self.mon_command = mon_command

        self.patch = patch
        self.rook_env = rook_env

        self.inventory = inventory
        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')


    def remove_device_sets(self) -> str:
        self.to_remove: Dict[str, int] = {}
        self.pvc_to_remove: List[str] = []
        for pod in self.osd_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and pod.metadata.labels['osd'] in self.osd_ids
            ):
                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
                else:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
        def _remove_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
            for _set in new_cluster.spec.storage.storageClassDeviceSets:
                if _set.name in self.to_remove:
                    if _set.count == self.to_remove[_set.name]:
                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
                    else:
                        _set.count = _set.count - self.to_remove[_set.name]
            return new_cluster
        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)

    def check_force(self) -> None:
        if not self.force_flag:
            safe_args = {'prefix': 'osd safe-to-destroy',
                         'ids': [str(x) for x in self.osd_ids]}
            ret, out, err = self.mon_command(safe_args)
            if ret != 0:
                raise RuntimeError(err)

    def set_osds_down(self) -> None:
        down_flag_args = {
            'prefix': 'osd down',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(down_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def scale_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
                spec=client.V1ScaleSpec(
                    replicas=0
                )
            ))

    def set_osds_out(self) -> None:
        out_flag_args = {
            'prefix': 'osd out',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(out_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def delete_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')

    def clean_up_prepare_jobs_and_pvc(self) -> None:
        for job in self.jobs.items:
            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')

    def purge_osds(self) -> None:
        for id in self.osd_ids:
            purge_args = {
                'prefix': 'osd purge-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(purge_args)
            if ret != 0:
                raise RuntimeError(err)

    def destroy_osds(self) -> None:
        for id in self.osd_ids:
            destroy_args = {
                'prefix': 'osd destroy-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(destroy_args)
            if ret != 0:
                raise RuntimeError(err)

    def remove(self) -> str:
        try:
            self.check_force()
        except Exception as e:
            log.exception("Error checking if OSDs are safe to destroy")
            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
        try:
            remove_result = self.remove_device_sets()
        except Exception as e:
            log.exception("Error patching ceph cluster CRD")
            return f"Not possible to modify Ceph cluster CRD: {e}"
        try:
            self.scale_deployments()
            self.delete_deployments()
            self.clean_up_prepare_jobs_and_pvc()
        except Exception as e:
            log.exception("Ceph cluster CRD patched, but error cleaning environment")
            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
        try:
            self.set_osds_down()
            self.set_osds_out()
            if self.replace_flag:
                self.destroy_osds()
            else:
                self.purge_osds()
        except Exception as e:
            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
            return f"Error removing OSDs from Ceph cluster: {e}"

        return remove_result
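

# Illustrative sketch only: DefaultRemover expects ``mon_command`` to behave
# like MgrModule.mon_command, i.e. accept a command dictionary and return a
# (retcode, stdout, stderr) tuple.  A stub along these lines is an assumption
# useful when exercising the removal flow outside of ceph-mgr; it is not part
# of the Rook orchestrator itself.
def _example_fake_mon_command(cmd: Dict[str, Any]) -> Tuple[int, str, str]:
    # These are the only prefixes DefaultRemover issues (see the methods above).
    assert cmd['prefix'] in ('osd safe-to-destroy', 'osd down', 'osd out',
                             'osd purge-actual', 'osd destroy-actual')
    return 0, '', ''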


class RookCluster(object):
    # import of client.CoreV1Api must be optional at import time.
    # Instead allow mgr/rook to be imported anyway.
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        customObjects_api: 'client.CustomObjectsApi',
        storageV1_api: 'client.StorageV1Api',
        appsV1_api: 'client.AppsV1Api',
        rook_env: 'RookEnv',
        storage_class: 'str'
    ):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api
        self.customObjects_api = customObjects_api
        self.storageV1_api = storageV1_api  # client.StorageV1Api
        self.appsV1_api = appsV1_api  # client.AppsV1Api
        self.storage_class = storage_class  # type: str

        # TODO: replace direct k8s calls with Rook API calls
        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                                              namespace=self.rook_env.namespace,
                                                                              label_selector="rook_cluster={0}".format(
                                                                                  self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)

    def get_storage_class(self) -> 'client.V1StorageClass':
        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
        if len(matching_sc) == 0:
            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
        return matching_sc[0]

    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
        storage_class = self.get_storage_class()
        self.fetcher: Optional[DefaultFetcher] = None
        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
            self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
        else:
            self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
        self.fetcher.fetch()
        return self.fetcher.devices()

    def get_osds(self) -> List:
        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        return list(osd_pods.items)

    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Go query the k8s API about deployment, containers related to this
        filesystem

        Example Rook Pod labels for a mgr daemon:
        Labels:         app=rook-ceph-mgr
                        pod-template-hash=2171958073
                        rook_cluster=rook
        And MDS containers additionally have `rook_filesystem` label

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

                if service_id is not None:
                    try:
                        k, v = {
                            "mds": ("rook_file_system", service_id),
                            "osd": ("ceph-osd-id", service_id),
                            "mon": ("mon", service_id),
                            "mgr": ("mgr", service_id),
                            "nfs": ("nfs", service_id),
                            "rgw": ("ceph_rgw", service_id),
                        }[service_type]
                    except KeyError:
                        raise orchestrator.OrchestratorValidationError(
                            '{} not supported'.format(service_type))
                    if metadata.labels[k] != v:
                        return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            ls = d['status'].get('container_statuses')
            if not ls:
                # ignore pods with no containers
                continue
            image_id = ls[0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]

    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
                         leaf_type: str) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        all_hosts = self.get_hosts()
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            new.spec.metadataServer.placement = cfs.Placement(
                nodeAffinity=cfs.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
                        )
                    )
                )
            )
            return new
        def _create_fs() -> cfs.CephFilesystem:
            fs = cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    dataPools=cfs.DataPoolsList(
                        {
                            cfs.DataPoolsItem(
                                failureDomain=leaf_type,
                                replicated=cfs.Replicated(
                                    size=num_replicas
                                )
                            )
                        }
                    ),
                    metadataPool=cfs.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cfs.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True,
                        placement=cfs.Placement(
                            nodeAffinity=cfs.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
                                    )
                                )
                            )
                        )
                    )
                )
            )
            return fs
        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def get_matching_node(self, host: str) -> Any:
        matching_node = None
        for node in self.nodes.items:
            if node.metadata.labels['kubernetes.io/hostname'] == host:
                matching_node = node
        return matching_node

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
        matching_node.metadata.labels['ceph-label/' + label] = ""
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Added {label} label to {host}')

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
        matching_node.metadata.labels.pop('ceph-label/' + label, None)
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Removed {label} label from {host}')

    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like . in the name.  this could be
            # because it is a legacy rgw spec that was named
            # like $realm.$zone, except that I doubt there were any
            # users of this code.  Instead, focus on future users and
            # translate . to - (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        all_hosts = self.get_hosts()
        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            object_store = cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                        placement=cos.Placement(
                            cos.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cos.NodeSelectorTermsList(
                                        [
                                            placement_spec_to_node_selector(spec.placement, all_hosts)
                                        ]
                                    )
                                )
                            )
                        )
                    ),
                    dataPool=cos.DataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataPool=cos.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    )
                )
            )
            if spec.rgw_zone:
                object_store.spec.zone = cos.Zone(
                    name=spec.rgw_zone
                )
            return object_store


        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            if new.spec.gateway:
                new.spec.gateway.instances = spec.placement.count or 1
            else:
                new.spec.gateway = cos.Gateway(
                    instances=spec.placement.count or 1
                )
            return new
        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        # TODO Number of pods should be based on the list of hosts in the
        #      PlacementSpec.
        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None"  # for mypy typing
        service_id = spec.service_id
        mgr_module = cast(Module, mgr)
        count = spec.placement.count or 1
        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        namespace=service_id,
                        pool=NFS_POOL_NAME,
                    ),
                    server=cnfs.Server(
                        active=count
                    )
                )
            )


            return rook_nfsgw

        create_ganesha_pool(mgr)
        NFSRados(mgr_module, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
                                     _update_nfs, _create_nfs)

    def rm_service(self, rooktype: str, service_id: str) -> str:
        self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
        objpath = "{0}/{1}".format(rooktype, service_id)
        return f'Removed {objpath}'

    def get_resource(self, resource_type: str) -> Iterable:
        custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
        return custom_objects.items

    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            if not new.spec.mon:
                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        assert drive_group.objectstore in ("bluestore", "filestore")
        assert drive_group.service_id
        storage_class = self.get_storage_class()
        inventory = self.get_discovered_devices()
        creator: Optional[DefaultCreator] = None
        if (
            storage_class.metadata.labels
            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
        ):
            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
        else:
            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
        return self._patch(
            ccl.CephCluster,
            'cephclusters',
            self.rook_env.cluster_name,
            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
        )

    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
        inventory = self.get_discovered_devices()
        self.remover = DefaultRemover(
            self.coreV1_api,
            self.batchV1_api,
            self.appsV1_api,
            osd_ids,
            replace,
            force,
            mon_command,
            self._patch,
            self.rook_env,
            inventory
        )
        return self.remover.remove()

    def get_hosts(self) -> List[orchestrator.HostSpec]:
        ret = []
        for node in self.nodes.items:
            spec = orchestrator.HostSpec(
                node.metadata.name,
                addr='/'.join([addr.address for addr in node.status.addresses]),
                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
            )
            ret.append(spec)
        return ret

    def create_zap_job(self, host: str, path: str) -> None:
        body = client.V1Job(
            api_version="batch/v1",
            metadata=client.V1ObjectMeta(
                name="rook-ceph-device-zap",
                namespace="rook-ceph"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="device-zap",
                                image="rook/ceph:master",
                                command=["bash"],
                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
                                env=[
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_USERNAME",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-username",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_SECRET",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-secret",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    )
                                ],
                                security_context=client.V1SecurityContext(
                                    run_as_user=0,
                                    privileged=True
                                ),
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        mount_path="/etc/ceph",
                                        name="ceph-conf-emptydir"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/etc/rook",
                                        name="rook-config"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/dev",
                                        name="devices"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="ceph-conf-emptydir",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="rook-config",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="devices",
                                host_path=client.V1HostPathVolumeSource(
                                    path="/dev"
                                )
                            ),
                        ],
                        node_selector={
                            "kubernetes.io/hostname": host
                        },
                        restart_policy="Never"
                    )
                )
            )
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)

    def rbd_mirror(self, spec: ServiceSpec) -> None:
        service_id = spec.service_id or "default-rbd-mirror"
        all_hosts = self.get_hosts()
        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
            return crbdm.CephRBDMirror(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=crbdm.Spec(
                    count=spec.placement.count or 1,
                    placement=crbdm.Placement(
                        nodeAffinity=crbdm.NodeAffinity(
                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                                    [
                                        placement_spec_to_node_selector(spec.placement, all_hosts)
                                    ]
                                )
                            )
                        )
                    )
                )
            )
        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
            new.spec.count = spec.placement.count or 1
            new.spec.placement = crbdm.Placement(
                nodeAffinity=crbdm.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                            [
                                placement_spec_to_node_selector(spec.placement, all_hosts)
                            ]
                        )
                    )
                )
            )
            return new
        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
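
    # The two helpers below implement the generic "read CRD, mutate a copy,
    # diff and patch" flow used by the apply_* methods above: the current CR is
    # fetched, the callback edits a second copy, jsonpatch.make_patch() computes
    # a JSON-patch document (e.g. [{'op': 'replace', 'path': '/spec/mon/count',
    # 'value': 5}]), and the patch is submitted with a json-patch content type.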
    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))


    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job sets <ttl_seconds_after_finished>, which lets the TTL controller delete the job automatically,
        # but that feature is still in Alpha state, so extra explicit delete operations are used here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]

def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
    all_hostnames = [hs.hostname for hs in all_hosts]
    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
    if spec.host_pattern and spec.host_pattern != "*":
        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
    if spec.label:
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="ceph-label/" + spec.label,
                operator="Exists"
            )
        )
    if spec.hosts:
        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="In",
                values=ccl.CrdObjectList(host_list)
            )
        )
    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="Exists",
            )
        )
    return res

def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
    res = PlacementSpec()
    for expression in node_selector.matchExpressions:
        if expression.key.startswith("ceph-label/"):
            res.label = expression.key.split('/')[1]
        elif expression.key == "kubernetes.io/hostname":
            if expression.operator == "Exists":
                res.host_pattern = "*"
            elif expression.operator == "In":
                res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
    return res
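

# Illustrative sketch only (never called): round-trip a PlacementSpec through
# the two helpers above.  The host names are made up for the example.
def _example_placement_round_trip() -> PlacementSpec:
    all_hosts = [orchestrator.HostSpec('host1'), orchestrator.HostSpec('host2')]
    spec = PlacementSpec(hosts=['host1'])
    selector = placement_spec_to_node_selector(spec, all_hosts)
    # Expect a single match expression: kubernetes.io/hostname In ['host1'].
    return node_selector_to_placement_spec(selector)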