# ceph/src/pybind/mgr/rook/rook_cluster.py (ceph v16.2.6)
"""
This module wraps the Rook and Kubernetes APIs to expose the calls
needed to implement an orchestrator module. While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
"""
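# A minimal sketch of driving this module outside of ceph-mgr (assumptions:
# a reachable Kubernetes cluster, and a RookEnv-like object exposing the
# attributes used below, e.g. namespace, operator_namespace, cluster_name,
# crd_version and api_name). Names and construction are illustrative only:
#
#     from kubernetes import client, config
#     config.load_kube_config()          # or config.load_incluster_config()
#     rook_env = RookEnv()               # hypothetical construction
#     cluster = RookCluster(client.CoreV1Api(), client.BatchV1Api(), rook_env)
#     print(cluster.get_node_names())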
import datetime
import threading
import logging
import json
from contextlib import contextmanager
from time import sleep

import jsonpatch
from urllib.parse import urljoin

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
from ceph.utils import datetime_now
from mgr_util import merge_dicts

from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client._helper import CrdClass


import orchestrator


try:
    from rook.module import RookEnv
except ImportError:
    pass  # just used for type checking.


T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)

CrdClassT = TypeVar('CrdClassT', bound=CrdClass)


log = logging.getLogger(__name__)


def __urllib3_supports_read_chunked() -> bool:
    # CentOS 7 ships a urllib3 that is older than the version required
    # by kubernetes-client, so feature-detect read_chunked support.
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)


class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic Kubernetes resource wrapper.

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param kwargs: keyword arguments forwarded to ``api_func``
            (e.g. ``namespace``, ``label_selector``)
        """
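        # Illustrative usage, mirroring how RookCluster wires these up below
        # (namespace and selector values are examples only):
        #
        #     pods = KubernetesResource(core_api.list_namespaced_pod,
        #                               namespace='rook-ceph',
        #                               label_selector='rook_cluster=rook-ceph')
        #     for pod in pods.items:   # fetches, then watches in a thread
        #         ...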
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Falling back to full fetches')

    def _fetch(self) -> str:
        """Execute the requested api method as a one-off fetch."""
        response = self.api_func(**self.kwargs)
        # metadata is a client.V1ListMeta object type
        metadata = response.metadata  # type: client.V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self) -> Iterable[T]:
        """
        Return the items of the request.

        Creates the watcher as a side effect.
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """Worker thread that runs the kubernetes watch."""

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise


class RookCluster(object):
    # The import of client.CoreV1Api must remain optional at import time
    # so that mgr/rook can still be imported without the kubernetes client.
    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(
            self.coreV1_api.list_namespaced_config_map,
            namespace=self.rook_env.operator_namespace,
            label_selector="app=rook-discover")

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(
            self.coreV1_api.list_namespaced_pod,
            namespace=self.rook_env.namespace,
            label_selector="rook_cluster={0}".format(self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path: str) -> str:
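        # For example (with illustrative crd_version='v1', namespace='rook-ceph'):
        #   rook_url('cephfilesystems/myfs')
        #   -> '/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephfilesystems/myfs'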
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
        def predicate(item: client.V1ConfigMapList) -> bool:
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch the cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Query the k8s API for the deployments and containers related to this
        filesystem.

        Example Rook Pod labels for a mgr daemon:
            Labels:    app=rook-ceph-mgr
                       pod-template-hash=2171958073
                       rook_cluster=rook
        MDS containers additionally have the `rook_file_system` label.

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
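        # Each returned entry is a plain dict summarising one pod; shape only,
        # values illustrative:
        #   {'name': ..., 'hostname': ..., 'labels': {...}, 'phase': 'Running',
        #    'container_image_name': ..., 'container_image_id': ...,
        #    'refreshed': <datetime>, 'started': <datetime or None>,
        #    'created': <datetime or None>}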
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "ceph_nfs": ("ceph_nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            image_id = d['status']['container_statuses'][0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names: List[str]) -> List[str]:
        pods = list(self.rook_pods.items)
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]

    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec: ServiceSpec) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs() -> cfs.CephFilesystem:
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )

        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def apply_objectstore(self, spec: RGWSpec) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # Rook does not like '.' in the name. It could be there because
            # this is a legacy rgw spec that was named like $realm.$zone,
            # except that I doubt there were any users of this code. Instead,
            # focus on future users and translate '.' to '-' (fingers crossed!)
            # instead.
            name = spec.service_id.replace('.', '-')

        # FIXME: pass realm and/or zone through to the CR

        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        type='s3',
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        # TODO Number of pods should be based on the list of hosts in the
        # PlacementSpec.
        count = spec.placement.count or 1

        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        pool=spec.pool
                    ),
                    server=cnfs.Server(
                        active=count
                    )
                )
            )

            if spec.namespace:
                rook_nfsgw.spec.rados.namespace = spec.namespace

            return rook_nfsgw

        assert spec.service_id is not None
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id,
                                     _update_nfs, _create_nfs)

    def rm_service(self, rooktype: str, service_id: str) -> str:
        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

        return f'Removed {objpath}'

    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so we can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
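        # The resulting CephCluster storage section has this shape (illustrative;
        # field names follow the ccl.* helper classes used below):
        #   storage:
        #     nodes:
        #     - name: <hostname>
        #       config: {storeType: bluestore}
        #       devices: [{name: /dev/vdb}, ...]
        #       directories: [{path: /var/lib/osd1}, ...]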
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = matching_hosts[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )
            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))
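        # ``patch`` is a JSON Patch (RFC 6902) list of operations; for example,
        # an update via update_mon_count might produce (illustrative):
        #     [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]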

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
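        # Flow: GET the named custom resource; if it exists, apply update_func
        # and PATCH the resulting JSON Patch; otherwise build a new object with
        # create_func and POST it (a 409 from a concurrent create is treated as
        # idempotent success via ignore_409).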
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {}".format(crd_name, cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(
                self.rook_env.namespace,
                label_selector="app=rook-ceph-mon",
                timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
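        # Flow: delete any leftover job with the same ident label, create a
        # one-shot Kubernetes Job pinned to loc.host that runs
        # ``lsmcli local-disk-<ident|fault>-led-<on|off>`` against the device,
        # wait for it to finish, read the pod log as the result message, and
        # finally delete the job again.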
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

                    # get the result of the lsmcli command
                    api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                                       label_selector="ident=%s" % operation_id,
                                                                       timeout_seconds=10)
                    if api_response.items:
                        pod_name = api_response.items[-1].metadata.name
                        message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                          self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job sets <ttl_seconds_after_finished>, so the TTL controller deletes it
        # automatically once it finishes. That feature is still in Alpha, though, so an
        # explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]