# ceph/src/pybind/mgr/rook/module.py
import datetime
import threading
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any, Tuple
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking
try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage

    def names(self, names):
        self._names = names

    V1ContainerImage.names = V1ContainerImage.names.setter(names)
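    # The setter patch above works around the issue linked: the Kubernetes
    # API can legitimately return `"names": null` for a node's container
    # images, which the generated V1ContainerImage model otherwise rejects
    # during deserialization.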
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
import orchestrator
from mgr_module import MgrModule, Option
from orchestrator import handle_orch_error, OrchResult, raise_if_exception

from .rook_cluster import RookCluster
class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        # type: () -> bool
        return self.crd_version == 'v1'

    def has_namespace(self):
        # type: () -> bool
        return 'POD_NAMESPACE' in os.environ
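
# In a stock Rook deployment the operator injects POD_NAMESPACE, so this
# typically resolves to namespace='rook-ceph', cluster_name falling back to
# the namespace, and api_name='ceph.rook.io/v1'.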

class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first we send the write to the
    k8s API (fast), and then we wait for the corresponding change
    to appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
    ]
    @staticmethod
    def can_run():
        # type: () -> Tuple[bool, str]
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found", {}
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster", {}

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e), {}
        else:
            return True, "", {}
    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()
    def shutdown(self) -> None:
        self._shutdown.set()
    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api
    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster
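    # Both properties above block on self._initialized, so callers are held
    # until serve() has finished setting up the Kubernetes API clients.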
    def serve(self) -> None:
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            self._shutdown.wait(5)
    @handle_orch_error
    def get_inventory(self, host_filter=None, refresh=False):
        # type: (Optional[orchestrator.InventoryFilter], bool) -> List[orchestrator.InventoryHost]
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result
    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
    @handle_orch_error
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}  # type: Dict[str, orchestrator.ServiceDescription]
        if service_type == 'mon' or service_type is None:
            spec['mon'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mon',
                    placement=PlacementSpec(
                        count=cl['spec'].get('mon', {}).get('count', 1),
                    ),
                ),
                size=cl['spec'].get('mon', {}).get('count', 1),
                container_image_name=image_name,
                last_refresh=now,
            )
        if service_type == 'mgr' or service_type is None:
            spec['mgr'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mgr',
                    placement=PlacementSpec.from_string('count:1'),
                ),
                size=1,
                container_image_name=image_name,
                last_refresh=now,
            )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        if service_type == 'mds' or service_type is None:
            # CephFilesystems
            all_fs = self.rook_cluster.rook_api_get(
                "cephfilesystems/")
            self.log.debug('CephFilesystems %s' % all_fs)
            for fs in all_fs.get('items', []):
                svc = 'mds.' + fs['metadata']['name']
                if svc in spec:
                    continue
                # FIXME: we are conflating active (+ standby) with count
                active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
                total_mds = active
                if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                    total_mds = active * 2
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type='mds',
                        service_id=fs['metadata']['name'],
                        placement=PlacementSpec(count=active),
                    ),
                    size=total_mds,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'rgw' or service_type is None:
            # CephObjectstores
            all_zones = self.rook_cluster.rook_api_get(
                "cephobjectstores/")
            self.log.debug('CephObjectstores %s' % all_zones)
            for zone in all_zones.get('items', []):
                rgw_realm = zone['metadata']['name']
                rgw_zone = rgw_realm
                svc = 'rgw.' + rgw_realm + '.' + rgw_zone
                if svc in spec:
                    continue
                active = zone['spec']['gateway']['instances']
                if 'securePort' in zone['spec']['gateway']:
                    ssl = True
                    port = zone['spec']['gateway']['securePort']
                else:
                    ssl = False
                    port = zone['spec']['gateway']['port'] or 80
                spec[svc] = orchestrator.ServiceDescription(
                    spec=RGWSpec(
                        service_id=rgw_realm + '.' + rgw_zone,
                        rgw_realm=rgw_realm,
                        rgw_zone=rgw_zone,
                        ssl=ssl,
                        rgw_frontend_port=port,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'nfs' or service_type is None:
            # CephNFSes
            all_nfs = self.rook_cluster.rook_api_get(
                "cephnfses/")
            self.log.warning('CephNFS %s' % all_nfs)
            for nfs in all_nfs.get('items', []):
                nfs_name = nfs['metadata']['name']
                svc = 'nfs.' + nfs_name
                if svc in spec:
                    continue
                active = nfs['spec'].get('server', {}).get('active')
                spec[svc] = orchestrator.ServiceDescription(
                    spec=NFSServiceSpec(
                        service_id=nfs_name,
                        pool=nfs['spec']['rados']['pool'],
                        namespace=nfs['spec']['rados'].get('namespace', None),
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    last_refresh=now,
                )
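        # Fold information from running daemons into the service map built
        # above: propagate container image ids/names and keep the oldest
        # last_refresh / created timestamps across each service's daemons.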
        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            service = spec[dd.service_name()]
            service.running += 1
            if not service.container_image_id:
                service.container_image_id = dd.container_image_id
            if not service.container_image_name:
                service.container_image_name = dd.container_image_name
            if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                service.last_refresh = dd.last_refresh
            if not service.created or dd.created < service.created:
                service.created = dd.created

        return [v for k, v in spec.items()]
    @handle_orch_error
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)
    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': orchestrator.DaemonDescriptionStatus.error,
                'Running': orchestrator.DaemonDescriptionStatus.running,
                'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
                'Failed': orchestrator.DaemonDescriptionStatus.error,
                'Unknown': orchestrator.DaemonDescriptionStatus.error,
            }[p['phase']]
            sd.status = status
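            # Note: 'Pending' and 'Unknown' both map to error, since from the
            # orchestrator's perspective the daemon is not running either way.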
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue

            sd.container_image_name = p['container_image_name']
            sd.container_image_id = p['container_image_id']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']

            result.append(sd)

        return result
    @handle_orch_error
    def remove_service(self, service_name: str) -> str:
        service_type, service_name = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_name)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_name)
        elif service_type == 'nfs':
            return self.rook_cluster.rm_service('cephnfses', service_name)
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
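    # The service name is split on the first dot, so e.g.
    # remove_service('nfs.mynfs') deletes the CephNFS object 'mynfs'
    # via rm_service('cephnfses', 'mynfs').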
    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return self.rook_cluster.update_mon_count(spec.placement.count)

    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        return self.rook_cluster.apply_filesystem(spec)

    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        return self.rook_cluster.apply_objectstore(spec)

    @handle_orch_error
    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> str
        return self.rook_cluster.apply_nfsgw(spec)

    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        return self.rook_cluster.remove_pods(names)
    @handle_orch_error
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
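        # A minimal drive group file for illustration; `node1` and `/dev/sdb`
        # are hypothetical, and the fields follow DriveGroupSpec:
        #
        #   service_type: osd
        #   service_id: example_drive_group
        #   placement:
        #     hosts:
        #       - node1
        #   data_devices:
        #     paths:
        #       - /dev/sdb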
        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        all_hosts = raise_if_exception(self.get_hosts())

        matching_hosts = drive_group.placement.filter_matching_hosts(
            lambda label=None, as_hostspec=None: all_hosts)

        assert len(matching_hosts) == 1

        if not self.rook_cluster.node_exists(matching_hosts[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(matching_hosts[0]))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return self.rook_cluster.add_osds(drive_group, matching_hosts)
        # TODO: this was the code to update the progress reference:
        """
        @handle_orch_error
        def has_osds(matching_hosts: List[str]) -> bool:

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0])).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'))

            return found is not None
        """
    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        return self.rook_cluster.blink_light(ident_fault, on, locs)