1 | """ |
2 | This module wrap's Rook + Kubernetes APIs to expose the calls | |
3 | needed to implement an orchestrator module. While the orchestrator | |
4 | module exposes an async API, this module simply exposes blocking API | |
5 | call methods. | |
6 | ||
7 | This module is runnable outside of ceph-mgr, useful for testing. | |
8 | """ | |
import datetime
import threading
import logging
from contextlib import contextmanager
from time import sleep
import re
from orchestrator import OrchResult

import jsonpatch
from urllib.parse import urljoin
import json

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
from ceph.utils import datetime_now
from ceph.deployment.drive_selection.matchers import SizeMatcher
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
from mgr_module import NFS_POOL_NAME
from mgr_util import merge_dicts

from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0
from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass

import orchestrator

try:
    from rook.module import RookEnv, RookOrchestrator
except ImportError:
    pass  # just used for type checking.


T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)

CrdClassT = TypeVar('CrdClassT', bound=CrdClass)


log = logging.getLogger(__name__)


def __urllib3_supports_read_chunked() -> bool:
    # CentOS 7 ships a urllib3 that is older than the version required
    # by kubernetes-client, so probe for the feature instead of relying on it.
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)

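# Illustrative sketch only: a function decorated with `threaded` returns its
# (already started) Thread instead of blocking. The `_example_worker` name
# below is hypothetical, not part of this module.
#
#     @threaded
#     def _example_worker(msg: str) -> None:
#         log.debug('processing %s in the background', msg)
#
#     t = _example_worker('hello')   # returns a started threading.Thread
#     t.join()
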

class DefaultFetcher():
    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
        self.storage_class = storage_class
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]

    def convert_size(self, size_str: str) -> int:
        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
        assert coeff_and_unit is not None
        coeff = int(coeff_and_unit[1])
        unit = coeff_and_unit[2]
        try:
            factor = units.index(unit) % 7
        except ValueError:
            log.error("PV size format invalid")
            raise
        size = coeff * (2 ** (10 * factor))
        return size

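    # Worked example (illustrative): convert_size('10Gi') matches coeff=10,
    # unit='Gi'; units.index('Gi') == 3, so the result is 10 * 2**30 ==
    # 10737418240. Because of the `% 7`, the decimal suffixes reuse the
    # binary factors: '10G' (index 10, 10 % 7 == 3) yields the same value.
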
    def devices(self) -> Dict[str, List[Device]]:
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
        node = 'N/A'
        if i.spec.node_affinity:
            terms = i.spec.node_affinity.required.node_selector_terms
            if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
                node = terms[0].match_expressions[0].values[0]
        size = self.convert_size(i.spec.capacity['storage'])
        if i.spec.host_path:
            path = i.spec.host_path.path
        elif i.spec.local:
            path = i.spec.local.path
        elif (i.metadata.annotations
                and 'storage.openshift.com/device-name' in i.metadata.annotations):
            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
        else:
            path = ''
        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
        pv_name = i.metadata.name
        device = Device(
            path=path,
            sys_api=dict(
                size=size,
                node=node,
                pv_name=pv_name
            ),
            available=state,
        )
        return (node, device)


class LSOFetcher(DefaultFetcher):
    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
        super().__init__(storage_class, coreV1_api)
        self.customObjects_api = customObjects_api
        self.nodenames = nodenames

    def fetch(self) -> None:
        super().fetch()
        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
                                                                            group="local.storage.openshift.io",
                                                                            version="v1alpha1",
                                                                            plural="localvolumediscoveryresults")

    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
        if self.nodenames is not None:
            return item['spec']['nodeName'] in self.nodenames
        else:
            return True

    def devices(self) -> Dict[str, List[Device]]:
        try:
            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
        except ApiException:
            log.error("Failed to fetch device metadata")
            raise
        self.lso_devices = {}
        for i in lso_discovery_results:
            drives = i['status']['discoveredDevices']
            for drive in drives:
                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = (None, None)
            if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
                node, device = super().device(i)
            else:
                node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: Any) -> Tuple[str, Device]:
        node = i.metadata.labels['kubernetes.io/hostname']
        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
        pv_name = i.metadata.name
        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
        device = Device(
            path=device_discovery['path'],
            sys_api=dict(
                size=device_discovery['size'],
                rotational='1' if device_discovery['property'] == 'Rotational' else '0',
                node=node,
                pv_name=pv_name,
                model=model,
                vendor=vendor
            ),
            available=device_discovery['status']['state'] == 'Available',
            device_id=device_discovery['deviceID'].split('/')[-1],
            lsm_data=dict(
                serialNum=device_discovery['serial']
            )
        )
        return (node, device)


class PDFetcher(DefaultFetcher):
    """Physical Devices Fetcher"""
    def __init__(self, coreV1_api: 'client.CoreV1Api'):
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        """Collect the devices information from k8s configmaps"""
        self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                              namespace='rook-ceph',
                                                              label_selector='app=rook-discover')

    def devices(self) -> Dict[str, List[Device]]:
        """Return the list of devices found"""
        node_devices: Dict[str, List[Device]] = {}
        for i in self.dev_cms.items:
            devices_list: List[Device] = []
            for d in json.loads(i.data['devices']):
                devices_list.append(self.device(d)[1])
            node_devices[i.metadata.labels['rook.io/node']] = devices_list

        return node_devices

    def device(self, devData: Dict[str, str]) -> Tuple[str, Device]:
        """Build an orchestrator device"""
        if 'cephVolumeData' in devData and devData['cephVolumeData']:
            return "", Device.from_json(json.loads(devData['cephVolumeData']))
        else:
            return "", Device(
                path='/dev/' + devData['name'],
                sys_api=dict(
                    rotational='1' if devData['rotational'] else '0',
                    size=devData['size']
                ),
                available=False,
                rejected_reasons=['device data coming from ceph-volume not provided'],
            )


class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        metadata = response.metadata
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

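    # Usage sketch (hypothetical names): construct once, then read `.items`,
    # which performs a full fetch and lazily attaches the watch thread:
    #
    #     pods: KubernetesResource[client.V1Pod] = KubernetesResource(
    #         core_api.list_namespaced_pod, namespace='rook-ceph')
    #     names = [p.metadata.name for p in pods.items]
    #
    # Later reads of `.items` are served from the cache that the background
    # watcher keeps up to date.
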
    def get_item_name(self, item: Any) -> Any:
        try:
            return item.metadata.name
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))

    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                name = self.get_item_name(item)

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise


class KubernetesCustomResource(KubernetesResource):
    def _fetch(self) -> str:
        response = self.api_func(**self.kwargs)
        metadata = response['metadata']
        self._items = {item['metadata']['name']: item for item in response['items']}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata['resourceVersion']

    def get_item_name(self, item: Any) -> Any:
        try:
            return item['metadata']['name']
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))


class DefaultCreator():
    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
        self.coreV1_api = coreV1_api
        self.storage_class = storage_class
        self.inventory = inventory

    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
        device_set = ccl.StorageClassDeviceSetsItem(
            name=d.sys_api['pv_name'],
            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
            count=1,
            encrypted=drive_group.encrypted,
            portable=False
        )
        device_set.volumeClaimTemplates.append(
            ccl.VolumeClaimTemplatesItem(
                metadata=ccl.Metadata(
                    name="data"
                ),
                spec=ccl.Spec(
                    storageClassName=self.storage_class,
                    volumeMode="Block",
                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
                    resources={
                        "requests": {
                            "storage": 1
                        }
                    },
                    volumeName=d.sys_api['pv_name']
                )
            )
        )
        return device_set

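    # For reference, the device set built above corresponds roughly to this
    # CephCluster fragment (values for a hypothetical PV named 'pv-abc'):
    #
    #     storageClassDeviceSets:
    #     - name: pv-abc
    #       count: 1
    #       portable: false
    #       volumeClaimTemplates:
    #       - metadata:
    #           name: data
    #         spec:
    #           storageClassName: <configured storage class>
    #           volumeMode: Block
    #           accessModes: [ReadWriteOnce]
    #           resources: {requests: {storage: 1}}
    #           volumeName: pv-abc
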
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        count = 0
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and ((sizematcher is None) or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or (device.path in paths)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1

        return device_list

    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
        assert drive_group.data_devices is not None
        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
                new_cluster.spec.storage = ccl.Storage()

            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()

            existing_scds = [
                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
            ]
            for device in to_create:
                new_scds = self.device_to_device_set(drive_group, device)
                if new_scds.name not in existing_scds:
                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
            return new_cluster
        return _add_osds

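    # Note: add_osds() applies nothing itself; it returns the `_add_osds`
    # closure, which the caller passes to RookCluster._patch() so the new
    # device sets are merged into the live CephCluster CR, e.g. (sketch):
    #
    #     self._patch(ccl.CephCluster, 'cephclusters', cluster_name,
    #                 creator.add_osds(rook_pods, drive_group, hosts))
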

class LSOCreator(DefaultCreator):
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        vendor = getattr(drive_group.data_devices, 'vendor', None)
        model = getattr(drive_group.data_devices, 'model', None)
        count = 0
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and ((sizematcher is None) or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or device.path in paths
                                )
                                and (
                                    not vendor
                                    or device.sys_api['vendor'] == vendor
                                )
                                and (
                                    not model
                                    or device.sys_api['model'].startswith(model)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list


class DefaultRemover():
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        appsV1_api: 'client.AppsV1Api',
        osd_ids: List[str],
        replace_flag: bool,
        force_flag: bool,
        mon_command: Callable,
        patch: Callable,
        rook_env: 'RookEnv',
        inventory: Dict[str, List[Device]]
    ):
        self.batchV1_api = batchV1_api
        self.appsV1_api = appsV1_api
        self.coreV1_api = coreV1_api

        self.osd_ids = osd_ids
        self.replace_flag = replace_flag
        self.force_flag = force_flag

        self.mon_command = mon_command

        self.patch = patch
        self.rook_env = rook_env

        self.inventory = inventory
        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')

    def remove_device_sets(self) -> str:
        self.to_remove: Dict[str, int] = {}
        self.pvc_to_remove: List[str] = []
        for pod in self.osd_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and pod.metadata.labels['osd'] in self.osd_ids
            ):
                device_set = pod.metadata.labels['ceph.rook.io/DeviceSet']
                if device_set in self.to_remove:
                    self.to_remove[device_set] += 1
                else:
                    self.to_remove[device_set] = 1
                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
        def _remove_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
            for _set in new_cluster.spec.storage.storageClassDeviceSets:
                if _set.name in self.to_remove:
                    if _set.count == self.to_remove[_set.name]:
                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
                    else:
                        _set.count = _set.count - self.to_remove[_set.name]
            return new_cluster
        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)

    def check_force(self) -> None:
        if not self.force_flag:
            safe_args = {'prefix': 'osd safe-to-destroy',
                         'ids': [str(x) for x in self.osd_ids]}
            ret, out, err = self.mon_command(safe_args)
            if ret != 0:
                raise RuntimeError(err)

    def set_osds_down(self) -> None:
        down_flag_args = {
            'prefix': 'osd down',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(down_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def scale_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
                spec=client.V1ScaleSpec(
                    replicas=0
                )
            ))

    def set_osds_out(self) -> None:
        out_flag_args = {
            'prefix': 'osd out',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(out_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def delete_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')

    def clean_up_prepare_jobs_and_pvc(self) -> None:
        for job in self.jobs.items:
            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')

    def purge_osds(self) -> None:
        for id in self.osd_ids:
            purge_args = {
                'prefix': 'osd purge-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(purge_args)
            if ret != 0:
                raise RuntimeError(err)

    def destroy_osds(self) -> None:
        for id in self.osd_ids:
            destroy_args = {
                'prefix': 'osd destroy-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(destroy_args)
            if ret != 0:
                raise RuntimeError(err)

    def remove(self) -> str:
        try:
            self.check_force()
        except Exception as e:
            log.exception("Error checking if OSDs are safe to destroy")
            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
        try:
            remove_result = self.remove_device_sets()
        except Exception as e:
            log.exception("Error patching ceph cluster CRD")
            return f"Not possible to modify Ceph cluster CRD: {e}"
        try:
            self.scale_deployments()
            self.delete_deployments()
            self.clean_up_prepare_jobs_and_pvc()
        except Exception as e:
            log.exception("Ceph cluster CRD patched, but error cleaning environment")
            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
        try:
            self.set_osds_down()
            self.set_osds_out()
            if self.replace_flag:
                self.destroy_osds()
            else:
                self.purge_osds()
        except Exception as e:
            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
            return f"Error removing OSDs from Ceph cluster: {e}"

        return remove_result


class RookCluster(object):
    # The import of client.CoreV1Api must stay optional at import time, so
    # the annotations below are strings; this allows mgr/rook to be imported
    # even when the kubernetes client library is absent.
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        customObjects_api: 'client.CustomObjectsApi',
        storageV1_api: 'client.StorageV1Api',
        appsV1_api: 'client.AppsV1Api',
        rook_env: 'RookEnv',
        storage_class: 'str'
    ):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api
        self.customObjects_api = customObjects_api
        self.storageV1_api = storageV1_api  # client.StorageV1Api
        self.appsV1_api = appsV1_api  # client.AppsV1Api
        self.storage_class = storage_class  # type: str

        # TODO: replace direct k8s calls with Rook API calls
        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                                              namespace=self.rook_env.namespace,
                                                                              label_selector="rook_cluster={0}".format(
                                                                                  self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

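    # Illustrative example (hypothetical values): with crd_version 'v1' and
    # namespace 'rook-ceph', rook_url('cephnfses/mynfs') returns
    # '/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs'.
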
    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)

    def get_storage_class(self) -> 'client.V1StorageClass':
        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
        if len(matching_sc) == 0:
            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
        return matching_sc[0]

    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
        self.fetcher: Optional[DefaultFetcher] = None
        op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
        if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
            self.fetcher = PDFetcher(self.coreV1_api)
        else:
            storage_class = self.get_storage_class()
            if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
                self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
            else:
                self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)

        self.fetcher.fetch()
        return self.fetcher.devices()

    def get_osds(self) -> List:
        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        return list(osd_pods.items)

    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

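    # Example (hypothetical values): pool 'mypool', rados namespace 'myns',
    # cluster 'mynfs' and instance 'a' yield
    # 'rados://mypool/myns/conf-mynfs.a'; with no namespace the middle
    # component is omitted.
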
    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Query the k8s API for deployments and containers related to this
        service.

        Example Rook Pod labels for a mgr daemon:
          Labels: app=rook-ceph-mgr
                  pod-template-hash=2171958073
                  rook_cluster=rook
        And MDS containers additionally have the `rook_filesystem` label.

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "nfs": ("nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            ls = d['status'].get('container_statuses')
            if not ls:
                # ignore pods with no containers
                continue
            image_id = ls[0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]

    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

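    # Typical use (sketch): make object creation idempotent by swallowing
    # the HTTP 409 Conflict the k8s API returns for an existing object:
    #
    #     with self.ignore_409("cephfilesystems myfs already exists"):
    #         self.rook_api_post("cephfilesystems/", body=new.to_json())
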
    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
                         leaf_type: str) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        all_hosts = self.get_hosts()
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            new.spec.metadataServer.placement = cfs.Placement(
                nodeAffinity=cfs.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
                        )
                    )
                )
            )
            return new
        def _create_fs() -> cfs.CephFilesystem:
            fs = cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    dataPools=cfs.DataPoolsList(
                        {
                            cfs.DataPoolsItem(
                                failureDomain=leaf_type,
                                replicated=cfs.Replicated(
                                    size=num_replicas
                                )
                            )
                        }
                    ),
                    metadataPool=cfs.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cfs.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True,
                        placement=cfs.Placement(
                            nodeAffinity=cfs.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
                                    )
                                )
                            )
                        )
                    )
                )
            )
            return fs
        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def get_matching_node(self, host: str) -> Any:
        matching_node = None
        for node in self.nodes.items:
            if node.metadata.labels['kubernetes.io/hostname'] == host:
                matching_node = node
        return matching_node

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
        matching_node.metadata.labels['ceph-label/' + label] = ""
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Added {label} label to {host}')

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
        matching_node.metadata.labels.pop('ceph-label/' + label, None)
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Removed {label} label from {host}')

    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like . in the name. The dot could be
            # there because it is a legacy rgw spec that was named
            # like $realm.$zone, except that I doubt there were any
            # users of this code. Instead, focus on future users and
            # translate . to - (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        all_hosts = self.get_hosts()
        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            object_store = cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                        placement=cos.Placement(
                            cos.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cos.NodeSelectorTermsList(
                                        [
                                            placement_spec_to_node_selector(spec.placement, all_hosts)
                                        ]
                                    )
                                )
                            )
                        )
                    ),
                    dataPool=cos.DataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataPool=cos.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    )
                )
            )
            if spec.rgw_zone:
                object_store.spec.zone = cos.Zone(
                    name=spec.rgw_zone
                )
            return object_store

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            if new.spec.gateway:
                new.spec.gateway.instances = spec.placement.count or 1
            else:
                new.spec.gateway = cos.Gateway(
                    instances=spec.placement.count or 1
                )
            return new
        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        # TODO Number of pods should be based on the list of hosts in the
        #      PlacementSpec.
        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None"  # for mypy typing
        service_id = spec.service_id
        mgr_module = cast(Module, mgr)
        count = spec.placement.count or 1
        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        namespace=service_id,
                        pool=NFS_POOL_NAME,
                    ),
                    server=cnfs.Server(
                        active=count
                    )
                )
            )

            return rook_nfsgw

        create_ganesha_pool(mgr)
        NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
                                     _update_nfs, _create_nfs)

    def rm_service(self, rooktype: str, service_id: str) -> str:
        self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
        objpath = "{0}/{1}".format(rooktype, service_id)
        return f'Removed {objpath}'

    def get_resource(self, resource_type: str) -> Iterable:
        custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
        return custom_objects.items

    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            if not new.spec.mon:
                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        assert drive_group.objectstore in ("bluestore", "filestore")
        assert drive_group.service_id
        storage_class = self.get_storage_class()
        inventory = self.get_discovered_devices()
        creator: Optional[DefaultCreator] = None
        if (
            storage_class.metadata.labels
            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
        ):
            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
        else:
            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
        return self._patch(
            ccl.CephCluster,
            'cephclusters',
            self.rook_env.cluster_name,
            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
        )

    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
        inventory = self.get_discovered_devices()
        self.remover = DefaultRemover(
            self.coreV1_api,
            self.batchV1_api,
            self.appsV1_api,
            osd_ids,
            replace,
            force,
            mon_command,
            self._patch,
            self.rook_env,
            inventory
        )
        return self.remover.remove()

    def get_hosts(self) -> List[orchestrator.HostSpec]:
        ret = []
        for node in self.nodes.items:
            spec = orchestrator.HostSpec(
                node.metadata.name,
                addr='/'.join([addr.address for addr in node.status.addresses]),
                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
            )
            ret.append(spec)
        return ret

    def create_zap_job(self, host: str, path: str) -> None:
        body = client.V1Job(
            api_version="batch/v1",
            metadata=client.V1ObjectMeta(
                name="rook-ceph-device-zap",
                namespace="rook-ceph"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="device-zap",
                                image="rook/ceph:master",
                                command=["bash"],
                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
                                env=[
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_USERNAME",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-username",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_SECRET",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-secret",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    )
                                ],
                                security_context=client.V1SecurityContext(
                                    run_as_user=0,
                                    privileged=True
                                ),
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        mount_path="/etc/ceph",
                                        name="ceph-conf-emptydir"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/etc/rook",
                                        name="rook-config"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/dev",
                                        name="devices"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="ceph-conf-emptydir",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="rook-config",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="devices",
                                host_path=client.V1HostPathVolumeSource(
                                    path="/dev"
                                )
                            ),
                        ],
                        node_selector={
                            "kubernetes.io/hostname": host
                        },
                        restart_policy="Never"
                    )
                )
            )
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)

    def rbd_mirror(self, spec: ServiceSpec) -> None:
        service_id = spec.service_id or "default-rbd-mirror"
        all_hosts = self.get_hosts()
        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
            return crbdm.CephRBDMirror(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=crbdm.Spec(
                    count=spec.placement.count or 1,
                    placement=crbdm.Placement(
                        nodeAffinity=crbdm.NodeAffinity(
                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                                    [
                                        placement_spec_to_node_selector(spec.placement, all_hosts)
                                    ]
                                )
                            )
                        )
                    )
                )
            )
        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
            new.spec.count = spec.placement.count or 1
            new.spec.placement = crbdm.Placement(
                nodeAffinity=crbdm.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                            [
                                placement_spec_to_node_selector(spec.placement, all_hosts)
                            ]
                        )
                    )
                )
            )
            return new
        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)

    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

522d829b TL |
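    # For reference, the patch computed by _patch() is a standard RFC 6902
    # JSON patch; with plain dicts the diff looks like this (illustrative):
    #
    #   >>> list(jsonpatch.make_patch({'spec': {'count': 1}},
    #   ...                           {'spec': {'count': 3}}))
    #   [{'op': 'replace', 'path': '/spec/count', 'value': 3}]
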
    def _create_or_patch(self,
                         crd: Type[CrdClassT],
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

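    # Call pattern used by the service methods above, sketched with a
    # hypothetical filesystem CRD (`_update_fs`/`_create_fs` are illustrative
    # callables, not defined in this module):
    #
    #   self._create_or_patch(cfs.CephFilesystem, 'cephfilesystems', 'myfs',
    #                         _update_fs, _create_fs)
    #   # -> "Created", "Updated", or "No change"
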
    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

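    # get_ceph_image() above typically resolves to the mon container's image
    # reference, e.g. "quay.io/ceph/ceph:v16.2.5" (illustrative value).
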
    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha feature: lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)
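
        # With ident_fault='ident' and on=True against /dev/sda, the pod
        # defined above ends up running (illustrative values):
        #   lsmcli local-disk-ident-led-on --path /dev/sda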
        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message
                else:
                    sleep(0.1)  # avoid a hot polling loop against the API server

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job. The job sets <ttl_seconds_after_finished>,
        # so the TTL controller deletes it automatically once it finishes.
        # That feature is still in Alpha, so an explicit delete is issued
        # here as a fallback.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
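    # Example invocation (hypothetical host/device; `rc` is a RookCluster):
    #
    #   locs = [orchestrator.DeviceLightLoc(host='node1', dev='sda',
    #                                       path='/dev/sda')]
    #   rc.blink_light('ident', True, locs)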


def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
    all_hostnames = [hs.hostname for hs in all_hosts]
    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
    if spec.host_pattern and spec.host_pattern != "*":
        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
    if spec.label:
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="ceph-label/" + spec.label,
                operator="Exists"
            )
        )
    if spec.hosts:
        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="In",
                values=ccl.CrdObjectList(host_list)
            )
        )
    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="Exists",
            )
        )
    return res
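# How the mapping above plays out (illustrative; `all_hosts` as returned by
# RookCluster.get_hosts()):
#
#   PlacementSpec(label='mon')      -> {key: ceph-label/mon, operator: Exists}
#   PlacementSpec(hosts=['a', 'b']) -> {key: kubernetes.io/hostname,
#                                       operator: In, values: [a, b]}
#   PlacementSpec(host_pattern='*') -> {key: kubernetes.io/hostname,
#                                       operator: Exists}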

def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
    res = PlacementSpec()
    for expression in node_selector.matchExpressions:
        if expression.key.startswith("ceph-label/"):
            res.label = expression.key.split('/')[1]
        elif expression.key == "kubernetes.io/hostname":
            if expression.operator == "Exists":
                res.host_pattern = "*"
            elif expression.operator == "In":
                res.hosts = [HostPlacementSpec(hostname=value, network='', name='')
                             for value in expression.values]
    return res
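

# Round-trip sketch: the two helpers above are approximate inverses for the
# placements Rook supports (assuming the named hosts exist in the cluster):
#
#   ps = PlacementSpec(host_pattern='*')
#   term = placement_spec_to_node_selector(ps, all_hosts)
#   assert node_selector_to_placement_spec(term).host_pattern == '*'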