"""
This module wraps Rook + Kubernetes APIs to expose the calls
needed to implement an orchestrator module.  While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
"""
import datetime
import threading
import logging
import json

from contextlib import contextmanager
from time import sleep

import jsonpatch
from urllib.parse import urljoin
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly if the kubernetes client is not available
from urllib3.exceptions import ProtocolError
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
from ceph.utils import datetime_now
from mgr_util import merge_dicts
import orchestrator

from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
    Iterable, Dict, Iterator, Type
try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0
from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
from .rook_client._helper import CrdClass
try:
    from rook.module import RookEnv
except ImportError:
    pass  # just used for type checking.
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)
CrdClassT = TypeVar('CrdClassT', bound=CrdClass)

log = logging.getLogger(__name__)
def __urllib3_supports_read_chunked() -> bool:
    # There is a bug in CentOS 7, as it ships a urllib3 older than the
    # version required by kubernetes-client.
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """
def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return cast(Callable[..., threading.Thread], wrapper)
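# A minimal usage sketch for ``threaded`` (``_tick`` is a hypothetical
# example, not part of this module): the decorated function returns the
# already-started Thread instead of its own result.
#
#     @threaded
#     def _tick(interval: float) -> None:
#         sleep(interval)
#
#     t = _tick(0.1)  # -> threading.Thread
#     t.join()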
class KubernetesResource(Generic[T]):
    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items: Dict[str, T] = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception: Optional[Exception] = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')
    def _fetch(self) -> str:
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        # metadata is a client.V1ListMeta object type
        metadata = response.metadata  # type: client.V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version
    @property
    def items(self) -> Iterable[T]:
        """
        Returns the items of the request.

        Creates the watcher as a side effect.
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()
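    # A minimal usage sketch (the names below are illustrative, not from this
    # module). The first access performs a full fetch; on a new-enough urllib3
    # a watch thread then keeps the cached items updated in the background:
    #
    #     pods = KubernetesResource(core_api.list_namespaced_pod,
    #                               namespace='rook-ceph')
    #     names = [p.metadata.name for p in pods.items]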
    @threaded
    def _watch(self, res_ver: Optional[str]) -> None:
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise
class RookCluster(object):
    # import of client.CoreV1Api must be optional at import time.
    # Instead allow mgr/rook to be imported anyway.
    def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(
            self.coreV1_api.list_namespaced_config_map,
            namespace=self.rook_env.operator_namespace,
            label_selector="app=rook-discover")

        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(
            self.coreV1_api.list_namespaced_pod,
            namespace=self.rook_env.namespace,
            label_selector="rook_cluster={0}".format(self.rook_env.namespace))
        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
    def rook_url(self, path: str) -> str:
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)
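    # For example, with crd_version='v1' and namespace='rook-ceph' (values
    # assumed for illustration):
    #
    #     rook_url('cephnfses/mynfs')
    #     -> '/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs'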
    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)
    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path: str) -> Any:
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
        return self.rook_api_call("POST", path, **kwargs)
    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
        def predicate(item: client.V1ConfigMapList) -> bool:
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException as dummy_e:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices
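    # The returned mapping is keyed by node name; each value is the decoded
    # 'devices' JSON from that node's rook-discover config map, roughly
    # (shape assumed for illustration):
    #
    #     {'node1': [{'name': '/dev/sdb', ...}, ...]}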
    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url
    def describe_pods(self,
                      service_type: Optional[str],
                      service_id: Optional[str],
                      nodename: Optional[str]) -> List[Dict[str, Any]]:
        """
        Go query the k8s API about deployment, containers related to this
        filesystem.

        Example Rook Pod labels for a mgr daemon:
        Labels:         app=rook-ceph-mgr
                        pod-template-hash=2171958073

        And MDS containers additionally have `rook_filesystem` label

        Label filter is rook_cluster=<cluster namespace>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

                if service_id is not None:
                    try:
                        k, v = {
                            "mds": ("rook_file_system", service_id),
                            "osd": ("ceph-osd-id", service_id),
                            "mon": ("mon", service_id),
                            "mgr": ("mgr", service_id),
                            "ceph_nfs": ("ceph_nfs", service_id),
                            "rgw": ("ceph_rgw", service_id),
                        }[service_type]
                    except KeyError:
                        raise orchestrator.OrchestratorValidationError(
                            '{} not supported'.format(service_type))
                    if metadata.labels[k] != v:
                        return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []
        prefix = 'sha256:'

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            image_id = d['status']['container_statuses'][0]['image_id']
            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'container_image_id': image_id,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary
    def remove_pods(self, names: List[str]) -> List[str]:
        pods = [i for i in self.rook_pods.items]
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
        return [f'Removed Pod {n}' for n in names]
    def get_node_names(self) -> List[str]:
        return [i.metadata.name for i in self.nodes.items]
    @contextmanager
    def ignore_409(self, what: str) -> Iterator[None]:
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise
    def apply_filesystem(self, spec: ServiceSpec) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs() -> cfs.CephFilesystem:
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )

        assert spec.service_id is not None
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)
    def apply_objectstore(self, spec: RGWSpec) -> str:
        assert spec.service_id is not None

        name = spec.service_id

        if '.' in spec.service_id:
            # rook does not like . in the name.  this could be there
            # because it is a legacy rgw spec that was named like
            # $realm.$zone, except that I doubt there were any users of
            # this code.  Instead, focus on future users and translate
            # . to - (fingers crossed!) instead.
            name = spec.service_id.replace('.', '-')

        # FIXME: pass realm and/or zone through to the CR

        def _create_zone() -> cos.CephObjectStore:
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)
    def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        # TODO Number of pods should be based on the list of hosts in the
        #      PlacementSpec.
        count = spec.placement.count or 1

        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
            new.spec.server.active = count
            return new

        def _create_nfs() -> cnfs.CephNFS:
            rook_nfsgw = cnfs.CephNFS(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cnfs.Spec(
                    rados=cnfs.Rados(
                        pool=spec.pool
                    ),
                    server=cnfs.Server(
                        active=count
                    )
                )
            )

            if spec.namespace:
                rook_nfsgw.spec.rados.namespace = spec.namespace

            return rook_nfsgw

        assert spec.service_id is not None
        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id,
                                     _update_nfs, _create_nfs)
    def rm_service(self, rooktype: str, service_id: str) -> str:
        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

        return f'Removed {objpath}'
    def can_create_osd(self) -> bool:
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes
    def node_exists(self, node_name: str) -> bool:
        return node_name in self.get_node_names()
    def update_mon_count(self, newcount: Optional[int]) -> str:
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            if newcount is None:
                raise orchestrator.OrchestratorError('unable to set mon count to None')
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = matching_hosts[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )
            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
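    # A hypothetical call, assuming a DriveGroupSpec assembled elsewhere
    # (DeviceSelection comes from ceph.deployment.drive_group):
    #
    #     dg = DriveGroupSpec(data_devices=DeviceSelection(paths=['/dev/sdb']))
    #     cluster.add_osds(dg, matching_hosts=['node1'])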
    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"
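    # For illustration: jsonpatch computes an RFC 6902 diff between the
    # fetched JSON and the mutated object, so e.g. changing the mon count
    # from 3 to 5 yields a patch like:
    #
    #     [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]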
    def _create_or_patch(self,
                         crd: Type,
                         crd_name: str,
                         cr_name: str,
                         update_func: Callable[[CrdClassT], CrdClassT],
                         create_func: Callable[[], CrdClassT]) -> str:
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=create_func().to_json())
            return "Created"
    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha: lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # wait for the job to complete
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job sets <ttl_seconds_after_finished>, which lets the TTL controller
        # delete it automatically, but that feature is still in Alpha state, so an
        # explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message
    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
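    # A hypothetical call turning on the ident light of one disk; the
    # DeviceLightLoc field values are illustrative:
    #
    #     cluster.blink_light('ident', True,
    #                         [orchestrator.DeviceLightLoc(host='node1',
    #                                                      dev='sdb',
    #                                                      path='/dev/sdb')])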