1"""
2This module wrap's Rook + Kubernetes APIs to expose the calls
3needed to implement an orchestrator module. While the orchestrator
4module exposes an async API, this module simply exposes blocking API
5call methods.
6
7This module is runnable outside of ceph-mgr, useful for testing.
8"""
import datetime
import threading
import logging
from contextlib import contextmanager
from time import sleep
import re
from orchestrator import OrchResult

import jsonpatch
from urllib.parse import urljoin

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
from ceph.utils import datetime_now
from ceph.deployment.drive_selection.matchers import SizeMatcher
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
from mgr_module import NFS_POOL_NAME
from mgr_util import merge_dicts

from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass

import orchestrator

try:
    from rook.module import RookEnv, RookOrchestrator
except ImportError:
    pass  # just used for type checking.


T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)

CrdClassT = TypeVar('CrdClassT', bound=CrdClass)


log = logging.getLogger(__name__)


def __urllib3_supports_read_chunked() -> bool:
    # There is a bug in CentOS 7 as it ships a urllib3 which is lower
    # than required by kubernetes-client
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)
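# Illustrative use of the decorator above (a sketch, not part of the original
# module): the decorated function runs on a background thread and the caller
# receives the Thread object, so it can join() if it needs to wait.
#
#   @threaded
#   def _background_task(msg: str) -> None:
#       log.info(msg)
#
#   t = _background_task('hello')
#   t.join()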


class DefaultFetcher():
    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
        self.storage_class = storage_class
        self.coreV1_api = coreV1_api

    def fetch(self) -> None:
        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]

    def convert_size(self, size_str: str) -> int:
        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
        assert coeff_and_unit is not None
        coeff = int(coeff_and_unit[1])
        unit = coeff_and_unit[2]
        try:
            factor = units.index(unit) % 7
        except ValueError:
            log.error("PV size format invalid")
            raise
        size = coeff * (2 ** (10 * factor))
        return size
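    # Example of the arithmetic above (binary and decimal suffixes both map to
    # powers of 1024 here, because ``index % 7`` folds "G" onto the same factor
    # as "Gi"):
    #   convert_size("10Gi") -> 10 * 2**30 == 10737418240
    #   convert_size("5G")   -> 5 * 2**30  == 5368709120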

    def devices(self) -> Dict[str, List[Device]]:
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
        node = 'N/A'
        if i.spec.node_affinity:
            terms = i.spec.node_affinity.required.node_selector_terms
            if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
                node = terms[0].match_expressions[0].values[0]
        size = self.convert_size(i.spec.capacity['storage'])
        # Prefer an explicit host path, then a local volume path, then the
        # OpenShift LSO device-name annotation; fall back to an empty path.
        if i.spec.host_path:
            path = i.spec.host_path.path
        elif i.spec.local:
            path = i.spec.local.path
        elif i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations:
            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
        else:
            path = ''
        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
        pv_name = i.metadata.name
        device = Device(
            path=path,
            sys_api=dict(
                size=size,
                node=node,
                pv_name=pv_name
            ),
            available=state,
        )
        return (node, device)


class LSOFetcher(DefaultFetcher):
    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
        super().__init__(storage_class, coreV1_api)
        self.customObjects_api = customObjects_api
        self.nodenames = nodenames

    def fetch(self) -> None:
        super().fetch()
        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
                                                                            group="local.storage.openshift.io",
                                                                            version="v1alpha1",
                                                                            plural="localvolumediscoveryresults")

    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
        if self.nodenames is not None:
            return item['spec']['nodeName'] in self.nodenames
        else:
            return True

    def devices(self) -> Dict[str, List[Device]]:
        try:
            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
        except ApiException:
            log.error("Failed to fetch device metadata")
            raise
        self.lso_devices = {}
        for i in lso_discovery_results:
            drives = i['status']['discoveredDevices']
            for drive in drives:
                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
        nodename_to_devices: Dict[str, List[Device]] = {}
        for i in self.pvs_in_sc:
            node, device = (None, None)
            if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
                node, device = super().device(i)
            else:
                node, device = self.device(i)
            if node not in nodename_to_devices:
                nodename_to_devices[node] = []
            nodename_to_devices[node].append(device)
        return nodename_to_devices

    def device(self, i: Any) -> Tuple[str, Device]:
        node = i.metadata.labels['kubernetes.io/hostname']
        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
        pv_name = i.metadata.name
        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
        device = Device(
            path=device_discovery['path'],
            sys_api=dict(
                size=device_discovery['size'],
                rotational='1' if device_discovery['property'] == 'Rotational' else '0',
                node=node,
                pv_name=pv_name,
                model=model,
                vendor=vendor
            ),
            available=device_discovery['status']['state'] == 'Available',
            device_id=device_discovery['deviceID'].split('/')[-1],
            lsm_data=dict(
                serialNum=device_discovery['serial']
            )
        )
        return (node, device)


class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        metadata = response.metadata
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()
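
    # Typical use within this module (see RookCluster below): wrap a list call
    # from an instantiated client API object and then read the cached/watched
    # items, e.g. with ``coreV1_api`` being a client.CoreV1Api instance:
    #
    #   nodes = KubernetesResource(coreV1_api.list_node)
    #   names = [n.metadata.name for n in nodes.items]
    #
    # The first access to ``items`` performs a full fetch; if urllib3 is recent
    # enough, a background watch thread then keeps ``_items`` up to date.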

    def get_item_name(self, item: Any) -> Any:
        try:
            return item.metadata.name
        except AttributeError:
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))

    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                name = self.get_item_name(item)

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise


class KubernetesCustomResource(KubernetesResource):
    def _fetch(self) -> str:
        response = self.api_func(**self.kwargs)
        metadata = response['metadata']
        self._items = {item['metadata']['name']: item for item in response['items']}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata['resourceVersion']

    def get_item_name(self, item: Any) -> Any:
        try:
            return item['metadata']['name']
        except (KeyError, TypeError):
            # custom resources are plain dicts, so a missing name surfaces as a
            # KeyError/TypeError rather than an AttributeError
            raise AttributeError(
                "{} doesn't contain a metadata.name. Unable to track changes".format(
                    self.api_func))


class DefaultCreator():
    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
        self.coreV1_api = coreV1_api
        self.storage_class = storage_class
        self.inventory = inventory

    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
        device_set = ccl.StorageClassDeviceSetsItem(
            name=d.sys_api['pv_name'],
            volumeClaimTemplates=ccl.VolumeClaimTemplatesList(),
            count=1,
            encrypted=drive_group.encrypted,
            portable=False
        )
        device_set.volumeClaimTemplates.append(
            ccl.VolumeClaimTemplatesItem(
                metadata=ccl.Metadata(
                    name="data"
                ),
                spec=ccl.Spec(
                    storageClassName=self.storage_class,
                    volumeMode="Block",
                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
                    resources={
                        "requests": {
                            "storage": 1
                        }
                    },
                    volumeName=d.sys_api['pv_name']
                )
            )
        )
        return device_set
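
    # Roughly, the item built above corresponds to a storageClassDeviceSets
    # entry like the following in the CephCluster CR, as consumed by
    # ``add_osds`` below (illustrative YAML sketch; field values depend on the
    # PV and drive group):
    #
    #   - name: <pv_name>
    #     count: 1
    #     portable: false
    #     encrypted: <drive_group.encrypted>
    #     volumeClaimTemplates:
    #       - metadata:
    #           name: data
    #         spec:
    #           storageClassName: <storage_class>
    #           volumeMode: Block
    #           accessModes: ["ReadWriteOnce"]
    #           resources: {requests: {storage: 1}}
    #           volumeName: <pv_name>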

    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher: Optional[SizeMatcher] = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        count = 0
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or (device.path in paths)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1

        return device_list

    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
        assert drive_group.data_devices is not None

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
                new_cluster.spec.storage = ccl.Storage()

            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()

            existing_scds = [
                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
            ]
            for device in to_create:
                new_scds = self.device_to_device_set(drive_group, device)
                if new_scds.name not in existing_scds:
                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
            return new_cluster
        return _add_osds


class LSOCreator(DefaultCreator):
    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
        device_list = []
        assert drive_group.data_devices is not None
        sizematcher = None
        if drive_group.data_devices.size:
            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
        limit = getattr(drive_group.data_devices, 'limit', None)
        all = getattr(drive_group.data_devices, 'all', None)
        paths = [device.path for device in drive_group.data_devices.paths]
        vendor = getattr(drive_group.data_devices, 'vendor', None)
        model = getattr(drive_group.data_devices, 'model', None)
        count = 0
        osd_list = []
        for pod in rook_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
            ):
                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
        for _, node in self.inventory.items():
            for device in node:
                if device.sys_api['pv_name'] in osd_list:
                    count += 1
        for _, node in self.inventory.items():
            for device in node:
                if not limit or (count < limit):
                    if device.available:
                        if (
                            all
                            or (
                                device.sys_api['node'] in matching_hosts
                                and (sizematcher is None or sizematcher.compare(device))
                                and (
                                    not drive_group.data_devices.paths
                                    or device.path in paths
                                )
                                and (
                                    not vendor
                                    or device.sys_api['vendor'] == vendor
                                )
                                and (
                                    not model
                                    or device.sys_api['model'].startswith(model)
                                )
                            )
                        ):
                            device_list.append(device)
                            count += 1
        return device_list


class DefaultRemover():
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        appsV1_api: 'client.AppsV1Api',
        osd_ids: List[str],
        replace_flag: bool,
        force_flag: bool,
        mon_command: Callable,
        patch: Callable,
        rook_env: 'RookEnv',
        inventory: Dict[str, List[Device]]
    ):
        self.batchV1_api = batchV1_api
        self.appsV1_api = appsV1_api
        self.coreV1_api = coreV1_api

        self.osd_ids = osd_ids
        self.replace_flag = replace_flag
        self.force_flag = force_flag

        self.mon_command = mon_command

        self.patch = patch
        self.rook_env = rook_env

        self.inventory = inventory
        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')

    def remove_device_sets(self) -> str:
        self.to_remove: Dict[str, int] = {}
        self.pvc_to_remove: List[str] = []
        for pod in self.osd_pods.items:
            if (
                hasattr(pod, 'metadata')
                and hasattr(pod.metadata, 'labels')
                and 'osd' in pod.metadata.labels
                and pod.metadata.labels['osd'] in self.osd_ids
            ):
                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
                else:
                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])

        def _remove_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
            for _set in new_cluster.spec.storage.storageClassDeviceSets:
                if _set.name in self.to_remove:
                    if _set.count == self.to_remove[_set.name]:
                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
                    else:
                        _set.count = _set.count - self.to_remove[_set.name]
            return new_cluster
        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)

    def check_force(self) -> None:
        if not self.force_flag:
            safe_args = {'prefix': 'osd safe-to-destroy',
                         'ids': [str(x) for x in self.osd_ids]}
            ret, out, err = self.mon_command(safe_args)
            if ret != 0:
                raise RuntimeError(err)

    def set_osds_down(self) -> None:
        down_flag_args = {
            'prefix': 'osd down',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(down_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def scale_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
                spec=client.V1ScaleSpec(
                    replicas=0
                )
            ))

    def set_osds_out(self) -> None:
        out_flag_args = {
            'prefix': 'osd out',
            'ids': [str(x) for x in self.osd_ids]
        }
        ret, out, err = self.mon_command(out_flag_args)
        if ret != 0:
            raise RuntimeError(err)

    def delete_deployments(self) -> None:
        for osd_id in self.osd_ids:
            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')

    def clean_up_prepare_jobs_and_pvc(self) -> None:
        for job in self.jobs.items:
            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')

    def purge_osds(self) -> None:
        for id in self.osd_ids:
            purge_args = {
                'prefix': 'osd purge-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(purge_args)
            if ret != 0:
                raise RuntimeError(err)

    def destroy_osds(self) -> None:
        for id in self.osd_ids:
            destroy_args = {
                'prefix': 'osd destroy-actual',
                'id': int(id),
                'yes_i_really_mean_it': True
            }
            ret, out, err = self.mon_command(destroy_args)
            if ret != 0:
                raise RuntimeError(err)

    def remove(self) -> str:
        try:
            self.check_force()
        except Exception as e:
            log.exception("Error checking if OSDs are safe to destroy")
            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
        try:
            remove_result = self.remove_device_sets()
        except Exception as e:
            log.exception("Error patching ceph cluster CRD")
            return f"Not possible to modify Ceph cluster CRD: {e}"
        try:
            self.scale_deployments()
            self.delete_deployments()
            self.clean_up_prepare_jobs_and_pvc()
        except Exception as e:
            log.exception("Ceph cluster CRD patched, but error cleaning environment")
            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
        try:
            self.set_osds_down()
            self.set_osds_out()
            if self.replace_flag:
                self.destroy_osds()
            else:
                self.purge_osds()
        except Exception as e:
            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
            return f"Error removing OSDs from Ceph cluster: {e}"

        return remove_result


class RookCluster(object):
    # import of client.CoreV1Api must be optional at import time.
    # Instead allow mgr/rook to be imported anyway.
    def __init__(
        self,
        coreV1_api: 'client.CoreV1Api',
        batchV1_api: 'client.BatchV1Api',
        customObjects_api: 'client.CustomObjectsApi',
        storageV1_api: 'client.StorageV1Api',
        appsV1_api: 'client.AppsV1Api',
        rook_env: 'RookEnv',
        storage_class: 'str'
    ):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api
        self.customObjects_api = customObjects_api
        self.storageV1_api = storageV1_api  # client.StorageV1Api
        self.appsV1_api = appsV1_api  # client.AppsV1Api
        self.storage_class = storage_class  # type: str

        # TODO: replace direct k8s calls with Rook API calls
        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                                                              namespace=self.rook_env.namespace,
                                                                              label_selector="rook_cluster={0}".format(
                                                                                  self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)
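
    # For example, with crd_version "v1" and namespace "rook-ceph" (the values
    # used elsewhere in this module), rook_url("cephfilesystems/myfs") yields
    # "/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephfilesystems/myfs".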

    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)

    def get_storage_class(self) -> 'client.V1StorageClass':
        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
        if len(matching_sc) == 0:
            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
        return matching_sc[0]

    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
        storage_class = self.get_storage_class()
        self.fetcher: Optional[DefaultFetcher] = None
        if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
            self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
        else:
            self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
        self.fetcher.fetch()
        return self.fetcher.devices()

    def get_osds(self) -> List:
        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
        return list(osd_pods.items)

    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url
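
    # The resulting URL has the form rados://<pool>/conf-<cluster>.<instance>,
    # or rados://<pool>/<namespace>/conf-<cluster>.<instance> when the cephnfs
    # object specifies a RADOS namespace.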

    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Go query the k8s API about deployment, containers related to this
        filesystem

        Example Rook Pod labels for a mgr daemon:
         Labels:        app=rook-ceph-mgr
                        pod-template-hash=2171958073
                        rook_cluster=rook
        And MDS containers additionally have `rook_filesystem` label

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "nfs": ("nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            ls = d['status'].get('container_statuses')
            if not ls:
                # ignore pods with no containers
                continue
            image_id = ls[0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]

    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
                         leaf_type: str) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        all_hosts = self.get_hosts()

        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            new.spec.metadataServer.placement = cfs.Placement(
                nodeAffinity=cfs.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
                        )
                    )
                )
            )
            return new

        def _create_fs() -> cfs.CephFilesystem:
            fs = cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    dataPools=cfs.DataPoolsList(
                        {
                            cfs.DataPoolsItem(
                                failureDomain=leaf_type,
                                replicated=cfs.Replicated(
                                    size=num_replicas
                                )
                            )
                        }
                    ),
                    metadataPool=cfs.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cfs.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True,
                        placement=cfs.Placement(
                            nodeAffinity=cfs.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
                                    )
                                )
                            )
                        )
                    )
                )
            )
            return fs
        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def get_matching_node(self, host: str) -> Any:
        matching_node = None
        for node in self.nodes.items:
            if node.metadata.labels['kubernetes.io/hostname'] == host:
                matching_node = node
        return matching_node

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
        matching_node.metadata.labels['ceph-label/' + label] = ""
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Added {label} label to {host}')

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        matching_node = self.get_matching_node(host)
        if matching_node is None:
            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
        matching_node.metadata.labels.pop('ceph-label/' + label, None)
        self.coreV1_api.patch_node(host, matching_node)
        return OrchResult(f'Removed {label} label from {host}')

    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like . in the name.  this could be there because
            # it is a legacy rgw spec that was named like $realm.$zone, except
            # that I doubt there were any users of this code.  Instead, focus
            # on future users and translate . to - (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        all_hosts = self.get_hosts()

        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            object_store = cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                        placement=cos.Placement(
                            cos.NodeAffinity(
                                requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
                                    nodeSelectorTerms=cos.NodeSelectorTermsList(
                                        [
                                            placement_spec_to_node_selector(spec.placement, all_hosts)
                                        ]
                                    )
                                )
                            )
                        )
                    ),
                    dataPool=cos.DataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    ),
                    metadataPool=cos.MetadataPool(
                        failureDomain=leaf_type,
                        replicated=cos.Replicated(
                            size=num_replicas
                        )
                    )
                )
            )
            if spec.rgw_zone:
                object_store.spec.zone = cos.Zone(
                    name=spec.rgw_zone
                )
            return object_store

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            if new.spec.gateway:
                new.spec.gateway.instances = spec.placement.count or 1
            else:
                new.spec.gateway = cos.Gateway(
                    instances=spec.placement.count or 1
                )
            return new
        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        # TODO Number of pods should be based on the list of hosts in the
        # PlacementSpec.
        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None"  # for mypy typing
        service_id = spec.service_id
        mgr_module = cast(Module, mgr)
        count = spec.placement.count or 1

        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        namespace=service_id,
                        pool=NFS_POOL_NAME,
                    ),
                    server=cnfs.Server(
                        active=count
                    )
                )
            )

            return rook_nfsgw

        create_ganesha_pool(mgr)
        NFSRados(mgr_module, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
                                     _update_nfs, _create_nfs)

    def rm_service(self, rooktype: str, service_id: str) -> str:
        self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
        objpath = "{0}/{1}".format(rooktype, service_id)
        return f'Removed {objpath}'

    def get_resource(self, resource_type: str) -> Iterable:
        custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
        return custom_objects.items

    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            if not new.spec.mon:
                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        assert drive_group.objectstore in ("bluestore", "filestore")
        assert drive_group.service_id
        storage_class = self.get_storage_class()
        inventory = self.get_discovered_devices()
        creator: Optional[DefaultCreator] = None
        if (
            storage_class.metadata.labels
            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
        ):
            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
        else:
            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
        return self._patch(
            ccl.CephCluster,
            'cephclusters',
            self.rook_env.cluster_name,
            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
        )

    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
        inventory = self.get_discovered_devices()
        self.remover = DefaultRemover(
            self.coreV1_api,
            self.batchV1_api,
            self.appsV1_api,
            osd_ids,
            replace,
            force,
            mon_command,
            self._patch,
            self.rook_env,
            inventory
        )
        return self.remover.remove()

    def get_hosts(self) -> List[orchestrator.HostSpec]:
        ret = []
        for node in self.nodes.items:
            spec = orchestrator.HostSpec(
                node.metadata.name,
                addr='/'.join([addr.address for addr in node.status.addresses]),
                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
            )
            ret.append(spec)
        return ret

    def create_zap_job(self, host: str, path: str) -> None:
        body = client.V1Job(
            api_version="batch/v1",
            metadata=client.V1ObjectMeta(
                name="rook-ceph-device-zap",
                namespace="rook-ceph"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="device-zap",
                                image="rook/ceph:master",
                                command=["bash"],
                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
                                env=[
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_USERNAME",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-username",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    ),
                                    client.V1EnvVar(
                                        name="ROOK_CEPH_SECRET",
                                        value_from=client.V1EnvVarSource(
                                            secret_key_ref=client.V1SecretKeySelector(
                                                key="ceph-secret",
                                                name="rook-ceph-mon"
                                            )
                                        )
                                    )
                                ],
                                security_context=client.V1SecurityContext(
                                    run_as_user=0,
                                    privileged=True
                                ),
                                volume_mounts=[
                                    client.V1VolumeMount(
                                        mount_path="/etc/ceph",
                                        name="ceph-conf-emptydir"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/etc/rook",
                                        name="rook-config"
                                    ),
                                    client.V1VolumeMount(
                                        mount_path="/dev",
                                        name="devices"
                                    )
                                ]
                            )
                        ],
                        volumes=[
                            client.V1Volume(
                                name="ceph-conf-emptydir",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="rook-config",
                                empty_dir=client.V1EmptyDirVolumeSource()
                            ),
                            client.V1Volume(
                                name="devices",
                                host_path=client.V1HostPathVolumeSource(
                                    path="/dev"
                                )
                            ),
                        ],
                        node_selector={
                            "kubernetes.io/hostname": host
                        },
                        restart_policy="Never"
                    )
                )
            )
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)

    def rbd_mirror(self, spec: ServiceSpec) -> None:
        service_id = spec.service_id or "default-rbd-mirror"
        all_hosts = self.get_hosts()

        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
            return crbdm.CephRBDMirror(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=crbdm.Spec(
                    count=spec.placement.count or 1,
                    placement=crbdm.Placement(
                        nodeAffinity=crbdm.NodeAffinity(
                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                                    [
                                        placement_spec_to_node_selector(spec.placement, all_hosts)
                                    ]
                                )
                            )
                        )
                    )
                )
            )

        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
            new.spec.count = spec.placement.count or 1
            new.spec.placement = crbdm.Placement(
                nodeAffinity=crbdm.NodeAffinity(
                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
                            [
                                placement_spec_to_node_selector(spec.placement, all_hosts)
                            ]
                        )
                    )
                )
            )
            return new
        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)

    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli", ],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev, ],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.  The job sets <ttl_seconds_after_finished>,
        # so the TTL controller deletes it automatically; because that feature
        # is still Alpha, an explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]


def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
    all_hostnames = [hs.hostname for hs in all_hosts]
    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
    if spec.host_pattern and spec.host_pattern != "*":
        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
    if spec.label:
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="ceph-label/" + spec.label,
                operator="Exists"
            )
        )
    if spec.hosts:
        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="In",
                values=ccl.CrdObjectList(host_list)
            )
        )
    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
        res.matchExpressions.append(
            ccl.MatchExpressionsItem(
                key="kubernetes.io/hostname",
                operator="Exists",
            )
        )
    return res
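
# For example (illustrative mapping, given the logic above), a placement with
# label 'mds' becomes a node selector term with
#   matchExpressions: [{key: "ceph-label/mds", operator: "Exists"}]
# and a placement naming hosts 'host1' and 'host2' (both present in all_hosts)
# becomes
#   matchExpressions: [{key: "kubernetes.io/hostname", operator: "In",
#                       values: ["host1", "host2"]}]
# node_selector_to_placement_spec() below performs the reverse translation.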

def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
    res = PlacementSpec()
    for expression in node_selector.matchExpressions:
        if expression.key.startswith("ceph-label/"):
            res.label = expression.key.split('/')[1]
        elif expression.key == "kubernetes.io/hostname":
            if expression.operator == "Exists":
                res.host_pattern = "*"
            elif expression.operator == "In":
                res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
    return res