1 """
2 This module wraps Rook + Kubernetes APIs to expose the calls
3 needed to implement an orchestrator module. While the orchestrator
4 module exposes an async API, this module simply exposes blocking API
5 call methods.
6
7 This module is runnable outside of ceph-mgr, useful for testing.
8 """
9 import datetime
10 import threading
11 import logging
12 from contextlib import contextmanager
13 from time import sleep
14 import re
15 from orchestrator import OrchResult
16
17 import jsonpatch
18 from urllib.parse import urljoin
19 import json
20
21 # Optional kubernetes imports to enable MgrModule.can_run
22 # to behave cleanly.
23 from urllib3.exceptions import ProtocolError
24
25 from ceph.deployment.inventory import Device
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
28 from ceph.utils import datetime_now
29 from ceph.deployment.drive_selection.matchers import SizeMatcher
30 from nfs.cluster import create_ganesha_pool
31 from nfs.module import Module
32 from nfs.export import NFSRados
33 from mgr_module import NFS_POOL_NAME
34 from mgr_util import merge_dicts
35
36 from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
37 Iterable, Dict, Iterator, Type
38
39 try:
40 from kubernetes import client, watch
41 from kubernetes.client.rest import ApiException
42 except ImportError:
43 class ApiException(Exception): # type: ignore
44 status = 0
45
46 from .rook_client.ceph import cephfilesystem as cfs
47 from .rook_client.ceph import cephnfs as cnfs
48 from .rook_client.ceph import cephobjectstore as cos
49 from .rook_client.ceph import cephcluster as ccl
50 from .rook_client.ceph import cephrbdmirror as crbdm
51 from .rook_client._helper import CrdClass
52
53 import orchestrator
54
55 try:
56 from rook.module import RookEnv, RookOrchestrator
57 except ImportError:
58 pass # just used for type checking.
59
60
61 T = TypeVar('T')
62 FuncT = TypeVar('FuncT', bound=Callable)
63
64 CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
65
66
67 log = logging.getLogger(__name__)
68
69
70 def __urllib3_supports_read_chunked() -> bool:
71 # There is a problem on CentOS 7: it ships a urllib3 release that is
72 # older than the version required by kubernetes-client
73 try:
74 from urllib3.response import HTTPResponse
75 return hasattr(HTTPResponse, 'read_chunked')
76 except ImportError:
77 return False
78
79
80 _urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
81
82 class ApplyException(orchestrator.OrchestratorError):
83 """
84 For failures to update the Rook CRDs, usually indicating
85 some kind of interference between our attempted update
86 and other conflicting activity.
87 """
88
89
90 def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
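# Runs the wrapped callable in a background thread and returns the Thread
# object, so callers can keep a handle on it (e.g. to join() or poll is_alive()).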
91 def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
92 t = threading.Thread(target=f, args=args, kwargs=kwargs)
93 t.start()
94 return t
95
96 return cast(Callable[..., threading.Thread], wrapper)
97
98
99 class DefaultFetcher():
100 def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
101 self.storage_class = storage_class
102 self.coreV1_api = coreV1_api
103
104 def fetch(self) -> None:
105 self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
106 self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
107
108 def convert_size(self, size_str: str) -> int:
109 units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
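# Both binary ('Ki'..'Ei') and bare ('K'..'E') suffixes map to powers of 1024:
# units.index(unit) % 7 turns either spelling into the exponent, e.g.
# '10Gi' -> 10 * 2**30 and '100K' -> 100 * 2**10.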
110 coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
111 assert coeff_and_unit is not None
112 coeff = int(coeff_and_unit[1])
113 unit = coeff_and_unit[2]
114 try:
115 factor = units.index(unit) % 7
116 except ValueError:
117 log.error("PV size format invalid")
118 raise
119 size = coeff * (2 ** (10 * factor))
120 return size
121
122 def devices(self) -> Dict[str, List[Device]]:
123 nodename_to_devices: Dict[str, List[Device]] = {}
124 for i in self.pvs_in_sc:
125 node, device = self.device(i)
126 if node not in nodename_to_devices:
127 nodename_to_devices[node] = []
128 nodename_to_devices[node].append(device)
129 return nodename_to_devices
130
131 def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
132 node = 'N/A'
133 if i.spec.node_affinity:
134 terms = i.spec.node_affinity.required.node_selector_terms
135 if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
136 node = terms[0].match_expressions[0].values[0]
137 size = self.convert_size(i.spec.capacity['storage'])
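# Device path preference: hostPath volume path, then local volume path, then
# the OpenShift 'storage.openshift.com/device-name' annotation, else ''.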
138 path = i.spec.host_path.path if i.spec.host_path else i.spec.local.path if i.spec.local else ('/dev/' + i.metadata.annotations['storage.openshift.com/device-name']) if i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations else ''
139 state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
140 pv_name = i.metadata.name
141 device = Device(
142 path = path,
143 sys_api = dict(
144 size = size,
145 node = node,
146 pv_name = pv_name
147 ),
148 available = state,
149 )
150 return (node, device)
151
152
153 class LSOFetcher(DefaultFetcher):
154 def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
155 super().__init__(storage_class, coreV1_api)
156 self.customObjects_api = customObjects_api
157 self.nodenames = nodenames
158
159 def fetch(self) -> None:
160 super().fetch()
161 self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
162 group="local.storage.openshift.io",
163 version="v1alpha1",
164 plural="localvolumediscoveryresults")
165
166 def predicate(self, item: Any) -> bool:
167 if self.nodenames is not None:
168 return item['spec']['nodeName'] in self.nodenames
169 else:
170 return True
171
172 def devices(self) -> Dict[str, List[Device]]:
173 try:
174 lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
175 except ApiException as dummy_e:
176 log.error("Failed to fetch device metadata")
177 raise
178 self.lso_devices = {}
179 for i in lso_discovery_results:
180 drives = i['status']['discoveredDevices']
181 for drive in drives:
182 self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
183 nodename_to_devices: Dict[str, List[Device]] = {}
184 for i in self.pvs_in_sc:
185 node, device = (None, None)
186 if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
187 node, device = super().device(i)
188 else:
189 node, device = self.device(i)
190 if node not in nodename_to_devices:
191 nodename_to_devices[node] = []
192 nodename_to_devices[node].append(device)
193 return nodename_to_devices
194
195 def device(self, i: Any) -> Tuple[str, Device]:
196 node = i.metadata.labels['kubernetes.io/hostname']
197 device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
198 pv_name = i.metadata.name
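# The LSO discovery 'model' field appears to carry vendor and model together;
# treat the first whitespace-separated token as the vendor and the rest as the model.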
199 vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
200 model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
201 device = Device(
202 path = device_discovery['path'],
203 sys_api = dict(
204 size = device_discovery['size'],
205 rotational = '1' if device_discovery['property']=='Rotational' else '0',
206 node = node,
207 pv_name = pv_name,
208 model = model,
209 vendor = vendor
210 ),
211 available = device_discovery['status']['state']=='Available',
212 device_id = device_discovery['deviceID'].split('/')[-1],
213 lsm_data = dict(
214 serialNum = device_discovery['serial']
215 )
216 )
217 return (node, device)
218
219
220 class PDFetcher(DefaultFetcher):
221 """ Physical Devices Fetcher"""
222 def __init__(self, coreV1_api: 'client.CoreV1Api'):
223 self.coreV1_api = coreV1_api
224
225 def fetch(self) -> None:
226 """ Collect the devices information from k8s configmaps"""
227 self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
228 namespace='rook-ceph',
229 label_selector='app=rook-discover')
230
231 def devices(self) -> Dict[str, List[Device]]:
232 """ Return the list of devices found"""
233 node_devices: Dict[str, List[Device]] = {}
234 for i in self.dev_cms.items:
235 devices_list: List[Device] = []
236 for d in json.loads(i.data['devices']):
237 devices_list.append(self.device(d)[1])
238 node_devices[i.metadata.labels['rook.io/node']] = devices_list
239
240 return node_devices
241
242 def device(self, devData: Dict[str,str]) -> Tuple[str, Device]:
243 """ Build an orchestrator device """
244 if 'cephVolumeData' in devData and devData['cephVolumeData']:
245 return "", Device.from_json(json.loads(devData['cephVolumeData']))
246 else:
247 return "", Device(
248 path='/dev/' + devData['name'],
249 sys_api=dict(
250 rotational='1' if devData['rotational'] else '0',
251 size=devData['size']
252 ),
253 available=False,
254 rejected_reasons=['device data coming from ceph-volume not provided'],
255 )
256
257
258 class KubernetesResource(Generic[T]):
259 def __init__(self, api_func: Callable, **kwargs: Any) -> None:
260 """
261 Generic kubernetes Resource parent class
262
263 The api fetch and watch methods should be common across resource types.
264
265 Exceptions in the runner thread are propagated to the caller.
266
267 :param api_func: kubernetes client api function that is passed to the watcher
268 :param filter_func: signature: ``(Item) -> bool``.
269 """
270 self.kwargs = kwargs
271 self.api_func = api_func
272
273 # ``_items`` is accessed by different threads. I assume assignment is atomic.
274 self._items: Dict[str, T] = dict()
275 self.thread = None # type: Optional[threading.Thread]
276 self.exception: Optional[Exception] = None
277 if not _urllib3_supports_read_chunked:
278 logging.info('urllib3 is too old. Fallback to full fetches')
279
280 def _fetch(self) -> str:
281 """ Execute the requested api method as a one-off fetch"""
282 response = self.api_func(**self.kwargs)
283 metadata = response.metadata
284 self._items = {item.metadata.name: item for item in response.items}
285 log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
286 return metadata.resource_version
287
288 @property
289 def items(self) -> Iterable[T]:
290 """
291 Returns the items of the request.
292 Creates the watcher as a side effect.
293 :return:
294 """
295 if self.exception:
296 e = self.exception
297 self.exception = None
298 raise e # Propagate the exception to the user.
299 if not self.thread or not self.thread.is_alive():
300 resource_version = self._fetch()
301 if _urllib3_supports_read_chunked:
302 # Start a thread which will use the kubernetes watch client against a resource
303 log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
304 self.thread = self._watch(resource_version)
305
306 return self._items.values()
307
308 def get_item_name(self, item: Any) -> Any:
309 try:
310 return item.metadata.name
311 except AttributeError:
312 raise AttributeError(
313 "{} doesn't contain a metadata.name. Unable to track changes".format(
314 self.api_func))
315 @threaded
316 def _watch(self, res_ver: Optional[str]) -> None:
317 """ worker thread that runs the kubernetes watch """
318
319 self.exception = None
320
321 w = watch.Watch()
322
323 try:
324 # execute generator to continually watch resource for changes
325 for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
326 **self.kwargs):
327 self.health = ''
328 item = event['object']
329 name = self.get_item_name(item)
330
331 log.info('{} event: {}'.format(event['type'], name))
332
333 if event['type'] in ('ADDED', 'MODIFIED'):
334 self._items = merge_dicts(self._items, {name: item})
335 elif event['type'] == 'DELETED':
336 self._items = {k:v for k,v in self._items.items() if k != name}
337 elif event['type'] == 'BOOKMARK':
338 pass
339 elif event['type'] == 'ERROR':
340 raise ApiException(str(event))
341 else:
342 raise KeyError('Unknown watch event {}'.format(event['type']))
343 except ProtocolError as e:
344 if 'Connection broken' in str(e):
345 log.info('Connection reset.')
346 return
347 raise
348 except ApiException as e:
349 log.exception('K8s API failed. {}'.format(self.api_func))
350 self.exception = e
351 raise
352 except Exception as e:
353 log.exception("Watcher failed. ({})".format(self.api_func))
354 self.exception = e
355 raise
356
357 class KubernetesCustomResource(KubernetesResource):
358 def _fetch(self) -> str:
359 response = self.api_func(**self.kwargs)
360 metadata = response['metadata']
361 self._items = {item['metadata']['name']: item for item in response['items']}
362 log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
363 return metadata['resourceVersion']
364
365 def get_item_name(self, item: Any) -> Any:
366 try:
367 return item['metadata']['name']
368 except (KeyError, TypeError):
369 raise AttributeError(
370 "{} doesn't contain a metadata.name. Unable to track changes".format(
371 self.api_func))
372
373 class DefaultCreator():
374 def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
375 self.coreV1_api = coreV1_api
376 self.storage_class = storage_class
377 self.inventory = inventory
378
379 def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
380 device_set = ccl.StorageClassDeviceSetsItem(
381 name=d.sys_api['pv_name'],
382 volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
383 count=1,
384 encrypted=drive_group.encrypted,
385 portable=False
386 )
387 device_set.volumeClaimTemplates.append(
388 ccl.VolumeClaimTemplatesItem(
389 metadata=ccl.Metadata(
390 name="data"
391 ),
392 spec=ccl.Spec(
393 storageClassName=self.storage_class,
394 volumeMode="Block",
395 accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
396 resources={
397 "requests":{
398 "storage": 1
399 }
400 },
401 volumeName=d.sys_api['pv_name']
402 )
403 )
404 )
405 return device_set
406
407 def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
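# Count devices already consumed by existing OSD pods towards the drive group
# limit, then pick available devices that match the host list, size matcher and
# explicit paths (or everything when data_devices.all is set).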
408 device_list = []
409 assert drive_group.data_devices is not None
410 sizematcher: Optional[SizeMatcher] = None
411 if drive_group.data_devices.size:
412 sizematcher = SizeMatcher('size', drive_group.data_devices.size)
413 limit = getattr(drive_group.data_devices, 'limit', None)
414 count = 0
415 all = getattr(drive_group.data_devices, 'all', None)
416 paths = [device.path for device in drive_group.data_devices.paths]
417 osd_list = []
418 for pod in rook_pods.items:
419 if (
420 hasattr(pod, 'metadata')
421 and hasattr(pod.metadata, 'labels')
422 and 'osd' in pod.metadata.labels
423 and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
424 ):
425 osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
426 for _, node in self.inventory.items():
427 for device in node:
428 if device.sys_api['pv_name'] in osd_list:
429 count += 1
430 for _, node in self.inventory.items():
431 for device in node:
432 if not limit or (count < limit):
433 if device.available:
434 if (
435 all
436 or (
437 device.sys_api['node'] in matching_hosts
438 and ((sizematcher is None) or sizematcher.compare(device))
439 and (
440 not drive_group.data_devices.paths
441 or (device.path in paths)
442 )
443 )
444 ):
445 device_list.append(device)
446 count += 1
447
448 return device_list
449
450 def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
451 to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
452 assert drive_group.data_devices is not None
453 def _add_osds(current_cluster, new_cluster):
454 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
455 if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
456 new_cluster.spec.storage = ccl.Storage()
457
458 if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
459 new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
460
461 existing_scds = [
462 scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
463 ]
464 for device in to_create:
465 new_scds = self.device_to_device_set(drive_group, device)
466 if new_scds.name not in existing_scds:
467 new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
468 return new_cluster
469 return _add_osds
470
471 class LSOCreator(DefaultCreator):
472 def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
473 device_list = []
474 assert drive_group.data_devices is not None
475 sizematcher = None
476 if drive_group.data_devices.size:
477 sizematcher = SizeMatcher('size', drive_group.data_devices.size)
478 limit = getattr(drive_group.data_devices, 'limit', None)
479 all = getattr(drive_group.data_devices, 'all', None)
480 paths = [device.path for device in drive_group.data_devices.paths]
481 vendor = getattr(drive_group.data_devices, 'vendor', None)
482 model = getattr(drive_group.data_devices, 'model', None)
483 count = 0
484 osd_list = []
485 for pod in rook_pods.items:
486 if (
487 hasattr(pod, 'metadata')
488 and hasattr(pod.metadata, 'labels')
489 and 'osd' in pod.metadata.labels
490 and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
491 ):
492 osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
493 for _, node in self.inventory.items():
494 for device in node:
495 if device.sys_api['pv_name'] in osd_list:
496 count += 1
497 for _, node in self.inventory.items():
498 for device in node:
499 if not limit or (count < limit):
500 if device.available:
501 if (
502 all
503 or (
504 device.sys_api['node'] in matching_hosts
505 and ((sizematcher is None) or sizematcher.compare(device))
506 and (
507 not drive_group.data_devices.paths
508 or device.path in paths
509 )
510 and (
511 not vendor
512 or device.sys_api['vendor'] == vendor
513 )
514 and (
515 not model
516 or device.sys_api['model'].startswith(model)
517 )
518 )
519 ):
520 device_list.append(device)
521 count += 1
522 return device_list
523
524 class DefaultRemover():
525 def __init__(
526 self,
527 coreV1_api: 'client.CoreV1Api',
528 batchV1_api: 'client.BatchV1Api',
529 appsV1_api: 'client.AppsV1Api',
530 osd_ids: List[str],
531 replace_flag: bool,
532 force_flag: bool,
533 mon_command: Callable,
534 patch: Callable,
535 rook_env: 'RookEnv',
536 inventory: Dict[str, List[Device]]
537 ):
538 self.batchV1_api = batchV1_api
539 self.appsV1_api = appsV1_api
540 self.coreV1_api = coreV1_api
541
542 self.osd_ids = osd_ids
543 self.replace_flag = replace_flag
544 self.force_flag = force_flag
545
546 self.mon_command = mon_command
547
548 self.patch = patch
549 self.rook_env = rook_env
550
551 self.inventory = inventory
552 self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
553 self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
554 self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
555
556
557 def remove_device_sets(self) -> str:
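# Tally how many of the OSDs being removed belong to each StorageClassDeviceSet,
# then shrink that set's count in the CephCluster CR (or drop the set entirely
# when all of its OSDs are going away); the matching PVCs are remembered for
# later cleanup.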
558 self.to_remove: Dict[str, int] = {}
559 self.pvc_to_remove: List[str] = []
560 for pod in self.osd_pods.items:
561 if (
562 hasattr(pod, 'metadata')
563 and hasattr(pod.metadata, 'labels')
564 and 'osd' in pod.metadata.labels
565 and pod.metadata.labels['osd'] in self.osd_ids
566 ):
567 if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
568 self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
569 else:
570 self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
571 self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
572 def _remove_osds(current_cluster, new_cluster):
573 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
574 assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
575 for _set in new_cluster.spec.storage.storageClassDeviceSets:
576 if _set.name in self.to_remove:
577 if _set.count == self.to_remove[_set.name]:
578 new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
579 else:
580 _set.count = _set.count - self.to_remove[_set.name]
581 return new_cluster
582 return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
583
584 def check_force(self) -> None:
585 if not self.force_flag:
586 safe_args = {'prefix': 'osd safe-to-destroy',
587 'ids': [str(x) for x in self.osd_ids]}
588 ret, out, err = self.mon_command(safe_args)
589 if ret != 0:
590 raise RuntimeError(err)
591
592 def set_osds_down(self) -> None:
593 down_flag_args = {
594 'prefix': 'osd down',
595 'ids': [str(x) for x in self.osd_ids]
596 }
597 ret, out, err = self.mon_command(down_flag_args)
598 if ret != 0:
599 raise RuntimeError(err)
600
601 def scale_deployments(self) -> None:
602 for osd_id in self.osd_ids:
603 self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
604 spec=client.V1ScaleSpec(
605 replicas=0
606 )
607 ))
608
609 def set_osds_out(self) -> None:
610 out_flag_args = {
611 'prefix': 'osd out',
612 'ids': [str(x) for x in self.osd_ids]
613 }
614 ret, out, err = self.mon_command(out_flag_args)
615 if ret != 0:
616 raise RuntimeError(err)
617
618 def delete_deployments(self) -> None:
619 for osd_id in self.osd_ids:
620 self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
621
622 def clean_up_prepare_jobs_and_pvc(self) -> None:
623 for job in self.jobs.items:
624 if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
625 self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
626 self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
627
628 def purge_osds(self) -> None:
629 for id in self.osd_ids:
630 purge_args = {
631 'prefix': 'osd purge-actual',
632 'id': int(id),
633 'yes_i_really_mean_it': True
634 }
635 ret, out, err = self.mon_command(purge_args)
636 if ret != 0:
637 raise RuntimeError(err)
638
639 def destroy_osds(self) -> None:
640 for id in self.osd_ids:
641 destroy_args = {
642 'prefix': 'osd destroy-actual',
643 'id': int(id),
644 'yes_i_really_mean_it': True
645 }
646 ret, out, err = self.mon_command(destroy_args)
647 if ret != 0:
648 raise RuntimeError(err)
649
650 def remove(self) -> str:
651 try:
652 self.check_force()
653 except Exception as e:
654 log.exception("Error checking if OSDs are safe to destroy")
655 return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
656 try:
657 remove_result = self.remove_device_sets()
658 except Exception as e:
659 log.exception("Error patching ceph cluster CRD")
660 return f"Not possible to modify Ceph cluster CRD: {e}"
661 try:
662 self.scale_deployments()
663 self.delete_deployments()
664 self.clean_up_prepare_jobs_and_pvc()
665 except Exception as e:
666 log.exception("Ceph cluster CRD patched, but error cleaning environment")
667 return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
668 try:
669 self.set_osds_down()
670 self.set_osds_out()
671 if self.replace_flag:
672 self.destroy_osds()
673 else:
674 self.purge_osds()
675 except Exception as e:
676 log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
677 return f"Error removing OSDs from Ceph cluster: {e}"
678
679 return remove_result
680
681
682
683 class RookCluster(object):
684 # The import of client.CoreV1Api must be optional at import time
685 # so that mgr/rook can still be imported when the kubernetes client is unavailable.
686 def __init__(
687 self,
688 coreV1_api: 'client.CoreV1Api',
689 batchV1_api: 'client.BatchV1Api',
690 customObjects_api: 'client.CustomObjectsApi',
691 storageV1_api: 'client.StorageV1Api',
692 appsV1_api: 'client.AppsV1Api',
693 rook_env: 'RookEnv',
694 storage_class: 'str'
695 ):
696 self.rook_env = rook_env # type: RookEnv
697 self.coreV1_api = coreV1_api # client.CoreV1Api
698 self.batchV1_api = batchV1_api
699 self.customObjects_api = customObjects_api
700 self.storageV1_api = storageV1_api # client.StorageV1Api
701 self.appsV1_api = appsV1_api # client.AppsV1Api
702 self.storage_class = storage_class # type: str
703
704 # TODO: replace direct k8s calls with Rook API calls
705 self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
706
707 self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
708 namespace=self.rook_env.namespace,
709 label_selector="rook_cluster={0}".format(
710 self.rook_env.namespace))
711 self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
712
713 def rook_url(self, path: str) -> str:
714 prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
715 self.rook_env.crd_version, self.rook_env.namespace)
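# e.g. with crd_version 'v1' and namespace 'rook-ceph' (typical values, assumed
# here only for illustration): rook_url('cephnfses/mynfs') ->
# '/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs'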
716 return urljoin(prefix, path)
717
718 def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
719 full_path = self.rook_url(path)
720 log.debug("[%s] %s" % (verb, full_path))
721
722 return self.coreV1_api.api_client.call_api(
723 full_path,
724 verb,
725 auth_settings=['BearerToken'],
726 response_type="object",
727 _return_http_data_only=True,
728 _preload_content=True,
729 **kwargs)
730
731 def rook_api_get(self, path: str, **kwargs: Any) -> Any:
732 return self.rook_api_call("GET", path, **kwargs)
733
734 def rook_api_delete(self, path: str) -> Any:
735 return self.rook_api_call("DELETE", path)
736
737 def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
738 return self.rook_api_call("PATCH", path,
739 header_params={"Content-Type": "application/json-patch+json"},
740 **kwargs)
741
742 def rook_api_post(self, path: str, **kwargs: Any) -> Any:
743 return self.rook_api_call("POST", path, **kwargs)
744
745 def get_storage_class(self) -> 'client.V1StorageClass':
746 matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
747 if len(matching_sc) == 0:
748 log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
749 raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
750 return matching_sc[0]
751
752 def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
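# Pick the inventory source: rook-discover configmaps when the discovery daemon
# is enabled, the LSO discovery results for LSO-owned storage classes, and plain
# PersistentVolumes of the configured storage class otherwise.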
753 self.fetcher: Optional[DefaultFetcher] = None
754 op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
755 if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
756 self.fetcher = PDFetcher(self.coreV1_api)
757 else:
758 storage_class = self.get_storage_class()
759 if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
760 self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
761 else:
762 self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
763
764 self.fetcher.fetch()
765 return self.fetcher.devices()
766
767 def get_osds(self) -> List:
768 osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
769 return list(osd_pods.items)
770
771 def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
772 #
773 # Fetch cephnfs object for "nfs_cluster" and then return a rados://
774 # URL for the instance within that cluster. If the fetch fails, just
775 # return None.
776 #
777 try:
778 ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
779 except ApiException as e:
780 log.info("Unable to fetch cephnfs object: {}".format(e.status))
781 return None
782
783 pool = ceph_nfs['spec']['rados']['pool']
784 namespace = ceph_nfs['spec']['rados'].get('namespace', None)
785
786 if namespace is None:
787 url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
788 else:
789 url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
790 return url
791
792 def describe_pods(self,
793 service_type: Optional[str],
794 service_id: Optional[str],
795 nodename: Optional[str]) -> List[Dict[str, Any]]:
796 """
797 Query the k8s API for the deployments and containers related to this
798 service
799
800 Example Rook Pod labels for a mgr daemon:
801 Labels: app=rook-ceph-mgr
802 pod-template-hash=2171958073
803 rook_cluster=rook
804 And MDS containers additionally have `rook_filesystem` label
805
806 Label filter is rook_cluster=<cluster namespace>
807 rook_file_system=<self.fs_name>
808 """
809 def predicate(item):
810 # type: (client.V1Pod) -> bool
811 metadata = item.metadata
812 if service_type is not None:
813 if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
814 return False
815
816 if service_id is not None:
817 try:
818 k, v = {
819 "mds": ("rook_file_system", service_id),
820 "osd": ("ceph-osd-id", service_id),
821 "mon": ("mon", service_id),
822 "mgr": ("mgr", service_id),
823 "nfs": ("nfs", service_id),
824 "rgw": ("ceph_rgw", service_id),
825 }[service_type]
826 except KeyError:
827 raise orchestrator.OrchestratorValidationError(
828 '{} not supported'.format(service_type))
829 if metadata.labels[k] != v:
830 return False
831
832 if nodename is not None:
833 if item.spec.node_name != nodename:
834 return False
835 return True
836
837 refreshed = datetime_now()
838 pods = [i for i in self.rook_pods.items if predicate(i)]
839
840 pods_summary = []
841 prefix = 'sha256:'
842
843 for p in pods:
844 d = p.to_dict()
845
846 image_name = None
847 for c in d['spec']['containers']:
848 # look at the first listed container in the pod...
849 image_name = c['image']
850 break
851
852 ls = d['status'].get('container_statuses')
853 if not ls:
854 # ignore pods with no containers
855 continue
856 image_id = ls[0]['image_id']
857 image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
858
859 s = {
860 "name": d['metadata']['name'],
861 "hostname": d['spec']['node_name'],
862 "labels": d['metadata']['labels'],
863 'phase': d['status']['phase'],
864 'container_image_name': image_name,
865 'container_image_id': image_id,
866 'refreshed': refreshed,
867 # these may get set below...
868 'started': None,
869 'created': None,
870 }
871
872 # note: we want UTC
873 if d['metadata'].get('creation_timestamp', None):
874 s['created'] = d['metadata']['creation_timestamp'].astimezone(
875 tz=datetime.timezone.utc)
876 if d['status'].get('start_time', None):
877 s['started'] = d['status']['start_time'].astimezone(
878 tz=datetime.timezone.utc)
879
880 pods_summary.append(s)
881
882 return pods_summary
883
884 def remove_pods(self, names: List[str]) -> List[str]:
885 pods = [i for i in self.rook_pods.items]
886 for p in pods:
887 d = p.to_dict()
888 daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
889 daemon_id = d['metadata']['labels']['ceph_daemon_id']
890 name = daemon_type + '.' + daemon_id
891 if name in names:
892 self.coreV1_api.delete_namespaced_pod(
893 d['metadata']['name'],
894 self.rook_env.namespace,
895 body=client.V1DeleteOptions()
896 )
897 return [f'Removed Pod {n}' for n in names]
898
899 def get_node_names(self) -> List[str]:
900 return [i.metadata.name for i in self.nodes.items]
901
902 @contextmanager
903 def ignore_409(self, what: str) -> Iterator[None]:
904 try:
905 yield
906 except ApiException as e:
907 if e.status == 409:
908 # Idempotent, succeed.
909 log.info("{} already exists".format(what))
910 else:
911 raise
912
913 def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
914 leaf_type: str) -> str:
915 # TODO use spec.placement
916 # TODO warn if spec.extended has entries we don't know how
917 # to action.
918 all_hosts = self.get_hosts()
919 def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
920 new.spec.metadataServer.activeCount = spec.placement.count or 1
921 new.spec.metadataServer.placement = cfs.Placement(
922 nodeAffinity=cfs.NodeAffinity(
923 requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
924 nodeSelectorTerms=cfs.NodeSelectorTermsList(
925 [placement_spec_to_node_selector(spec.placement, all_hosts)]
926 )
927 )
928 )
929 )
930 return new
931 def _create_fs() -> cfs.CephFilesystem:
932 fs = cfs.CephFilesystem(
933 apiVersion=self.rook_env.api_name,
934 metadata=dict(
935 name=spec.service_id,
936 namespace=self.rook_env.namespace,
937 ),
938 spec=cfs.Spec(
939 dataPools=cfs.DataPoolsList(
940 {
941 cfs.DataPoolsItem(
942 failureDomain=leaf_type,
943 replicated=cfs.Replicated(
944 size=num_replicas
945 )
946 )
947 }
948 ),
949 metadataPool=cfs.MetadataPool(
950 failureDomain=leaf_type,
951 replicated=cfs.Replicated(
952 size=num_replicas
953 )
954 ),
955 metadataServer=cfs.MetadataServer(
956 activeCount=spec.placement.count or 1,
957 activeStandby=True,
958 placement=
959 cfs.Placement(
960 nodeAffinity=cfs.NodeAffinity(
961 requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
962 nodeSelectorTerms=cfs.NodeSelectorTermsList(
963 [placement_spec_to_node_selector(spec.placement, all_hosts)]
964 )
965 )
966 )
967 )
968 )
969 )
970 )
971 return fs
972 assert spec.service_id is not None
973 return self._create_or_patch(
974 cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
975 _update_fs, _create_fs)
976
977 def get_matching_node(self, host: str) -> Any:
978 matching_node = None
979 for node in self.nodes.items:
980 if node.metadata.labels['kubernetes.io/hostname'] == host:
981 matching_node = node
982 return matching_node
983
984 def add_host_label(self, host: str, label: str) -> OrchResult[str]:
985 matching_node = self.get_matching_node(host)
986 if matching_node is None:
987 return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
988 matching_node.metadata.labels['ceph-label/'+ label] = ""
989 self.coreV1_api.patch_node(host, matching_node)
990 return OrchResult(f'Added {label} label to {host}')
991
992 def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
993 matching_node = self.get_matching_node(host)
994 if matching_node is None:
995 return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
996 matching_node.metadata.labels.pop('ceph-label/' + label, None)
997 self.coreV1_api.patch_node(host, matching_node)
998 return OrchResult(f'Removed {label} label from {host}')
999
1000 def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
1001 assert spec.service_id is not None
1002
1003 name = spec.service_id
1004
1005 if '.' in spec.service_id:
1006 # rook does not like . in the name. this could be
1007 # there because it is a legacy rgw spec that was named
1008 # like $realm.$zone, except that I doubt there were any
1009 # users of this code. Focus on future users and
1010 # translate . to - (fingers crossed!) instead.
1011 name = spec.service_id.replace('.', '-')
1012
1013 all_hosts = self.get_hosts()
1014 def _create_zone() -> cos.CephObjectStore:
1015 port = None
1016 secure_port = None
1017 if spec.ssl:
1018 secure_port = spec.get_port()
1019 else:
1020 port = spec.get_port()
1021 object_store = cos.CephObjectStore(
1022 apiVersion=self.rook_env.api_name,
1023 metadata=dict(
1024 name=name,
1025 namespace=self.rook_env.namespace
1026 ),
1027 spec=cos.Spec(
1028 gateway=cos.Gateway(
1029 port=port,
1030 securePort=secure_port,
1031 instances=spec.placement.count or 1,
1032 placement=cos.Placement(
1033 cos.NodeAffinity(
1034 requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
1035 nodeSelectorTerms=cos.NodeSelectorTermsList(
1036 [
1037 placement_spec_to_node_selector(spec.placement, all_hosts)
1038 ]
1039 )
1040 )
1041 )
1042 )
1043 ),
1044 dataPool=cos.DataPool(
1045 failureDomain=leaf_type,
1046 replicated=cos.Replicated(
1047 size=num_replicas
1048 )
1049 ),
1050 metadataPool=cos.MetadataPool(
1051 failureDomain=leaf_type,
1052 replicated=cos.Replicated(
1053 size=num_replicas
1054 )
1055 )
1056 )
1057 )
1058 if spec.rgw_zone:
1059 object_store.spec.zone=cos.Zone(
1060 name=spec.rgw_zone
1061 )
1062 return object_store
1063
1064
1065 def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
1066 if new.spec.gateway:
1067 new.spec.gateway.instances = spec.placement.count or 1
1068 else:
1069 new.spec.gateway=cos.Gateway(
1070 instances=spec.placement.count or 1
1071 )
1072 return new
1073 return self._create_or_patch(
1074 cos.CephObjectStore, 'cephobjectstores', name,
1075 _update_zone, _create_zone)
1076
1077 def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
1078 # TODO use spec.placement
1079 # TODO warn if spec.extended has entries we don't know how
1080 # to action.
1081 # TODO Number of pods should be based on the list of hosts in the
1082 # PlacementSpec.
1083 assert spec.service_id, "service id in NFS service spec cannot be an empty string or None " # for mypy typing
1084 service_id = spec.service_id
1085 mgr_module = cast(Module, mgr)
1086 count = spec.placement.count or 1
1087 def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
1088 new.spec.server.active = count
1089 return new
1090
1091 def _create_nfs() -> cnfs.CephNFS:
1092 rook_nfsgw = cnfs.CephNFS(
1093 apiVersion=self.rook_env.api_name,
1094 metadata=dict(
1095 name=spec.service_id,
1096 namespace=self.rook_env.namespace,
1097 ),
1098 spec=cnfs.Spec(
1099 rados=cnfs.Rados(
1100 namespace=service_id,
1101 pool=NFS_POOL_NAME,
1102 ),
1103 server=cnfs.Server(
1104 active=count
1105 )
1106 )
1107 )
1108
1109
1110 return rook_nfsgw
1111
1112 create_ganesha_pool(mgr)
1113 NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
1114 return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
1115 _update_nfs, _create_nfs)
1116
1117 def rm_service(self, rooktype: str, service_id: str) -> str:
1118 self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
1119 objpath = "{0}/{1}".format(rooktype, service_id)
1120 return f'Removed {objpath}'
1121
1122 def get_resource(self, resource_type: str) -> Iterable:
1123 custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
1124 return custom_objects.items
1125
1126 def can_create_osd(self) -> bool:
1127 current_cluster = self.rook_api_get(
1128 "cephclusters/{0}".format(self.rook_env.cluster_name))
1129 use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
1130
1131 # If useAllNodes is set, then Rook will not be paying attention
1132 # to anything we put in 'nodes', so can't do OSD creation.
1133 return not use_all_nodes
1134
1135 def node_exists(self, node_name: str) -> bool:
1136 return node_name in self.get_node_names()
1137
1138 def update_mon_count(self, newcount: Optional[int]) -> str:
1139 def _update_mon_count(current, new):
1140 # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
1141 if newcount is None:
1142 raise orchestrator.OrchestratorError('unable to set mon count to None')
1143 if not new.spec.mon:
1144 raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
1145 new.spec.mon.count = newcount
1146 return new
1147 return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
1148
1149 def add_osds(self, drive_group, matching_hosts):
1150 # type: (DriveGroupSpec, List[str]) -> str
1151 assert drive_group.objectstore in ("bluestore", "filestore")
1152 assert drive_group.service_id
1153 storage_class = self.get_storage_class()
1154 inventory = self.get_discovered_devices()
1155 creator: Optional[DefaultCreator] = None
1156 if (
1157 storage_class.metadata.labels
1158 and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
1159 ):
1160 creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
1161 else:
1162 creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
1163 return self._patch(
1164 ccl.CephCluster,
1165 'cephclusters',
1166 self.rook_env.cluster_name,
1167 creator.add_osds(self.rook_pods, drive_group, matching_hosts)
1168 )
1169
1170 def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
1171 inventory = self.get_discovered_devices()
1172 self.remover = DefaultRemover(
1173 self.coreV1_api,
1174 self.batchV1_api,
1175 self.appsV1_api,
1176 osd_ids,
1177 replace,
1178 force,
1179 mon_command,
1180 self._patch,
1181 self.rook_env,
1182 inventory
1183 )
1184 return self.remover.remove()
1185
1186 def get_hosts(self) -> List[orchestrator.HostSpec]:
1187 ret = []
1188 for node in self.nodes.items:
1189 spec = orchestrator.HostSpec(
1190 node.metadata.name,
1191 addr='/'.join([addr.address for addr in node.status.addresses]),
1192 labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
1193 )
1194 ret.append(spec)
1195 return ret
1196
1197 def create_zap_job(self, host: str, path: str) -> None:
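# Runs a one-shot k8s Job pinned to ``host`` that wipes ``path``: if ceph-volume
# recognises it as a raw OSD its first MiB is zeroed with dd, otherwise it falls
# back to ``ceph-volume lvm zap --destroy``.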
1198 body = client.V1Job(
1199 api_version="batch/v1",
1200 metadata=client.V1ObjectMeta(
1201 name="rook-ceph-device-zap",
1202 namespace="rook-ceph"
1203 ),
1204 spec=client.V1JobSpec(
1205 template=client.V1PodTemplateSpec(
1206 spec=client.V1PodSpec(
1207 containers=[
1208 client.V1Container(
1209 name="device-zap",
1210 image="rook/ceph:master",
1211 command=["bash"],
1212 args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
1213 env=[
1214 client.V1EnvVar(
1215 name="ROOK_CEPH_USERNAME",
1216 value_from=client.V1EnvVarSource(
1217 secret_key_ref=client.V1SecretKeySelector(
1218 key="ceph-username",
1219 name="rook-ceph-mon"
1220 )
1221 )
1222 ),
1223 client.V1EnvVar(
1224 name="ROOK_CEPH_SECRET",
1225 value_from=client.V1EnvVarSource(
1226 secret_key_ref=client.V1SecretKeySelector(
1227 key="ceph-secret",
1228 name="rook-ceph-mon"
1229 )
1230 )
1231 )
1232 ],
1233 security_context=client.V1SecurityContext(
1234 run_as_user=0,
1235 privileged=True
1236 ),
1237 volume_mounts=[
1238 client.V1VolumeMount(
1239 mount_path="/etc/ceph",
1240 name="ceph-conf-emptydir"
1241 ),
1242 client.V1VolumeMount(
1243 mount_path="/etc/rook",
1244 name="rook-config"
1245 ),
1246 client.V1VolumeMount(
1247 mount_path="/dev",
1248 name="devices"
1249 )
1250 ]
1251 )
1252 ],
1253 volumes=[
1254 client.V1Volume(
1255 name="ceph-conf-emptydir",
1256 empty_dir=client.V1EmptyDirVolumeSource()
1257 ),
1258 client.V1Volume(
1259 name="rook-config",
1260 empty_dir=client.V1EmptyDirVolumeSource()
1261 ),
1262 client.V1Volume(
1263 name="devices",
1264 host_path=client.V1HostPathVolumeSource(
1265 path="/dev"
1266 )
1267 ),
1268 ],
1269 node_selector={
1270 "kubernetes.io/hostname": host
1271 },
1272 restart_policy="Never"
1273 )
1274 )
1275 )
1276 )
1277 self.batchV1_api.create_namespaced_job('rook-ceph', body)
1278
1279 def rbd_mirror(self, spec: ServiceSpec) -> None:
1280 service_id = spec.service_id or "default-rbd-mirror"
1281 all_hosts = self.get_hosts()
1282 def _create_rbd_mirror() -> crbdm.CephRBDMirror:
1283 return crbdm.CephRBDMirror(
1284 apiVersion=self.rook_env.api_name,
1285 metadata=dict(
1286 name=service_id,
1287 namespace=self.rook_env.namespace,
1288 ),
1289 spec=crbdm.Spec(
1290 count=spec.placement.count or 1,
1291 placement=crbdm.Placement(
1292 nodeAffinity=crbdm.NodeAffinity(
1293 requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
1294 nodeSelectorTerms=crbdm.NodeSelectorTermsList(
1295 [
1296 placement_spec_to_node_selector(spec.placement, all_hosts)
1297 ]
1298 )
1299 )
1300 )
1301 )
1302 )
1303 )
1304 def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
1305 new.spec.count = spec.placement.count or 1
1306 new.spec.placement = crbdm.Placement(
1307 nodeAffinity=crbdm.NodeAffinity(
1308 requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
1309 nodeSelectorTerms=crbdm.NodeSelectorTermsList(
1310 [
1311 placement_spec_to_node_selector(spec.placement, all_hosts)
1312 ]
1313 )
1314 )
1315 )
1316 )
1317 return new
1318 self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
1319 def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
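# Fetch the current CR, let ``func`` mutate a copy, and apply only the resulting
# JSON-Patch diff, so unrelated concurrent changes to the CR are left untouched.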
1320 current_json = self.rook_api_get(
1321 "{}/{}".format(crd_name, cr_name)
1322 )
1323
1324 current = crd.from_json(current_json)
1325 new = crd.from_json(current_json) # no deepcopy.
1326
1327 new = func(current, new)
1328
1329 patch = list(jsonpatch.make_patch(current_json, new.to_json()))
1330
1331 log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
1332
1333 if len(patch) == 0:
1334 return "No change"
1335
1336 try:
1337 self.rook_api_patch(
1338 "{}/{}".format(crd_name, cr_name),
1339 body=patch)
1340 except ApiException as e:
1341 log.exception("API exception: {0}".format(e))
1342 raise ApplyException(
1343 "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
1344
1345 return "Success"
1346
1347 def _create_or_patch(self,
1348 crd: Type,
1349 crd_name: str,
1350 cr_name: str,
1351 update_func: Callable[[CrdClassT], CrdClassT],
1352 create_func: Callable[[], CrdClassT]) -> str:
1353 try:
1354 current_json = self.rook_api_get(
1355 "{}/{}".format(crd_name, cr_name)
1356 )
1357 except ApiException as e:
1358 if e.status == 404:
1359 current_json = None
1360 else:
1361 raise
1362
1363 if current_json:
1364 new = crd.from_json(current_json) # no deepcopy.
1365
1366 new = update_func(new)
1367
1368 patch = list(jsonpatch.make_patch(current_json, new.to_json()))
1369
1370 log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
1371
1372 if len(patch) == 0:
1373 return "No change"
1374
1375 try:
1376 self.rook_api_patch(
1377 "{}/{}".format(crd_name, cr_name),
1378 body=patch)
1379 except ApiException as e:
1380 log.exception("API exception: {0}".format(e))
1381 raise ApplyException(
1382 "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
1383 return "Updated"
1384 else:
1385 new = create_func()
1386 with self.ignore_409("{} {} already exists".format(crd_name,
1387 cr_name)):
1388 self.rook_api_post("{}/".format(crd_name),
1389 body=new.to_json())
1390 return "Created"
1391 def get_ceph_image(self) -> str:
1392 try:
1393 api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
1394 label_selector="app=rook-ceph-mon",
1395 timeout_seconds=10)
1396 if api_response.items:
1397 return api_response.items[-1].spec.containers[0].image
1398 else:
1399 raise orchestrator.OrchestratorError(
1400 "Error getting ceph image. Cluster without monitors")
1401 except ApiException as e:
1402 raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
1403
1404
1405 def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
1406 operation_id = str(hash(loc))
1407 message = ""
1408
1409 # job definition
1410 job_metadata = client.V1ObjectMeta(name=operation_id,
1411 namespace= self.rook_env.namespace,
1412 labels={"ident": operation_id})
1413 pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
1414 pod_container = client.V1Container(name="ceph-lsmcli-command",
1415 security_context=client.V1SecurityContext(privileged=True),
1416 image=self.get_ceph_image(),
1417 command=["lsmcli",],
1418 args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'),
1419 '--path', loc.path or loc.dev,],
1420 volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
1421 client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
1422 pod_spec = client.V1PodSpec(containers=[pod_container],
1423 active_deadline_seconds=30, # Max time to terminate pod
1424 restart_policy="Never",
1425 node_selector= {"kubernetes.io/hostname": loc.host},
1426 volumes=[client.V1Volume(name="devices",
1427 host_path=client.V1HostPathVolumeSource(path="/dev")),
1428 client.V1Volume(name="run-udev",
1429 host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
1430 pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
1431 spec=pod_spec)
1432 job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
1433 ttl_seconds_after_finished=10, # Alpha. Lifetime after finishing (either Complete or Failed)
1434 backoff_limit=0,
1435 template=pod_template)
1436 job = client.V1Job(api_version="batch/v1",
1437 kind="Job",
1438 metadata=job_metadata,
1439 spec=job_spec)
1440
1441 # delete previous job if it exists
1442 try:
1443 try:
1444 api_response = self.batchV1_api.delete_namespaced_job(operation_id,
1445 self.rook_env.namespace,
1446 propagation_policy="Background")
1447 except ApiException as e:
1448 if e.status != 404: # No problem if the job does not exist
1449 raise
1450
1451 # wait until the job is not present
1452 deleted = False
1453 retries = 0
1454 while not deleted and retries < 10:
1455 api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
1456 label_selector="ident=%s" % operation_id,
1457 timeout_seconds=10)
1458 deleted = not api_response.items
1459 if retries > 5:
1460 sleep(0.1)
1461 retries += 1
1462 if retries == 10 and not deleted:
1463 raise orchestrator.OrchestratorError(
1464 "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
1465 on, loc.host, loc.path or loc.dev, operation_id))
1466
1467 # create the job
1468 api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
1469
1470 # get the result
1471 finished = False
1472 while not finished:
1473 api_response = self.batchV1_api.read_namespaced_job(operation_id,
1474 self.rook_env.namespace)
1475 finished = api_response.status.succeeded or api_response.status.failed
1476 if finished:
1477 message = api_response.status.conditions[-1].message
1478
1479 # get the result of the lsmcli command
1480 api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
1481 label_selector="ident=%s" % operation_id,
1482 timeout_seconds=10)
1483 if api_response.items:
1484 pod_name = api_response.items[-1].metadata.name
1485 message = self.coreV1_api.read_namespaced_pod_log(pod_name,
1486 self.rook_env.namespace)
1487
1488 except ApiException as e:
1489 log.exception('K8s API failed. {}'.format(e))
1490 raise
1491
1492 # Finally, delete the job.
1493 # The job uses <ttl_seconds_after_finished>, which lets the TTL controller delete the job automatically.
1494 # That feature is in Alpha state, so explicit delete operations for the Job are issued here as well.
1495 try:
1496 api_response = self.batchV1_api.delete_namespaced_job(operation_id,
1497 self.rook_env.namespace,
1498 propagation_policy="Background")
1499 except ApiException as e:
1500 if e.status != 404: # No problem if the job does not exist
1501 raise
1502
1503 return message
1504
1505 def blink_light(self, ident_fault, on, locs):
1506 # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
1507 return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
1508
1509 def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
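# Translate a ceph PlacementSpec into a k8s node selector term: a spec with
# label='foo' becomes a matchExpression requiring the node label 'ceph-label/foo'
# to exist, and explicit hosts become a 'kubernetes.io/hostname In [...]' term.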
1510 all_hostnames = [hs.hostname for hs in all_hosts]
1511 res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
1512 if spec.host_pattern and spec.host_pattern != "*":
1513 raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
1514 if spec.label:
1515 res.matchExpressions.append(
1516 ccl.MatchExpressionsItem(
1517 key="ceph-label/" + spec.label,
1518 operator="Exists"
1519 )
1520 )
1521 if spec.hosts:
1522 host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
1523 res.matchExpressions.append(
1524 ccl.MatchExpressionsItem(
1525 key="kubernetes.io/hostname",
1526 operator="In",
1527 values=ccl.CrdObjectList(host_list)
1528 )
1529 )
1530 if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
1531 res.matchExpressions.append(
1532 ccl.MatchExpressionsItem(
1533 key="kubernetes.io/hostname",
1534 operator="Exists",
1535 )
1536 )
1537 return res
1538
1539 def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
1540 res = PlacementSpec()
1541 for expression in node_selector.matchExpressions:
1542 if expression.key.startswith("ceph-label/"):
1543 res.label = expression.key.split('/')[1]
1544 elif expression.key == "kubernetes.io/hostname":
1545 if expression.operator == "Exists":
1546 res.host_pattern = "*"
1547 elif expression.operator == "In":
1548 res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
1549 return res