# ceph/src/pybind/mgr/rook/rook_cluster.py (ceph v15.2.13)
1"""
2This module wrap's Rook + Kubernetes APIs to expose the calls
3needed to implement an orchestrator module. While the orchestrator
4module exposes an async API, this module simply exposes blocking API
5call methods.
6
7This module is runnable outside of ceph-mgr, useful for testing.
8"""
import datetime
import threading
import logging
import json
from contextlib import contextmanager
from time import sleep

import jsonpatch
from six.moves.urllib.parse import urljoin  # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec
from ceph.utils import datetime_now
from mgr_util import merge_dicts

try:
    from typing import Optional
except ImportError:
    pass  # just for type annotations

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl


import orchestrator


try:
    from rook.module import RookEnv
    from typing import List, Dict
except ImportError:
    pass  # just used for type checking.


log = logging.getLogger(__name__)

def _urllib3_supports_read_chunked():
    # CentOS 7 ships a urllib3 that is older than the version required by
    # kubernetes-client (its HTTPResponse has no read_chunked).
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()

class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f):
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return wrapper

class KubernetesResource(object):
    def __init__(self, api_func, **kwargs):
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Falling back to full fetches')

    def _fetch(self):
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        # metadata is a client.V1ListMeta object type
        metadata = response.metadata  # type: client.V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self):
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    @threaded
    def _watch(self, res_ver):
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise

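
# Usage sketch (not upstream code): a KubernetesResource wraps one of the
# kubernetes client's list_* calls and caches/watches its result; RookCluster
# below builds several of these. Illustrative only, the namespace and label
# selector values here are assumptions:
#
#   core_api = client.CoreV1Api()             # assumes a configured client
#   pods = KubernetesResource(core_api.list_namespaced_pod,
#                             namespace='rook-ceph',
#                             label_selector='rook_cluster=rook-ceph')
#   for pod in pods.items:                    # first access fetches, then watches
#       print(pod.metadata.name)
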
class RookCluster(object):
    def __init__(self, coreV1_api, batchV1_api, rook_env):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                 namespace=self.rook_env.operator_namespace,
                                                 label_selector="app=rook-discover")

        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                            namespace=self.rook_env.namespace,
                                            label_selector="rook_cluster={0}".format(
                                                self.rook_env.cluster_name))
        self.nodes = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path):
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

    def rook_api_call(self, verb, path, **kwargs):
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path, **kwargs):
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path):
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path, **kwargs):
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path, **kwargs):
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames=None):
        def predicate(item):
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException as dummy_e:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

    def get_nfs_conf_url(self, nfs_cluster, instance):
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
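        # For example (illustrative values): pool "nfs-ganesha", cluster "mynfs"
        # and instance "a" yield rados://nfs-ganesha/conf-mynfs.a, or
        # rados://nfs-ganesha/<namespace>/conf-mynfs.a when a RADOS namespace is set.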
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self, service_type, service_id, nodename):
        """
        Query the k8s API for the deployments and containers related to this
        filesystem.

        Example Rook Pod labels for a mgr daemon:
        Labels:         app=rook-ceph-mgr
                        pod-template-hash=2171958073
                        rook_cluster=rook
        And MDS containers additionally have a `rook_filesystem` label.

        Label filter is rook_cluster=<cluster name>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "ceph_nfs": ("ceph_nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime_now()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names):
        pods = [i for i in self.rook_pods.items]
        num = 0
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
                num += 1
        return "Removed %d pods" % num

    def get_node_names(self):
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec):
        # type: (ServiceSpec) -> None
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.
        def _update_fs(current, new):
            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs():
            # type: () -> cfs.CephFilesystem
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def apply_objectstore(self, spec):

        # FIXME: service_id is $realm.$zone, but rook uses realm
        # $crname and zone $crname. The '.' will confuse kubernetes.
        # For now, assert that realm==zone.
        (realm, zone) = spec.service_id.split('.', 1)
        assert realm == zone
        assert spec.subcluster is None
        name = realm

        def _create_zone():
            # type: () -> cos.CephObjectStore
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        type='s3',
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(current, new):
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.

        rook_nfsgw = cnfs.CephNFS(
            apiVersion=self.rook_env.api_name,
            metadata=dict(
                name=spec.service_id,
                namespace=self.rook_env.namespace,
            ),
            spec=cnfs.Spec(
                rados=cnfs.Rados(
                    pool=spec.pool
                ),
                server=cnfs.Server(
                    active=spec.placement.count
                )
            )
        )

        if spec.namespace:
            rook_nfsgw.spec.rados.namespace = spec.namespace

        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.service_id)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())

    def rm_service(self, rooktype, service_id):

        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount):
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def update_nfs_count(self, svc_id, newcount):
        def _update_nfs_count(current, new):
            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
            new.spec.server.active = newcount
            return new
        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = matching_hosts[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )
            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

    def _patch(self, crd, crd_name, cr_name, func):
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            current = crd.from_json(current_json)
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(current, new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli"],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if retries > 5:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # get the result
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

                    # get the result of the lsmcli command
                    api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                                       label_selector="ident=%s" % operation_id,
                                                                       timeout_seconds=10)
                    if api_response.items:
                        pod_name = api_response.items[-1].metadata.name
                        message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                          self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job sets <ttl_seconds_after_finished>, so the TTL controller deletes it
        # automatically. That feature is still Alpha, so an explicit delete of the Job
        # is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]