1 """
2 This module wrap's Rook + Kubernetes APIs to expose the calls
3 needed to implement an orchestrator module. While the orchestrator
4 module exposes an async API, this module simply exposes blocking API
5 call methods.
6
7 This module is runnable outside of ceph-mgr, useful for testing.
8 """
import datetime
import threading
import logging
import json
from contextlib import contextmanager
from time import sleep

import jsonpatch
from six.moves.urllib.parse import urljoin  # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec
from mgr_util import merge_dicts

try:
    from typing import Optional
except ImportError:
    pass  # just for type annotations

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl


import orchestrator


try:
    from rook.module import RookEnv
    from typing import List, Dict
except ImportError:
    pass  # just used for type checking.

log = logging.getLogger(__name__)


def _urllib3_supports_read_chunked():
    # CentOS 7 ships a urllib3 that is older than the version required by
    # kubernetes-client and that lacks HTTPResponse.read_chunked.
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f):
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return wrapper


class KubernetesResource(object):
    def __init__(self, api_func, **kwargs):
        """
        Generic Kubernetes resource parent class.

        The API fetch and watch methods are common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client API function that is passed to the watcher
        :param kwargs: extra arguments passed through to ``api_func``
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self):
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        # metadata is a client.V1ListMeta object type
        metadata = response.metadata  # type: client.V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self):
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    @threaded
    def _watch(self, res_ver):
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise


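# A minimal usage sketch of ``KubernetesResource`` (not part of the module's
# API surface). It assumes a reachable cluster and a configured kubernetes
# client, and lists node names the same way RookCluster.get_node_names()
# does below:
#
#   from kubernetes import client, config
#   config.load_kube_config()                      # or config.load_incluster_config()
#   nodes = KubernetesResource(client.CoreV1Api().list_node)
#   print([n.metadata.name for n in nodes.items])  # triggers a fetch and, when
#                                                  # supported, starts a watcher

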
class RookCluster(object):
    def __init__(self, coreV1_api, batchV1_api, rook_env):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                 namespace=self.rook_env.operator_namespace,
                                                 label_selector="app=rook-discover")

        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                            namespace=self.rook_env.namespace,
                                            label_selector="rook_cluster={0}".format(
                                                self.rook_env.cluster_name))
        self.nodes = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path):
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

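    # For illustration: with crd_version "v1" and namespace "rook-ceph"
    # (typical values; the real ones come from RookEnv), rook_url() yields
    # paths such as:
    #
    #   rook_url("cephfilesystems/myfs")
    #   -> "/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephfilesystems/myfs"
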
    def rook_api_call(self, verb, path, **kwargs):
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path, **kwargs):
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path):
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path, **kwargs):
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path, **kwargs):
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames=None):
        def predicate(item):
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException as dummy_e:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

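    # The returned mapping is keyed by node name; each value is whatever the
    # rook-discover agent wrote into its config map's ``devices`` JSON, e.g.
    # (field names and values illustrative, not exhaustive):
    #
    #   {'node1': [{'name': 'sdb', 'size': 107374182400, ...}]}
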
    def get_nfs_conf_url(self, nfs_cluster, instance):
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
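        # For example (values illustrative), a cluster "mynfs" whose RADOS
        # config lives in pool "nfs-ganesha" yields, for instance "a":
        #   rados://nfs-ganesha/conf-mynfs.a         (no RADOS namespace)
        #   rados://nfs-ganesha/ns1/conf-mynfs.a     (namespace "ns1")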
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self, service_type, service_id, nodename):
        """
        Query the k8s API for the deployments and containers related to this
        filesystem.

        Example Rook pod labels for a mgr daemon:
            Labels: app=rook-ceph-mgr
                    pod-template-hash=2171958073
                    rook_cluster=rook
        MDS containers additionally carry a `rook_file_system` label.

        The label filter is rook_cluster=<cluster name>
                            rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "ceph_nfs": ("ceph_nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime.datetime.utcnow()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC but no tzinfo
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names):
        pods = [i for i in self.rook_pods.items]
        num = 0
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
                num += 1
        return "Removed %d pods" % num

    def get_node_names(self):
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec):
        # type: (ServiceSpec) -> None
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        def _update_fs(current, new):
            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs():
            # type: () -> cfs.CephFilesystem
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

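    # The CephFilesystem object built above corresponds to a CR of roughly
    # this shape (values illustrative):
    #
    #   apiVersion: ceph.rook.io/v1
    #   kind: CephFilesystem
    #   metadata:
    #     name: myfs
    #     namespace: rook-ceph
    #   spec:
    #     metadataServer:
    #       activeCount: 1
    #       activeStandby: true
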
    def apply_objectstore(self, spec):

        # FIXME: service_id is $realm.$zone, but rook uses realm
        # $crname and zone $crname. The '.' will confuse kubernetes.
        # For now, assert that realm==zone.
        (realm, zone) = spec.service_id.split('.', 1)
        assert realm == zone
        assert spec.subcluster is None
        name = realm

        def _create_zone():
            # type: () -> cos.CephObjectStore
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        type='s3',
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(current, new):
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.

        rook_nfsgw = cnfs.CephNFS(
            apiVersion=self.rook_env.api_name,
            metadata=dict(
                name=spec.service_id,
                namespace=self.rook_env.namespace,
            ),
            spec=cnfs.Spec(
                rados=cnfs.Rados(
                    pool=spec.pool
                ),
                server=cnfs.Server(
                    active=spec.placement.count
                )
            )
        )

        if spec.namespace:
            rook_nfsgw.spec.rados.namespace = spec.namespace

        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.service_id)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())

    def rm_service(self, rooktype, service_id):

        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so we can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount):
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def update_nfs_count(self, svc_id, newcount):
        def _update_nfs_count(current, new):
            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
            new.spec.server.active = newcount
            return new
        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = matching_hosts[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )
            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

    def _patch(self, crd, crd_name, cr_name, func):
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

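    # For reference, jsonpatch.make_patch() produces an RFC 6902 list of
    # operations, which is what rook_api_patch() sends as the request body.
    # An illustrative patch produced by update_mon_count(5) would be:
    #
    #   [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]
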
    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            current = crd.from_json(current_json)
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(current, new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha feature: lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

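        # The job effectively runs, on the node hosting the device, a command
        # of the form (device path illustrative):
        #
        #   lsmcli local-disk-ident-led-on --path /dev/sda
        #
        # with "ident"/"fault" and "on"/"off" substituted from the arguments.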
        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job. The job sets <ttl_seconds_after_finished>, so
        # the TTL controller should delete it automatically, but that feature is
        # still Alpha, so an extra explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
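

# Illustrative standalone use, as mentioned in the module docstring. This is a
# sketch only: it assumes kubeconfig access to a cluster running Rook, it
# assumes RookEnv() can be constructed with defaults (if not, substitute any
# object exposing namespace, operator_namespace, cluster_name, crd_version and
# api_name attributes), and because this file uses relative imports it has to
# be launched as a module (e.g. via ``python -m``) rather than as a script.
if __name__ == '__main__':
    from kubernetes import config

    config.load_kube_config()  # out-of-cluster auth via ~/.kube/config
    try:
        rook_env = RookEnv()  # assumption: default construction is sufficient
    except NameError:
        raise SystemExit('rook.module.RookEnv is not importable outside ceph-mgr')

    cluster = RookCluster(client.CoreV1Api(), client.BatchV1Api(), rook_env)
    print(cluster.get_node_names())
    print(cluster.get_discovered_devices())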