1 """
2 This module wraps Rook + Kubernetes APIs to expose the calls
3 needed to implement an orchestrator module. While the orchestrator
4 module exposes an async API, this module simply exposes blocking API
5 call methods.
6
7 This module is runnable outside of ceph-mgr, useful for testing.
8 """
9 import datetime
10 import threading
11 import logging
12 from contextlib import contextmanager
13 from time import sleep
14 import re
15 from orchestrator import OrchResult
16
17 import jsonpatch
18 from urllib.parse import urljoin
19
20 # Optional kubernetes imports to enable MgrModule.can_run
21 # to behave cleanly.
22 from urllib3.exceptions import ProtocolError
23
24 from ceph.deployment.inventory import Device
25 from ceph.deployment.drive_group import DriveGroupSpec
26 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
27 from ceph.utils import datetime_now
28 from ceph.deployment.drive_selection.matchers import SizeMatcher
29 from nfs.cluster import create_ganesha_pool
30 from nfs.module import Module
31 from nfs.export import NFSRados
32 from mgr_module import NFS_POOL_NAME
33 from mgr_util import merge_dicts
34
35 from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
36 Iterable, Dict, Iterator, Type
37
38 try:
39 from kubernetes import client, watch
40 from kubernetes.client.rest import ApiException
41 except ImportError:
42 class ApiException(Exception): # type: ignore
43 status = 0
44
45 from .rook_client.ceph import cephfilesystem as cfs
46 from .rook_client.ceph import cephnfs as cnfs
47 from .rook_client.ceph import cephobjectstore as cos
48 from .rook_client.ceph import cephcluster as ccl
49 from .rook_client.ceph import cephrbdmirror as crbdm
50 from .rook_client._helper import CrdClass
51
52 import orchestrator
53
54 try:
55 from rook.module import RookEnv, RookOrchestrator
56 except ImportError:
57 pass # just used for type checking.
58
59
60 T = TypeVar('T')
61 FuncT = TypeVar('FuncT', bound=Callable)
62
63 CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
64
65
66 log = logging.getLogger(__name__)
67
68
69 def __urllib3_supports_read_chunked() -> bool:
70 # CentOS 7 ships a urllib3 that is older than the version
71 # required by the kubernetes client
72 try:
73 from urllib3.response import HTTPResponse
74 return hasattr(HTTPResponse, 'read_chunked')
75 except ImportError:
76 return False
77
78
79 _urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
80
81 class ApplyException(orchestrator.OrchestratorError):
82 """
83 For failures to update the Rook CRDs, usually indicating
84 some kind of interference between our attempted update
85 and other conflicting activity.
86 """
87
88
89 def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
90 def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
91 t = threading.Thread(target=f, args=args, kwargs=kwargs)
92 t.start()
93 return t
94
95 return cast(Callable[..., threading.Thread], wrapper)
96
97
98 class DefaultFetcher():
99 def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
100 self.storage_class = storage_class
101 self.coreV1_api = coreV1_api
102
103 def fetch(self) -> None:
104 self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
105 self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
106
107 def convert_size(self, size_str: str) -> int:
108 units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
109 coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
110 assert coeff_and_unit is not None
111 coeff = int(coeff_and_unit[1])
112 unit = coeff_and_unit[2]
113 try:
114 factor = units.index(unit) % 7
115 except ValueError:
116 log.error("PV size format invalid")
117 raise
118 size = coeff * (2 ** (10 * factor))
119 return size
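# For reference, both the binary ("Gi") and SI-style ("G") suffixes above map to
# binary factors because of the ``units.index(unit) % 7`` fold:
#   convert_size("512Ki") == 512 * 2**10
#   convert_size("10Gi")  == 10 * 2**30
#   convert_size("10G")   == 10 * 2**30   # "G" is index 10, and 10 % 7 == 3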
120
121 def devices(self) -> Dict[str, List[Device]]:
122 nodename_to_devices: Dict[str, List[Device]] = {}
123 for i in self.pvs_in_sc:
124 node, device = self.device(i)
125 if node not in nodename_to_devices:
126 nodename_to_devices[node] = []
127 nodename_to_devices[node].append(device)
128 return nodename_to_devices
129
130 def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
131 node = 'N/A'
132 if i.spec.node_affinity:
133 terms = i.spec.node_affinity.required.node_selector_terms
134 if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
135 node = terms[0].match_expressions[0].values[0]
136 size = self.convert_size(i.spec.capacity['storage'])
137 path = i.spec.host_path.path if i.spec.host_path else i.spec.local.path if i.spec.local else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations else ''
138 state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
139 pv_name = i.metadata.name
140 device = Device(
141 path = path,
142 sys_api = dict(
143 size = size,
144 node = node,
145 pv_name = pv_name
146 ),
147 available = state,
148 )
149 return (node, device)
150
151
152 class LSOFetcher(DefaultFetcher):
153 def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
154 super().__init__(storage_class, coreV1_api)
155 self.customObjects_api = customObjects_api
156 self.nodenames = nodenames
157
158 def fetch(self) -> None:
159 super().fetch()
160 self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
161 group="local.storage.openshift.io",
162 version="v1alpha1",
163 plural="localvolumediscoveryresults")
164
165 def predicate(self, item: Any) -> bool:
166 if self.nodenames is not None:
167 return item['spec']['nodeName'] in self.nodenames
168 else:
169 return True
170
171 def devices(self) -> Dict[str, List[Device]]:
172 try:
173 lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
174 except ApiException as dummy_e:
175 log.error("Failed to fetch device metadata")
176 raise
177 self.lso_devices = {}
178 for i in lso_discovery_results:
179 drives = i['status']['discoveredDevices']
180 for drive in drives:
181 self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
182 nodename_to_devices: Dict[str, List[Device]] = {}
183 for i in self.pvs_in_sc:
184 node, device = (None, None)
185 if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
186 node, device = super().device(i)
187 else:
188 node, device = self.device(i)
189 if node not in nodename_to_devices:
190 nodename_to_devices[node] = []
191 nodename_to_devices[node].append(device)
192 return nodename_to_devices
193
194 def device(self, i: Any) -> Tuple[str, Device]:
195 node = i.metadata.labels['kubernetes.io/hostname']
196 device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
197 pv_name = i.metadata.name
198 vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
199 model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
200 device = Device(
201 path = device_discovery['path'],
202 sys_api = dict(
203 size = device_discovery['size'],
204 rotational = '1' if device_discovery['property']=='Rotational' else '0',
205 node = node,
206 pv_name = pv_name,
207 model = model,
208 vendor = vendor
209 ),
210 available = device_discovery['status']['state']=='Available',
211 device_id = device_discovery['deviceID'].split('/')[-1],
212 lsm_data = dict(
213 serialNum = device_discovery['serial']
214 )
215 )
216 return (node, device)
217
218 class KubernetesResource(Generic[T]):
219 def __init__(self, api_func: Callable, **kwargs: Any) -> None:
220 """
221 Generic kubernetes Resource parent class
222
223 The api fetch and watch methods should be common across resource types.
224
225 Exceptions in the runner thread are propagated to the caller.
226
227 :param api_func: kubernetes client api function that is passed to the watcher
228 :param kwargs: extra keyword arguments passed through to ``api_func``, e.g. ``namespace`` or ``label_selector``.
229 """
230 self.kwargs = kwargs
231 self.api_func = api_func
232
233 # ``_items`` is accessed by different threads. I assume assignment is atomic.
234 self._items: Dict[str, T] = dict()
235 self.thread = None # type: Optional[threading.Thread]
236 self.exception: Optional[Exception] = None
237 if not _urllib3_supports_read_chunked:
238 logging.info('urllib3 is too old. Falling back to full fetches')
239
240 def _fetch(self) -> str:
241 """ Execute the requested api method as a one-off fetch"""
242 response = self.api_func(**self.kwargs)
243 metadata = response.metadata
244 self._items = {item.metadata.name: item for item in response.items}
245 log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
246 return metadata.resource_version
247
248 @property
249 def items(self) -> Iterable[T]:
250 """
251 Returns the items of the request.
252 Creates the watcher as a side effect.
253 :return:
254 """
255 if self.exception:
256 e = self.exception
257 self.exception = None
258 raise e # Propagate the exception to the user.
259 if not self.thread or not self.thread.is_alive():
260 resource_version = self._fetch()
261 if _urllib3_supports_read_chunked:
262 # Start a thread which will use the kubernetes watch client against a resource
263 log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
264 self.thread = self._watch(resource_version)
265
266 return self._items.values()
267
268 def get_item_name(self, item: Any) -> Any:
269 try:
270 return item.metadata.name
271 except AttributeError:
272 raise AttributeError(
273 "{} doesn't contain a metadata.name. Unable to track changes".format(
274 self.api_func))
275 @threaded
276 def _watch(self, res_ver: Optional[str]) -> None:
277 """ worker thread that runs the kubernetes watch """
278
279 self.exception = None
280
281 w = watch.Watch()
282
283 try:
284 # execute generator to continually watch resource for changes
285 for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
286 **self.kwargs):
287 self.health = ''
288 item = event['object']
289 name = self.get_item_name(item)
290
291 log.info('{} event: {}'.format(event['type'], name))
292
293 if event['type'] in ('ADDED', 'MODIFIED'):
294 self._items = merge_dicts(self._items, {name: item})
295 elif event['type'] == 'DELETED':
296 self._items = {k: v for k, v in self._items.items() if k != name}
297 elif event['type'] == 'BOOKMARK':
298 pass
299 elif event['type'] == 'ERROR':
300 raise ApiException(str(event))
301 else:
302 raise KeyError('Unknown watch event {}'.format(event['type']))
303 except ProtocolError as e:
304 if 'Connection broken' in str(e):
305 log.info('Connection reset.')
306 return
307 raise
308 except ApiException as e:
309 log.exception('K8s API failed. {}'.format(self.api_func))
310 self.exception = e
311 raise
312 except Exception as e:
313 log.exception("Watcher failed. ({})".format(self.api_func))
314 self.exception = e
315 raise
316
317 class KubernetesCustomResource(KubernetesResource):
318 def _fetch(self) -> str:
319 response = self.api_func(**self.kwargs)
320 metadata = response['metadata']
321 self._items = {item['metadata']['name']: item for item in response['items']}
322 log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
323 return metadata['resourceVersion']
324
325 def get_item_name(self, item: Any) -> Any:
326 try:
327 return item['metadata']['name']
328 except (KeyError, TypeError):
329 raise AttributeError(
330 "{} doesn't contain a metadata.name. Unable to track changes".format(
331 self.api_func))
332
333 class DefaultCreator():
334 def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
335 self.coreV1_api = coreV1_api
336 self.storage_class = storage_class
337 self.inventory = inventory
338
339 def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
340 device_set = ccl.StorageClassDeviceSetsItem(
341 name=d.sys_api['pv_name'],
342 volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
343 count=1,
344 encrypted=drive_group.encrypted,
345 portable=False
346 )
347 device_set.volumeClaimTemplates.append(
348 ccl.VolumeClaimTemplatesItem(
349 metadata=ccl.Metadata(
350 name="data"
351 ),
352 spec=ccl.Spec(
353 storageClassName=self.storage_class,
354 volumeMode="Block",
355 accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
356 resources={
357 "requests":{
358 "storage": 1
359 }
360 },
361 volumeName=d.sys_api['pv_name']
362 )
363 )
364 )
365 return device_set
366
367 def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
368 device_list = []
369 assert drive_group.data_devices is not None
370 sizematcher: Optional[SizeMatcher] = None
371 if drive_group.data_devices.size:
372 sizematcher = SizeMatcher('size', drive_group.data_devices.size)
373 limit = getattr(drive_group.data_devices, 'limit', None)
374 count = 0
375 all = getattr(drive_group.data_devices, 'all', None)
376 paths = [device.path for device in drive_group.data_devices.paths]
377 osd_list = []
378 for pod in rook_pods.items:
379 if (
380 hasattr(pod, 'metadata')
381 and hasattr(pod.metadata, 'labels')
382 and 'osd' in pod.metadata.labels
383 and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
384 ):
385 osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
386 for _, node in self.inventory.items():
387 for device in node:
388 if device.sys_api['pv_name'] in osd_list:
389 count += 1
390 for _, node in self.inventory.items():
391 for device in node:
392 if not limit or (count < limit):
393 if device.available:
394 if (
395 all
396 or (
397 device.sys_api['node'] in matching_hosts
398 and ((sizematcher is None) or sizematcher.compare(device))
399 and (
400 not drive_group.data_devices.paths
401 or (device.path in paths)
402 )
403 )
404 ):
405 device_list.append(device)
406 count += 1
407
408 return device_list
409
410 def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
411 to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
412 assert drive_group.data_devices is not None
413 def _add_osds(current_cluster, new_cluster):
414 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
415 if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
416 new_cluster.spec.storage = ccl.Storage()
417
418 if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
419 new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
420
421 existing_scds = [
422 scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
423 ]
424 for device in to_create:
425 new_scds = self.device_to_device_set(drive_group, device)
426 if new_scds.name not in existing_scds:
427 new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
428 return new_cluster
429 return _add_osds
430
431 class LSOCreator(DefaultCreator):
432 def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
433 device_list = []
434 assert drive_group.data_devices is not None
435 sizematcher = None
436 if drive_group.data_devices.size:
437 sizematcher = SizeMatcher('size', drive_group.data_devices.size)
438 limit = getattr(drive_group.data_devices, 'limit', None)
439 all = getattr(drive_group.data_devices, 'all', None)
440 paths = [device.path for device in drive_group.data_devices.paths]
441 vendor = getattr(drive_group.data_devices, 'vendor', None)
442 model = getattr(drive_group.data_devices, 'model', None)
443 count = 0
444 osd_list = []
445 for pod in rook_pods.items:
446 if (
447 hasattr(pod, 'metadata')
448 and hasattr(pod.metadata, 'labels')
449 and 'osd' in pod.metadata.labels
450 and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
451 ):
452 osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
453 for _, node in self.inventory.items():
454 for device in node:
455 if device.sys_api['pv_name'] in osd_list:
456 count += 1
457 for _, node in self.inventory.items():
458 for device in node:
459 if not limit or (count < limit):
460 if device.available:
461 if (
462 all
463 or (
464 device.sys_api['node'] in matching_hosts
465 and ((sizematcher is None) or sizematcher.compare(device))
466 and (
467 not drive_group.data_devices.paths
468 or device.path in paths
469 )
470 and (
471 not vendor
472 or device.sys_api['vendor'] == vendor
473 )
474 and (
475 not model
476 or device.sys_api['model'].startswith(model)
477 )
478 )
479 ):
480 device_list.append(device)
481 count += 1
482 return device_list
483
484 class DefaultRemover():
485 def __init__(
486 self,
487 coreV1_api: 'client.CoreV1Api',
488 batchV1_api: 'client.BatchV1Api',
489 appsV1_api: 'client.AppsV1Api',
490 osd_ids: List[str],
491 replace_flag: bool,
492 force_flag: bool,
493 mon_command: Callable,
494 patch: Callable,
495 rook_env: 'RookEnv',
496 inventory: Dict[str, List[Device]]
497 ):
498 self.batchV1_api = batchV1_api
499 self.appsV1_api = appsV1_api
500 self.coreV1_api = coreV1_api
501
502 self.osd_ids = osd_ids
503 self.replace_flag = replace_flag
504 self.force_flag = force_flag
505
506 self.mon_command = mon_command
507
508 self.patch = patch
509 self.rook_env = rook_env
510
511 self.inventory = inventory
512 self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
513 self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
514 self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
515
516
517 def remove_device_sets(self) -> str:
518 self.to_remove: Dict[str, int] = {}
519 self.pvc_to_remove: List[str] = []
520 for pod in self.osd_pods.items:
521 if (
522 hasattr(pod, 'metadata')
523 and hasattr(pod.metadata, 'labels')
524 and 'osd' in pod.metadata.labels
525 and pod.metadata.labels['osd'] in self.osd_ids
526 ):
527 if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
528 self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
529 else:
530 self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
531 self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
532 def _remove_osds(current_cluster, new_cluster):
533 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
534 assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
535 for _set in list(new_cluster.spec.storage.storageClassDeviceSets):
536 if _set.name in self.to_remove:
537 if _set.count == self.to_remove[_set.name]:
538 new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
539 else:
540 _set.count = _set.count - self.to_remove[_set.name]
541 return new_cluster
542 return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
543
544 def check_force(self) -> None:
545 if not self.force_flag:
546 safe_args = {'prefix': 'osd safe-to-destroy',
547 'ids': [str(x) for x in self.osd_ids]}
548 ret, out, err = self.mon_command(safe_args)
549 if ret != 0:
550 raise RuntimeError(err)
551
552 def set_osds_down(self) -> None:
553 down_flag_args = {
554 'prefix': 'osd down',
555 'ids': [str(x) for x in self.osd_ids]
556 }
557 ret, out, err = self.mon_command(down_flag_args)
558 if ret != 0:
559 raise RuntimeError(err)
560
561 def scale_deployments(self) -> None:
562 for osd_id in self.osd_ids:
563 self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
564 spec=client.V1ScaleSpec(
565 replicas=0
566 )
567 ))
568
569 def set_osds_out(self) -> None:
570 out_flag_args = {
571 'prefix': 'osd out',
572 'ids': [str(x) for x in self.osd_ids]
573 }
574 ret, out, err = self.mon_command(out_flag_args)
575 if ret != 0:
576 raise RuntimeError(err)
577
578 def delete_deployments(self) -> None:
579 for osd_id in self.osd_ids:
580 self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
581
582 def clean_up_prepare_jobs_and_pvc(self) -> None:
583 for job in self.jobs.items:
584 if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
585 self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
586 self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
587
588 def purge_osds(self) -> None:
589 for id in self.osd_ids:
590 purge_args = {
591 'prefix': 'osd purge-actual',
592 'id': int(id),
593 'yes_i_really_mean_it': True
594 }
595 ret, out, err = self.mon_command(purge_args)
596 if ret != 0:
597 raise RuntimeError(err)
598
599 def destroy_osds(self) -> None:
600 for id in self.osd_ids:
601 destroy_args = {
602 'prefix': 'osd destroy-actual',
603 'id': int(id),
604 'yes_i_really_mean_it': True
605 }
606 ret, out, err = self.mon_command(destroy_args)
607 if ret != 0:
608 raise RuntimeError(err)
609
610 def remove(self) -> str:
611 try:
612 self.check_force()
613 except Exception as e:
614 log.exception("Error checking if OSDs are safe to destroy")
615 return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
616 try:
617 remove_result = self.remove_device_sets()
618 except Exception as e:
619 log.exception("Error patching ceph cluster CRD")
620 return f"Not possible to modify Ceph cluster CRD: {e}"
621 try:
622 self.scale_deployments()
623 self.delete_deployments()
624 self.clean_up_prepare_jobs_and_pvc()
625 except Exception as e:
626 log.exception("Ceph cluster CRD patched, but error cleaning environment")
627 return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
628 try:
629 self.set_osds_down()
630 self.set_osds_out()
631 if self.replace_flag:
632 self.destroy_osds()
633 else:
634 self.purge_osds()
635 except Exception as e:
636 log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
637 return f"Error removing OSDs from Ceph cluster: {e}"
638
639 return remove_result
640
641
642
643 class RookCluster(object):
644 # import of client.CoreV1Api must be optional at import time.
645 # Instead allow mgr/rook to be imported anyway.
646 def __init__(
647 self,
648 coreV1_api: 'client.CoreV1Api',
649 batchV1_api: 'client.BatchV1Api',
650 customObjects_api: 'client.CustomObjectsApi',
651 storageV1_api: 'client.StorageV1Api',
652 appsV1_api: 'client.AppsV1Api',
653 rook_env: 'RookEnv',
654 storage_class: 'str'
655 ):
656 self.rook_env = rook_env # type: RookEnv
657 self.coreV1_api = coreV1_api # client.CoreV1Api
658 self.batchV1_api = batchV1_api
659 self.customObjects_api = customObjects_api
660 self.storageV1_api = storageV1_api # client.StorageV1Api
661 self.appsV1_api = appsV1_api # client.AppsV1Api
662 self.storage_class = storage_class # type: str
663
664 # TODO: replace direct k8s calls with Rook API calls
665 self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
666
667 self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
668 namespace=self.rook_env.namespace,
669 label_selector="rook_cluster={0}".format(
670 self.rook_env.namespace))
671 self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
672
673 def rook_url(self, path: str) -> str:
674 prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
675 self.rook_env.crd_version, self.rook_env.namespace)
676 return urljoin(prefix, path)
677
678 def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
679 full_path = self.rook_url(path)
680 log.debug("[%s] %s" % (verb, full_path))
681
682 return self.coreV1_api.api_client.call_api(
683 full_path,
684 verb,
685 auth_settings=['BearerToken'],
686 response_type="object",
687 _return_http_data_only=True,
688 _preload_content=True,
689 **kwargs)
690
691 def rook_api_get(self, path: str, **kwargs: Any) -> Any:
692 return self.rook_api_call("GET", path, **kwargs)
693
694 def rook_api_delete(self, path: str) -> Any:
695 return self.rook_api_call("DELETE", path)
696
697 def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
698 return self.rook_api_call("PATCH", path,
699 header_params={"Content-Type": "application/json-patch+json"},
700 **kwargs)
701
702 def rook_api_post(self, path: str, **kwargs: Any) -> Any:
703 return self.rook_api_call("POST", path, **kwargs)
704
705 def get_storage_class(self) -> 'client.V1StorageClass':
706 matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
707 if len(matching_sc) == 0:
708 log.error(f"No storage class found matching the configured Rook orchestrator storage class, which is currently <{self.storage_class}>. The storage class can be set in the ceph config (mgr/rook/storage_class)")
709 raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
710 return matching_sc[0]
711
712 def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
713 storage_class = self.get_storage_class()
714 self.fetcher: Optional[DefaultFetcher] = None
715 if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
716 self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
717 else:
718 self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
719 self.fetcher.fetch()
720 return self.fetcher.devices()
721
722 def get_osds(self) -> List:
723 osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
724 return list(osd_pods.items)
725
726 def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
727 #
728 # Fetch cephnfs object for "nfs_cluster" and then return a rados://
729 # URL for the instance within that cluster. If the fetch fails, just
730 # return None.
731 #
732 try:
733 ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
734 except ApiException as e:
735 log.info("Unable to fetch cephnfs object: {}".format(e.status))
736 return None
737
738 pool = ceph_nfs['spec']['rados']['pool']
739 namespace = ceph_nfs['spec']['rados'].get('namespace', None)
740
741 if namespace is None:
742 url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
743 else:
744 url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
745 return url
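# For example (illustrative values): pool "nfs-ganesha", cluster "mynfs" and
# instance "a" yield "rados://nfs-ganesha/conf-mynfs.a", or
# "rados://nfs-ganesha/<namespace>/conf-mynfs.a" when the CephNFS rados spec
# sets a namespace.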
746
747 def describe_pods(self,
748 service_type: Optional[str],
749 service_id: Optional[str],
750 nodename: Optional[str]) -> List[Dict[str, Any]]:
751 """
752 Go query the k8s API about deployments and containers related to this
753 service
754
755 Example Rook Pod labels for a mgr daemon:
756 Labels: app=rook-ceph-mgr
757 pod-template-hash=2171958073
758 rook_cluster=rook
759 And MDS containers additionally have the `rook_file_system` label
760
761 Label filter is rook_cluster=<cluster namespace>
762 rook_file_system=<self.fs_name>
763 """
764 def predicate(item):
765 # type: (client.V1Pod) -> bool
766 metadata = item.metadata
767 if service_type is not None:
768 if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
769 return False
770
771 if service_id is not None:
772 try:
773 k, v = {
774 "mds": ("rook_file_system", service_id),
775 "osd": ("ceph-osd-id", service_id),
776 "mon": ("mon", service_id),
777 "mgr": ("mgr", service_id),
778 "nfs": ("nfs", service_id),
779 "rgw": ("ceph_rgw", service_id),
780 }[service_type]
781 except KeyError:
782 raise orchestrator.OrchestratorValidationError(
783 '{} not supported'.format(service_type))
784 if metadata.labels[k] != v:
785 return False
786
787 if nodename is not None:
788 if item.spec.node_name != nodename:
789 return False
790 return True
791
792 refreshed = datetime_now()
793 pods = [i for i in self.rook_pods.items if predicate(i)]
794
795 pods_summary = []
796 prefix = 'sha256:'
797
798 for p in pods:
799 d = p.to_dict()
800
801 image_name = None
802 for c in d['spec']['containers']:
803 # look at the first listed container in the pod...
804 image_name = c['image']
805 break
806
807 ls = d['status'].get('container_statuses')
808 if not ls:
809 # ignore pods with no containers
810 continue
811 image_id = ls[0]['image_id']
812 image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
813
814 s = {
815 "name": d['metadata']['name'],
816 "hostname": d['spec']['node_name'],
817 "labels": d['metadata']['labels'],
818 'phase': d['status']['phase'],
819 'container_image_name': image_name,
820 'container_image_id': image_id,
821 'refreshed': refreshed,
822 # these may get set below...
823 'started': None,
824 'created': None,
825 }
826
827 # note: we want UTC
828 if d['metadata'].get('creation_timestamp', None):
829 s['created'] = d['metadata']['creation_timestamp'].astimezone(
830 tz=datetime.timezone.utc)
831 if d['status'].get('start_time', None):
832 s['started'] = d['status']['start_time'].astimezone(
833 tz=datetime.timezone.utc)
834
835 pods_summary.append(s)
836
837 return pods_summary
838
839 def remove_pods(self, names: List[str]) -> List[str]:
840 pods = [i for i in self.rook_pods.items]
841 for p in pods:
842 d = p.to_dict()
843 daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
844 daemon_id = d['metadata']['labels']['ceph_daemon_id']
845 name = daemon_type + '.' + daemon_id
846 if name in names:
847 self.coreV1_api.delete_namespaced_pod(
848 d['metadata']['name'],
849 self.rook_env.namespace,
850 body=client.V1DeleteOptions()
851 )
852 return [f'Removed Pod {n}' for n in names]
853
854 def get_node_names(self) -> List[str]:
855 return [i.metadata.name for i in self.nodes.items]
856
857 @contextmanager
858 def ignore_409(self, what: str) -> Iterator[None]:
859 try:
860 yield
861 except ApiException as e:
862 if e.status == 409:
863 # Idempotent, succeed.
864 log.info("{} already exists".format(what))
865 else:
866 raise
867
868 def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
869 leaf_type: str) -> str:
870 # TODO use spec.placement
871 # TODO warn if spec.extended has entries we don't know how
872 # to action.
873 all_hosts = self.get_hosts()
874 def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
875 new.spec.metadataServer.activeCount = spec.placement.count or 1
876 new.spec.metadataServer.placement = cfs.Placement(
877 nodeAffinity=cfs.NodeAffinity(
878 requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
879 nodeSelectorTerms=cfs.NodeSelectorTermsList(
880 [placement_spec_to_node_selector(spec.placement, all_hosts)]
881 )
882 )
883 )
884 )
885 return new
886 def _create_fs() -> cfs.CephFilesystem:
887 fs = cfs.CephFilesystem(
888 apiVersion=self.rook_env.api_name,
889 metadata=dict(
890 name=spec.service_id,
891 namespace=self.rook_env.namespace,
892 ),
893 spec=cfs.Spec(
894 dataPools=cfs.DataPoolsList(
895 {
896 cfs.DataPoolsItem(
897 failureDomain=leaf_type,
898 replicated=cfs.Replicated(
899 size=num_replicas
900 )
901 )
902 }
903 ),
904 metadataPool=cfs.MetadataPool(
905 failureDomain=leaf_type,
906 replicated=cfs.Replicated(
907 size=num_replicas
908 )
909 ),
910 metadataServer=cfs.MetadataServer(
911 activeCount=spec.placement.count or 1,
912 activeStandby=True,
913 placement=cfs.Placement(
915 nodeAffinity=cfs.NodeAffinity(
916 requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
917 nodeSelectorTerms=cfs.NodeSelectorTermsList(
918 [placement_spec_to_node_selector(spec.placement, all_hosts)]
919 )
920 )
921 )
922 )
923 )
924 )
925 )
926 return fs
927 assert spec.service_id is not None
928 return self._create_or_patch(
929 cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
930 _update_fs, _create_fs)
931
932 def get_matching_node(self, host: str) -> Any:
933 matching_node = None
934 for node in self.nodes.items:
935 if node.metadata.labels['kubernetes.io/hostname'] == host:
936 matching_node = node
937 return matching_node
938
939 def add_host_label(self, host: str, label: str) -> OrchResult[str]:
940 matching_node = self.get_matching_node(host)
941 if matching_node is None:
942 return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
943 matching_node.metadata.labels['ceph-label/'+ label] = ""
944 self.coreV1_api.patch_node(host, matching_node)
945 return OrchResult(f'Added {label} label to {host}')
946
947 def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
948 matching_node = self.get_matching_node(host)
949 if matching_node is None:
950 return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
951 matching_node.metadata.labels.pop('ceph-label/' + label, None)
952 self.coreV1_api.patch_node(host, matching_node)
953 return OrchResult(f'Removed {label} label from {host}')
954
955 def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
956 assert spec.service_id is not None
957
958 name = spec.service_id
959
960 if '.' in spec.service_id:
961 # rook does not like . in the name. It could be
962 # there because it is a legacy rgw spec that was named
963 # like $realm.$zone, except that I doubt there were any
964 # users of this code. Instead, focus on future users and
965 # translate . to - (fingers crossed!).
966 name = spec.service_id.replace('.', '-')
967
968 all_hosts = self.get_hosts()
969 def _create_zone() -> cos.CephObjectStore:
970 port = None
971 secure_port = None
972 if spec.ssl:
973 secure_port = spec.get_port()
974 else:
975 port = spec.get_port()
976 object_store = cos.CephObjectStore(
977 apiVersion=self.rook_env.api_name,
978 metadata=dict(
979 name=name,
980 namespace=self.rook_env.namespace
981 ),
982 spec=cos.Spec(
983 gateway=cos.Gateway(
984 port=port,
985 securePort=secure_port,
986 instances=spec.placement.count or 1,
987 placement=cos.Placement(
988 cos.NodeAffinity(
989 requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
990 nodeSelectorTerms=cos.NodeSelectorTermsList(
991 [
992 placement_spec_to_node_selector(spec.placement, all_hosts)
993 ]
994 )
995 )
996 )
997 )
998 ),
999 dataPool=cos.DataPool(
1000 failureDomain=leaf_type,
1001 replicated=cos.Replicated(
1002 size=num_replicas
1003 )
1004 ),
1005 metadataPool=cos.MetadataPool(
1006 failureDomain=leaf_type,
1007 replicated=cos.Replicated(
1008 size=num_replicas
1009 )
1010 )
1011 )
1012 )
1013 if spec.rgw_zone:
1014 object_store.spec.zone=cos.Zone(
1015 name=spec.rgw_zone
1016 )
1017 return object_store
1018
1019
1020 def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
1021 if new.spec.gateway:
1022 new.spec.gateway.instances = spec.placement.count or 1
1023 else:
1024 new.spec.gateway=cos.Gateway(
1025 instances=spec.placement.count or 1
1026 )
1027 return new
1028 return self._create_or_patch(
1029 cos.CephObjectStore, 'cephobjectstores', name,
1030 _update_zone, _create_zone)
1031
1032 def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
1033 # TODO use spec.placement
1034 # TODO warn if spec.extended has entries we don't know how
1035 # to action.
1036 # TODO Number of pods should be based on the list of hosts in the
1037 # PlacementSpec.
1038 assert spec.service_id, "service id in NFS service spec cannot be an empty string or None " # for mypy typing
1039 service_id = spec.service_id
1040 mgr_module = cast(Module, mgr)
1041 count = spec.placement.count or 1
1042 def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
1043 new.spec.server.active = count
1044 return new
1045
1046 def _create_nfs() -> cnfs.CephNFS:
1047 rook_nfsgw = cnfs.CephNFS(
1048 apiVersion=self.rook_env.api_name,
1049 metadata=dict(
1050 name=spec.service_id,
1051 namespace=self.rook_env.namespace,
1052 ),
1053 spec=cnfs.Spec(
1054 rados=cnfs.Rados(
1055 namespace=service_id,
1056 pool=NFS_POOL_NAME,
1057 ),
1058 server=cnfs.Server(
1059 active=count
1060 )
1061 )
1062 )
1063
1064
1065 return rook_nfsgw
1066
1067 create_ganesha_pool(mgr)
1068 NFSRados(mgr_module, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
1069 return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
1070 _update_nfs, _create_nfs)
1071
1072 def rm_service(self, rooktype: str, service_id: str) -> str:
1073 self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
1074 objpath = "{0}/{1}".format(rooktype, service_id)
1075 return f'Removed {objpath}'
1076
1077 def get_resource(self, resource_type: str) -> Iterable:
1078 custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
1079 return custom_objects.items
1080
1081 def can_create_osd(self) -> bool:
1082 current_cluster = self.rook_api_get(
1083 "cephclusters/{0}".format(self.rook_env.cluster_name))
1084 use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
1085
1086 # If useAllNodes is set, then Rook will not be paying attention
1087 # to anything we put in 'nodes', so can't do OSD creation.
1088 return not use_all_nodes
1089
1090 def node_exists(self, node_name: str) -> bool:
1091 return node_name in self.get_node_names()
1092
1093 def update_mon_count(self, newcount: Optional[int]) -> str:
1094 def _update_mon_count(current, new):
1095 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
1096 if newcount is None:
1097 raise orchestrator.OrchestratorError('unable to set mon count to None')
1098 if not new.spec.mon:
1099 raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
1100 new.spec.mon.count = newcount
1101 return new
1102 return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
1103
1104 def add_osds(self, drive_group, matching_hosts):
1105 # type: (DriveGroupSpec, List[str]) -> str
1106 assert drive_group.objectstore in ("bluestore", "filestore")
1107 assert drive_group.service_id
1108 storage_class = self.get_storage_class()
1109 inventory = self.get_discovered_devices()
1110 creator: Optional[DefaultCreator] = None
1111 if (
1112 storage_class.metadata.labels
1113 and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
1114 ):
1115 creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
1116 else:
1117 creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
1118 return self._patch(
1119 ccl.CephCluster,
1120 'cephclusters',
1121 self.rook_env.cluster_name,
1122 creator.add_osds(self.rook_pods, drive_group, matching_hosts)
1123 )
1124
1125 def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
1126 inventory = self.get_discovered_devices()
1127 self.remover = DefaultRemover(
1128 self.coreV1_api,
1129 self.batchV1_api,
1130 self.appsV1_api,
1131 osd_ids,
1132 replace,
1133 force,
1134 mon_command,
1135 self._patch,
1136 self.rook_env,
1137 inventory
1138 )
1139 return self.remover.remove()
1140
1141 def get_hosts(self) -> List[orchestrator.HostSpec]:
1142 ret = []
1143 for node in self.nodes.items:
1144 spec = orchestrator.HostSpec(
1145 node.metadata.name,
1146 addr='/'.join([addr.address for addr in node.status.addresses]),
1147 labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
1148 )
1149 ret.append(spec)
1150 return ret
1151
1152 def create_zap_job(self, host: str, path: str) -> None:
1153 body = client.V1Job(
1154 api_version="batch/v1",
1155 metadata=client.V1ObjectMeta(
1156 name="rook-ceph-device-zap",
1157 namespace="rook-ceph"
1158 ),
1159 spec=client.V1JobSpec(
1160 template=client.V1PodTemplateSpec(
1161 spec=client.V1PodSpec(
1162 containers=[
1163 client.V1Container(
1164 name="device-zap",
1165 image="rook/ceph:master",
1166 command=["bash"],
1167 args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
1168 env=[
1169 client.V1EnvVar(
1170 name="ROOK_CEPH_USERNAME",
1171 value_from=client.V1EnvVarSource(
1172 secret_key_ref=client.V1SecretKeySelector(
1173 key="ceph-username",
1174 name="rook-ceph-mon"
1175 )
1176 )
1177 ),
1178 client.V1EnvVar(
1179 name="ROOK_CEPH_SECRET",
1180 value_from=client.V1EnvVarSource(
1181 secret_key_ref=client.V1SecretKeySelector(
1182 key="ceph-secret",
1183 name="rook-ceph-mon"
1184 )
1185 )
1186 )
1187 ],
1188 security_context=client.V1SecurityContext(
1189 run_as_user=0,
1190 privileged=True
1191 ),
1192 volume_mounts=[
1193 client.V1VolumeMount(
1194 mount_path="/etc/ceph",
1195 name="ceph-conf-emptydir"
1196 ),
1197 client.V1VolumeMount(
1198 mount_path="/etc/rook",
1199 name="rook-config"
1200 ),
1201 client.V1VolumeMount(
1202 mount_path="/dev",
1203 name="devices"
1204 )
1205 ]
1206 )
1207 ],
1208 volumes=[
1209 client.V1Volume(
1210 name="ceph-conf-emptydir",
1211 empty_dir=client.V1EmptyDirVolumeSource()
1212 ),
1213 client.V1Volume(
1214 name="rook-config",
1215 empty_dir=client.V1EmptyDirVolumeSource()
1216 ),
1217 client.V1Volume(
1218 name="devices",
1219 host_path=client.V1HostPathVolumeSource(
1220 path="/dev"
1221 )
1222 ),
1223 ],
1224 node_selector={
1225 "kubernetes.io/hostname": host
1226 },
1227 restart_policy="Never"
1228 )
1229 )
1230 )
1231 )
1232 self.batchV1_api.create_namespaced_job('rook-ceph', body)
1233
1234 def rbd_mirror(self, spec: ServiceSpec) -> None:
1235 service_id = spec.service_id or "default-rbd-mirror"
1236 all_hosts = self.get_hosts()
1237 def _create_rbd_mirror() -> crbdm.CephRBDMirror:
1238 return crbdm.CephRBDMirror(
1239 apiVersion=self.rook_env.api_name,
1240 metadata=dict(
1241 name=service_id,
1242 namespace=self.rook_env.namespace,
1243 ),
1244 spec=crbdm.Spec(
1245 count=spec.placement.count or 1,
1246 placement=crbdm.Placement(
1247 nodeAffinity=crbdm.NodeAffinity(
1248 requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
1249 nodeSelectorTerms=crbdm.NodeSelectorTermsList(
1250 [
1251 placement_spec_to_node_selector(spec.placement, all_hosts)
1252 ]
1253 )
1254 )
1255 )
1256 )
1257 )
1258 )
1259 def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
1260 new.spec.count = spec.placement.count or 1
1261 new.spec.placement = crbdm.Placement(
1262 nodeAffinity=crbdm.NodeAffinity(
1263 requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
1264 nodeSelectorTerms=crbdm.NodeSelectorTermsList(
1265 [
1266 placement_spec_to_node_selector(spec.placement, all_hosts)
1267 ]
1268 )
1269 )
1270 )
1271 )
1272 return new
1273 self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
1274 def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
1275 current_json = self.rook_api_get(
1276 "{}/{}".format(crd_name, cr_name)
1277 )
1278
1279 current = crd.from_json(current_json)
1280 new = crd.from_json(current_json) # no deepcopy.
1281
1282 new = func(current, new)
1283
1284 patch = list(jsonpatch.make_patch(current_json, new.to_json()))
1285
1286 log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
1287
1288 if len(patch) == 0:
1289 return "No change"
1290
1291 try:
1292 self.rook_api_patch(
1293 "{}/{}".format(crd_name, cr_name),
1294 body=patch)
1295 except ApiException as e:
1296 log.exception("API exception: {0}".format(e))
1297 raise ApplyException(
1298 "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
1299
1300 return "Success"
1301
1302 def _create_or_patch(self,
1303 crd: Type,
1304 crd_name: str,
1305 cr_name: str,
1306 update_func: Callable[[CrdClassT], CrdClassT],
1307 create_func: Callable[[], CrdClassT]) -> str:
1308 try:
1309 current_json = self.rook_api_get(
1310 "{}/{}".format(crd_name, cr_name)
1311 )
1312 except ApiException as e:
1313 if e.status == 404:
1314 current_json = None
1315 else:
1316 raise
1317
1318 if current_json:
1319 new = crd.from_json(current_json) # no deepcopy.
1320
1321 new = update_func(new)
1322
1323 patch = list(jsonpatch.make_patch(current_json, new.to_json()))
1324
1325 log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
1326
1327 if len(patch) == 0:
1328 return "No change"
1329
1330 try:
1331 self.rook_api_patch(
1332 "{}/{}".format(crd_name, cr_name),
1333 body=patch)
1334 except ApiException as e:
1335 log.exception("API exception: {0}".format(e))
1336 raise ApplyException(
1337 "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
1338 return "Updated"
1339 else:
1340 new = create_func()
1341 with self.ignore_409("{} {} already exists".format(crd_name,
1342 cr_name)):
1343 self.rook_api_post("{}/".format(crd_name),
1344 body=new.to_json())
1345 return "Created"
1346 def get_ceph_image(self) -> str:
1347 try:
1348 api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
1349 label_selector="app=rook-ceph-mon",
1350 timeout_seconds=10)
1351 if api_response.items:
1352 return api_response.items[-1].spec.containers[0].image
1353 else:
1354 raise orchestrator.OrchestratorError(
1355 "Error getting ceph image. Cluster without monitors")
1356 except ApiException as e:
1357 raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
1358
1359
1360 def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
1361 operation_id = str(hash(loc))
1362 message = ""
1363
1364 # job definition
1365 job_metadata = client.V1ObjectMeta(name=operation_id,
1366 namespace= self.rook_env.namespace,
1367 labels={"ident": operation_id})
1368 pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
1369 pod_container = client.V1Container(name="ceph-lsmcli-command",
1370 security_context=client.V1SecurityContext(privileged=True),
1371 image=self.get_ceph_image(),
1372 command=["lsmcli",],
1373 args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'),
1374 '--path', loc.path or loc.dev,],
1375 volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
1376 client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
1377 pod_spec = client.V1PodSpec(containers=[pod_container],
1378 active_deadline_seconds=30, # Max time to terminate pod
1379 restart_policy="Never",
1380 node_selector= {"kubernetes.io/hostname": loc.host},
1381 volumes=[client.V1Volume(name="devices",
1382 host_path=client.V1HostPathVolumeSource(path="/dev")),
1383 client.V1Volume(name="run-udev",
1384 host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
1385 pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
1386 spec=pod_spec)
1387 job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
1388 ttl_seconds_after_finished=10, # Alpha feature. Lifetime after finishing (either Complete or Failed)
1389 backoff_limit=0,
1390 template=pod_template)
1391 job = client.V1Job(api_version="batch/v1",
1392 kind="Job",
1393 metadata=job_metadata,
1394 spec=job_spec)
1395
1396 # delete previous job if it exists
1397 try:
1398 try:
1399 api_response = self.batchV1_api.delete_namespaced_job(operation_id,
1400 self.rook_env.namespace,
1401 propagation_policy="Background")
1402 except ApiException as e:
1403 if e.status != 404: # No problem if the job does not exist
1404 raise
1405
1406 # wait until the job is not present
1407 deleted = False
1408 retries = 0
1409 while not deleted and retries < 10:
1410 api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
1411 label_selector="ident=%s" % operation_id,
1412 timeout_seconds=10)
1413 deleted = not api_response.items
1414 if retries > 5:
1415 sleep(0.1)
1416 retries += 1
1417 if retries == 10 and not deleted:
1418 raise orchestrator.OrchestratorError(
1419 "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
1420 on, loc.host, loc.path or loc.dev, operation_id))
1421
1422 # create the job
1423 api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
1424
1425 # get the result
1426 finished = False
1427 while not finished:
1428 api_response = self.batchV1_api.read_namespaced_job(operation_id,
1429 self.rook_env.namespace)
1430 finished = api_response.status.succeeded or api_response.status.failed
1431 if finished:
1432 message = api_response.status.conditions[-1].message
1433
1434 # get the result of the lsmcli command
1435 api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
1436 label_selector="ident=%s" % operation_id,
1437 timeout_seconds=10)
1438 if api_response.items:
1439 pod_name = api_response.items[-1].metadata.name
1440 message = self.coreV1_api.read_namespaced_pod_log(pod_name,
1441 self.rook_env.namespace)
1442
1443 except ApiException as e:
1444 log.exception('K8s API failed. {}'.format(e))
1445 raise
1446
1447 # Finally, delete the job.
1448 # The job sets <ttl_seconds_after_finished>, which lets the TTL controller delete the job automatically.
1449 # That feature is still in Alpha, so an explicit delete of the job is issued here as well.
1450 try:
1451 api_response = self.batchV1_api.delete_namespaced_job(operation_id,
1452 self.rook_env.namespace,
1453 propagation_policy="Background")
1454 except ApiException as e:
1455 if e.status != 404: # No problem if the job does not exist
1456 raise
1457
1458 return message
1459
1460 def blink_light(self, ident_fault, on, locs):
1461 # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
1462 return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
1463
1464 def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
1465 all_hostnames = [hs.hostname for hs in all_hosts]
1466 res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
1467 if spec.host_pattern and spec.host_pattern != "*":
1468 raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
1469 if spec.label:
1470 res.matchExpressions.append(
1471 ccl.MatchExpressionsItem(
1472 key="ceph-label/" + spec.label,
1473 operator="Exists"
1474 )
1475 )
1476 if spec.hosts:
1477 host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
1478 res.matchExpressions.append(
1479 ccl.MatchExpressionsItem(
1480 key="kubernetes.io/hostname",
1481 operator="In",
1482 values=ccl.CrdObjectList(host_list)
1483 )
1484 )
1485 if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
1486 res.matchExpressions.append(
1487 ccl.MatchExpressionsItem(
1488 key="kubernetes.io/hostname",
1489 operator="Exists",
1490 )
1491 )
1492 return res
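# For example, a PlacementSpec with label='gateway' and hosts=['host1']
# (assuming 'host1' is a known host) maps to a node selector term containing:
#   {key: 'ceph-label/gateway', operator: 'Exists'}
#   {key: 'kubernetes.io/hostname', operator: 'In', values: ['host1']}
# node_selector_to_placement_spec() below performs the inverse mapping.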
1493
1494 def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
1495 res = PlacementSpec()
1496 for expression in node_selector.matchExpressions:
1497 if expression.key.startswith("ceph-label/"):
1498 res.label = expression.key.split('/')[1]
1499 elif expression.key == "kubernetes.io/hostname":
1500 if expression.operator == "Exists":
1501 res.host_pattern = "*"
1502 elif expression.operator == "In":
1503 res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
1504 return res