# ceph/src/pybind/mgr/rook/module.py
import json
import os
import threading

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now

from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple

try:
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage

    def names(self: Any, names: Any) -> None:
        self._names = names

    V1ContainerImage.names = V1ContainerImage.names.setter(names)
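
    # The override above works around the issue linked: the generated
    # client's strict setter rejects nodes that report container images
    # with no names, which would otherwise crash pod/node listings.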
except ImportError:
    kubernetes_imported = False

from mgr_module import MgrModule, Option
import orchestrator
from orchestrator import handle_orch_error, OrchResult, raise_if_exception

from .rook_cluster import RookCluster


FuncT = TypeVar('FuncT', bound=Callable)
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)


class RookEnv(object):
    def __init__(self) -> None:
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self) -> bool:
        return self.crd_version == 'v1'

    def has_namespace(self) -> bool:
        return 'POD_NAMESPACE' in os.environ
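
# In a stock Rook deployment these typically resolve to namespace='rook-ceph',
# cluster_name='rook-ceph' and crd_version='v1', so api_name becomes
# 'ceph.rook.io/v1' (illustrative defaults, not read from any live cluster).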


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing, firstly sending
    the write to the k8s API (fast) and then waiting
    for the corresponding change to appear in the
    Ceph cluster (slow)

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
    ]

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found", {}
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster", {}

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e), {}
        else:
            return True, "", {}
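
    # The (bool, message) pair reported here is what `ceph orch status`
    # surfaces when the rook backend is selected.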

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
        self._rook_cluster: Optional[RookCluster] = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

    def shutdown(self) -> None:
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster
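
    # Both accessors block on _initialized, so callers can never observe a
    # half-constructed Kubernetes handle before serve() finishes its setup.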

    def serve(self) -> None:
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        # Nothing to poll: just sleep until shutdown() sets the event.
        while not self._shutdown.is_set():
            self._shutdown.wait(5)
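
    # The methods below are the read side of the orchestrator interface,
    # translating Rook/Kubernetes state into orchestrator descriptions.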

    @handle_orch_error
    def get_inventory(self,
                      host_filter: Optional[orchestrator.InventoryFilter] = None,
                      refresh: bool = False) -> List[orchestrator.InventoryHost]:
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered_devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered_devs.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result
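
    # The devices reported here are what `ceph orch device ls` prints when
    # the rook backend is active.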

    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @handle_orch_error
    def describe_service(self,
                         service_type: Optional[str] = None,
                         service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        now = datetime_now()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        if service_type == 'mon' or service_type is None:
            spec['mon'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mon',
                    placement=PlacementSpec(
                        count=cl['spec'].get('mon', {}).get('count', 1),
                    ),
                ),
                size=cl['spec'].get('mon', {}).get('count', 1),
                container_image_name=image_name,
                last_refresh=now,
            )
        if service_type == 'mgr' or service_type is None:
            spec['mgr'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mgr',
                    placement=PlacementSpec.from_string('count:1'),
                ),
                size=1,
                container_image_name=image_name,
                last_refresh=now,
            )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        if service_type == 'mds' or service_type is None:
            # CephFilesystems
            all_fs = self.rook_cluster.rook_api_get(
                "cephfilesystems/")
            self.log.debug('CephFilesystems %s' % all_fs)
            for fs in all_fs.get('items', []):
                svc = 'mds.' + fs['metadata']['name']
                if svc in spec:
                    continue
                # FIXME: we are conflating active (+ standby) with count
                active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
                total_mds = active
                if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                    total_mds = active * 2
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type='mds',
                        service_id=fs['metadata']['name'],
                        placement=PlacementSpec(count=active),
                    ),
                    size=total_mds,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'rgw' or service_type is None:
            # CephObjectstores
            all_zones = self.rook_cluster.rook_api_get(
                "cephobjectstores/")
            self.log.debug('CephObjectstores %s' % all_zones)
            for zone in all_zones.get('items', []):
                rgw_realm = zone['metadata']['name']
                rgw_zone = rgw_realm
                svc = 'rgw.' + rgw_realm + '.' + rgw_zone
                if svc in spec:
                    continue
                active = zone['spec']['gateway']['instances']
                if 'securePort' in zone['spec']['gateway']:
                    ssl = True
                    port = zone['spec']['gateway']['securePort']
                else:
                    ssl = False
                    port = zone['spec']['gateway']['port'] or 80
                spec[svc] = orchestrator.ServiceDescription(
                    spec=RGWSpec(
                        service_id=rgw_realm + '.' + rgw_zone,
                        rgw_realm=rgw_realm,
                        rgw_zone=rgw_zone,
                        ssl=ssl,
                        rgw_frontend_port=port,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'nfs' or service_type is None:
            # CephNFSes
            all_nfs = self.rook_cluster.rook_api_get(
                "cephnfses/")
            self.log.warning('CephNFS %s' % all_nfs)
            for nfs in all_nfs.get('items', []):
                nfs_name = nfs['metadata']['name']
                svc = 'nfs.' + nfs_name
                if svc in spec:
                    continue
                active = nfs['spec'].get('server', {}).get('active')
                spec[svc] = orchestrator.ServiceDescription(
                    spec=NFSServiceSpec(
                        service_id=nfs_name,
                        pool=nfs['spec']['rados']['pool'],
                        namespace=nfs['spec']['rados'].get('namespace', None),
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    last_refresh=now,
                )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            service = spec[dd.service_name()]
            service.running += 1
            if not service.container_image_id:
                service.container_image_id = dd.container_image_id
            if not service.container_image_name:
                service.container_image_name = dd.container_image_name
            if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                service.last_refresh = dd.last_refresh
            if service.created is None or dd.created is None or dd.created < service.created:
                service.created = dd.created

        return [v for k, v in spec.items()]
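
    # These descriptions back `ceph orch ls`; sizes come from the CRDs,
    # while running counts and image ids are merged in from live pods via
    # _list_daemons() above.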

    @handle_orch_error
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self,
                      service_name: Optional[str] = None,
                      daemon_type: Optional[str] = None,
                      daemon_id: Optional[str] = None,
                      host: Optional[str] = None,
                      refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            # Map the coarse k8s pod phase onto a daemon status; a Pending
            # pod is reported as an error because the daemon is not running.
            status = {
                'Pending': orchestrator.DaemonDescriptionStatus.error,
                'Running': orchestrator.DaemonDescriptionStatus.running,
                'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
                'Failed': orchestrator.DaemonDescriptionStatus.error,
                'Unknown': orchestrator.DaemonDescriptionStatus.error,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.container_image_id = p['container_image_id']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result
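
    # Each DaemonDescription corresponds to one pod; this listing is what
    # `ceph orch ps` prints for the rook backend.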

    @handle_orch_error
    def remove_service(self, service_name: str) -> str:
        service_type, service_name = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_name)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_name)
        elif service_type == 'nfs':
            return self.rook_cluster.rm_service('cephnfses', service_name)
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
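
    # Example: remove_service('mds.myfs') deletes the CephFilesystem custom
    # resource named 'myfs' ('myfs' being a hypothetical service id).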

    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return self.rook_cluster.update_mon_count(spec.placement.count)

    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        return self.rook_cluster.apply_filesystem(spec)

    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        return self.rook_cluster.apply_objectstore(spec)

    @handle_orch_error
    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> str
        return self.rook_cluster.apply_nfsgw(spec)

    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        return self.rook_cluster.remove_pods(names)

    @handle_orch_error
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        all_hosts = raise_if_exception(self.get_hosts())

        matching_hosts = drive_group.placement.filter_matching_hosts(
            lambda label=None, as_hostspec=None: all_hosts)

        assert len(matching_hosts) == 1

        if not self.rook_cluster.node_exists(matching_hosts[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(matching_hosts))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return self.rook_cluster.add_osds(drive_group, matching_hosts)

        # TODO: this was the code to update the progress reference:
        """
        def has_osds(matching_hosts: List[str]) -> bool:

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0])).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'))

            return found is not None
        """
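
    # The placement must resolve to exactly one host (see the assert above).
    # A minimal sketch of such a spec, with hypothetical host/device names:
    #   DriveGroupSpec(placement=PlacementSpec(hosts=['node-1']),
    #                  data_devices=DeviceSelection(paths=['/dev/sdb']))
    # (DeviceSelection comes from ceph.deployment.drive_group.)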

    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        return self.rook_cluster.blink_light(ident_fault, on, locs)