"""
This module wraps Rook + Kubernetes APIs to expose the calls
needed to implement an orchestrator module. While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
"""
import datetime
import threading
import logging
import json
from contextlib import contextmanager
from time import sleep

import jsonpatch
from six.moves.urllib.parse import urljoin  # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly when this is not running in a cluster
from urllib3.exceptions import ProtocolError

from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec
from mgr_util import merge_dicts

try:
    from typing import Optional
except ImportError:
    pass  # just for type annotations

try:
    from kubernetes import client, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0
from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl

import orchestrator
try:
    from rook.module import RookEnv
    from typing import List, Dict
except ImportError:
    pass  # just used for type checking.
log = logging.getLogger(__name__)
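# Rough standalone usage, e.g. for testing outside of ceph-mgr (a sketch;
# it assumes a cluster reachable via a local kubeconfig, and the RookEnv
# construction shown here is illustrative, not prescribed by this module):
#
#   from kubernetes import config as k8s_config
#   k8s_config.load_kube_config()
#   rook = RookCluster(client.CoreV1Api(), client.BatchV1Api(), RookEnv())
#   print(rook.get_node_names())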
def _urllib3_supports_read_chunked():
    # There is a bug in CentOS 7 as it ships a urllib3 which is lower
    # than required by kubernetes-client
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()

class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """

def threaded(f):
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return wrapper

class KubernetesResource(object):
    def __init__(self, api_func, **kwargs):
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource types.

        Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')
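    # Usage sketch (arguments are illustrative; assumes a configured
    # client.CoreV1Api instance named ``core_v1``):
    #
    #   pods = KubernetesResource(core_v1.list_namespaced_pod,
    #                             namespace='rook-ceph',
    #                             label_selector='rook_cluster=rook-ceph')
    #   for p in pods.items:  # first access fetches; then a watcher keeps it fresh
    #       print(p.metadata.name)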

    def _fetch(self):
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        # metadata is a client.V1ListMeta object type
        metadata = response.metadata  # type: client.V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self):
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    @threaded
    def _watch(self, res_ver):
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise

class RookCluster(object):
    def __init__(self, coreV1_api, batchV1_api, rook_env):
        self.rook_env = rook_env  # type: RookEnv
        self.coreV1_api = coreV1_api  # client.CoreV1Api
        self.batchV1_api = batchV1_api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                 namespace=self.rook_env.operator_namespace,
                                                 label_selector="app=rook-discover")

        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                            namespace=self.rook_env.namespace,
                                            label_selector="rook_cluster={0}".format(
                                                self.rook_env.cluster_name))
        self.nodes = KubernetesResource(self.coreV1_api.list_node)

    def rook_url(self, path):
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)
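    # e.g. rook_url("cephfilesystems/myfs") ->
    #   "/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephfilesystems/myfs"
    # (illustrative; assumes crd_version "v1" and namespace "rook-ceph")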

    def rook_api_call(self, verb, path, **kwargs):
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.coreV1_api.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path, **kwargs):
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path):
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path, **kwargs):
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path, **kwargs):
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames=None):
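        # Returns a dict mapping node name -> list of device dicts, as published
        # by the rook-discover daemon in its per-node ConfigMaps
        # (label "app=rook-discover"), optionally restricted to ``nodenames``.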
        def predicate(item):
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException as dummy_e:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

    def get_nfs_conf_url(self, nfs_cluster, instance):
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
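        # The resulting URL has the form (values illustrative):
        #   rados://<pool>/conf-<nfs_cluster>.<instance>               (no RADOS namespace)
        #   rados://<pool>/<namespace>/conf-<nfs_cluster>.<instance>   (with a RADOS namespace)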
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self, service_type, service_id, nodename):
        """
        Query the k8s API for the deployments and containers related to this
        service.

        Example Rook Pod labels for a mgr daemon:
          Labels:       app=rook-ceph-mgr
                        pod-template-hash=2171958073
                        rook_cluster=rook
        And MDS containers additionally have the `rook_filesystem` label.

        Label filter is rook_cluster=<cluster name>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (client.V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

                if service_id is not None:
                    try:
                        k, v = {
                            "mds": ("rook_file_system", service_id),
                            "osd": ("ceph-osd-id", service_id),
                            "mon": ("mon", service_id),
                            "mgr": ("mgr", service_id),
                            "ceph_nfs": ("ceph_nfs", service_id),
                            "rgw": ("ceph_rgw", service_id),
                        }[service_type]
                    except KeyError:
                        raise orchestrator.OrchestratorValidationError(
                            '{} not supported'.format(service_type))
                    if metadata.labels[k] != v:
                        return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False

            return True

        refreshed = datetime.datetime.utcnow()
        pods = [i for i in self.rook_pods.items if predicate(i)]
        pods_summary = []

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC but no tzinfo
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names):
        pods = [i for i in self.rook_pods.items]
        num = 0
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.coreV1_api.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=client.V1DeleteOptions()
                )
                num += 1
        return "Removed %d pods" % num

    def get_node_names(self):
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise
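    # Typical use (mirrors add_nfsgw below; the message is illustrative):
    #   with self.ignore_409("NFS cluster 'mynfs' already exists"):
    #       self.rook_api_post("cephnfses/", body=...)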

    def apply_filesystem(self, spec):
        # type: (ServiceSpec) -> None
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.
        def _update_fs(current, new):
            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs():
            # type: () -> cfs.CephFilesystem
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )

        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def apply_objectstore(self, spec):

        # FIXME: service_id is $realm.$zone, but rook uses realm
        # $crname and zone $crname. The '.' will confuse kubernetes.
        # For now, assert that realm==zone.
        (realm, zone) = spec.service_id.split('.', 1)
        assert realm == zone
        assert spec.subcluster is None
        name = realm

        def _create_zone():
            # type: () -> cos.CephObjectStore
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        type='s3',
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(current, new):
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        #      to action.

        rook_nfsgw = cnfs.CephNFS(
            apiVersion=self.rook_env.api_name,
            metadata=dict(
                name=spec.service_id,
                namespace=self.rook_env.namespace,
            ),
            spec=cnfs.Spec(
                rados=cnfs.Rados(
                    pool=spec.pool
                ),
                server=cnfs.Server(
                    active=spec.placement.count
                )
            )
        )

        if spec.namespace:
            rook_nfsgw.spec.rados.namespace = spec.namespace

        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.service_id)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())

    def rm_service(self, rooktype, service_id):

        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount):
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def update_nfs_count(self, svc_id, newcount):
        def _update_nfs_count(current, new):
            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
            new.spec.server.active = newcount
            return new
        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)

    def add_osds(self, drive_group, matching_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
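        # The patch below edits the CephCluster CR's storage section, which ends
        # up looking roughly like this (a sketch; field values are illustrative):
        #   storage:
        #     nodes:
        #     - name: <matching host>
        #       config:
        #         storeType: bluestore
        #       devices:
        #       - name: /dev/vdb
        #       directories:
        #       - path: /var/lib/osd1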
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = matching_hosts[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )

            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

    def _patch(self, crd, crd_name, cr_name, func):
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
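        # ``patch`` is a list of RFC 6902 operations produced by jsonpatch, e.g.
        # (illustrative): [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]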

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
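        # Fetch the existing custom resource: if it exists, apply ``update_func``
        # and PATCH the difference; if it is missing (404), build a fresh object
        # with ``create_func`` and POST it.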
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            current = crd.from_json(current_json)
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(current, new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())

    def get_ceph_image(self) -> str:
        try:
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="app=rook-ceph-mon",
                                                               timeout_seconds=10)
            if api_response.items:
                return api_response.items[-1].spec.containers[0].image
            else:
                raise orchestrator.OrchestratorError(
                    "Error getting ceph image. Cluster without monitors")
        except ApiException as e:
            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))

    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
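        # Runs a short-lived Kubernetes Job on the target host that executes an
        # lsmcli "local-disk-...-led-on/off" command against the device, waits
        # for completion, and returns the pod log (or the job's condition
        # message) as the result.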
        operation_id = str(hash(loc))
        message = ""

        # job definition
        job_metadata = client.V1ObjectMeta(name=operation_id,
                                           namespace=self.rook_env.namespace,
                                           labels={"ident": operation_id})
        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
        pod_container = client.V1Container(name="ceph-lsmcli-command",
                                           security_context=client.V1SecurityContext(privileged=True),
                                           image=self.get_ceph_image(),
                                           command=["lsmcli", ],
                                           args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
                                                 '--path', loc.path or loc.dev, ],
                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
        pod_spec = client.V1PodSpec(containers=[pod_container],
                                    active_deadline_seconds=30,  # Max time to terminate pod
                                    restart_policy="Never",
                                    node_selector={"kubernetes.io/hostname": loc.host},
                                    volumes=[client.V1Volume(name="devices",
                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
                                             client.V1Volume(name="run-udev",
                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
                                                spec=pod_spec)
        job_spec = client.V1JobSpec(active_deadline_seconds=60,  # Max time to terminate job
                                    ttl_seconds_after_finished=10,  # Alpha. Lifetime after finishing (either Complete or Failed)
                                    backoff_limit=0,
                                    template=pod_template)
        job = client.V1Job(api_version="batch/v1",
                           kind="Job",
                           metadata=job_metadata,
                           spec=job_spec)

        # delete previous job if it exists
        try:
            try:
                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                      self.rook_env.namespace,
                                                                      propagation_policy="Background")
            except ApiException as e:
                if e.status != 404:  # No problem if the job does not exist
                    raise

            # wait until the job is not present
            deleted = False
            retries = 0
            while not deleted and retries < 10:
                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
                                                                    label_selector="ident=%s" % operation_id,
                                                                    timeout_seconds=10)
                deleted = not api_response.items
                if not deleted:
                    sleep(0.1)
                retries += 1
            if retries == 10 and not deleted:
                raise orchestrator.OrchestratorError(
                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
                        on, loc.host, loc.path or loc.dev, operation_id))

            # create the job
            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)

            # wait for the job to finish
            finished = False
            while not finished:
                api_response = self.batchV1_api.read_namespaced_job(operation_id,
                                                                    self.rook_env.namespace)
                finished = api_response.status.succeeded or api_response.status.failed
                if finished:
                    message = api_response.status.conditions[-1].message

            # get the result of the lsmcli command
            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
                                                               label_selector="ident=%s" % operation_id,
                                                               timeout_seconds=10)
            if api_response.items:
                pod_name = api_response.items[-1].metadata.name
                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
                                                                  self.rook_env.namespace)

        except ApiException as e:
            log.exception('K8s API failed. {}'.format(e))
            raise

        # Finally, delete the job.
        # The job uses <ttl_seconds_after_finished>, so the TTL controller will
        # eventually delete it automatically. That feature is still in Alpha
        # state, so an explicit delete is issued here as well.
        try:
            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
                                                                  self.rook_env.namespace,
                                                                  propagation_policy="Background")
        except ApiException as e:
            if e.status != 404:  # No problem if the job does not exist
                raise

        return message

    def blink_light(self, ident_fault, on, locs):
        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]