# ceph/src/pybind/mgr/rook/module.py
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now

from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple

try:
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking
try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self: Any, names: Any) -> None:
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
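    # The patched setter above works around the issue linked above: some
    # container runtimes report images without a `names` field, which the
    # generated client otherwise rejects with a validation error when
    # listing nodes.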

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule, Option
import orchestrator
from orchestrator import handle_orch_error, OrchResult, raise_if_exception

from .rook_cluster import RookCluster

T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable)
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)

class RookEnv(object):
    def __init__(self) -> None:
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self) -> bool:
        return self.crd_version == 'v1'

    def has_namespace(self) -> bool:
        return 'POD_NAMESPACE' in os.environ


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase operation: first the write is sent to the
    k8s API (fast), then we wait for the corresponding change to
    appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
    ]

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found", {}
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster", {}

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e), {}
        else:
            return True, "", {}

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
        self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
        self._rook_cluster: Optional[RookCluster] = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

    def shutdown(self) -> None:
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self) -> None:
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            self._shutdown.wait(5)

    @handle_orch_error
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered_devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered_devs.items():
            devs = []
            for d in host_devs:
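                # Prefer the full ceph-volume inventory record when the
                # discovery data carries one ('cephVolumeData' is a JSON
                # blob in that format); otherwise fall back to a minimal
                # Device built from the raw name/rotational/size fields.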
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @handle_orch_error
    def describe_service(self,
                         service_type: Optional[str] = None,
                         service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        now = datetime_now()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())
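        # Only a few CephCluster spec fields are consulted below; roughly,
        # an illustrative sketch of the relevant part of the CR (not the
        # full schema):
        #   spec:
        #     cephVersion:    {image: ...}
        #     mon:            {count: ...}
        #     crashCollector: {disable: ...}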

        spec = {}
        if service_type == 'mon' or service_type is None:
            spec['mon'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mon',
                    placement=PlacementSpec(
                        count=cl['spec'].get('mon', {}).get('count', 1),
                    ),
                ),
                size=cl['spec'].get('mon', {}).get('count', 1),
                container_image_name=image_name,
                last_refresh=now,
            )
        if service_type == 'mgr' or service_type is None:
            spec['mgr'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mgr',
                    placement=PlacementSpec.from_string('count:1'),
                ),
                size=1,
                container_image_name=image_name,
                last_refresh=now,
            )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        if service_type == 'mds' or service_type is None:
            # CephFilesystems
            all_fs = self.rook_cluster.rook_api_get(
                "cephfilesystems/")
            self.log.debug('CephFilesystems %s' % all_fs)
            for fs in all_fs.get('items', []):
                svc = 'mds.' + fs['metadata']['name']
                if svc in spec:
                    continue
                # FIXME: we are conflating active (+ standby) with count
                active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
                total_mds = active
                if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                    total_mds = active * 2
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type='mds',
                        service_id=fs['metadata']['name'],
                        placement=PlacementSpec(count=active),
                    ),
                    size=total_mds,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'rgw' or service_type is None:
            # CephObjectstores
            all_zones = self.rook_cluster.rook_api_get(
                "cephobjectstores/")
            self.log.debug('CephObjectstores %s' % all_zones)
            for zone in all_zones.get('items', []):
                rgw_realm = zone['metadata']['name']
                rgw_zone = rgw_realm
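                # The CephObjectStore name is reused as both realm and zone
                # here, so the reported service id takes the form
                # '<name>.<name>'.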
                svc = 'rgw.' + rgw_realm + '.' + rgw_zone
                if svc in spec:
                    continue
                active = zone['spec']['gateway']['instances']
                if 'securePort' in zone['spec']['gateway']:
                    ssl = True
                    port = zone['spec']['gateway']['securePort']
                else:
                    ssl = False
                    port = zone['spec']['gateway']['port'] or 80
                spec[svc] = orchestrator.ServiceDescription(
                    spec=RGWSpec(
                        service_id=rgw_realm + '.' + rgw_zone,
                        rgw_realm=rgw_realm,
                        rgw_zone=rgw_zone,
                        ssl=ssl,
                        rgw_frontend_port=port,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'nfs' or service_type is None:
            # CephNFSes
            all_nfs = self.rook_cluster.rook_api_get(
                "cephnfses/")
            self.log.warning('CephNFS %s' % all_nfs)
            for nfs in all_nfs.get('items', []):
                nfs_name = nfs['metadata']['name']
                svc = 'nfs.' + nfs_name
                if svc in spec:
                    continue
                active = nfs['spec'].get('server', {}).get('active')
                spec[svc] = orchestrator.ServiceDescription(
                    spec=NFSServiceSpec(
                        service_id=nfs_name,
                        pool=nfs['spec']['rados']['pool'],
                        namespace=nfs['spec']['rados'].get('namespace', None),
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    last_refresh=now,
                )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            service = spec[dd.service_name()]
            service.running += 1
            if not service.container_image_id:
                service.container_image_id = dd.container_image_id
            if not service.container_image_name:
                service.container_image_name = dd.container_image_name
            if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                service.last_refresh = dd.last_refresh
            if service.created is None or dd.created is None or dd.created < service.created:
                service.created = dd.created

        return [v for k, v in spec.items()]

    @handle_orch_error
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self,
                      service_name: Optional[str] = None,
                      daemon_type: Optional[str] = None,
                      daemon_id: Optional[str] = None,
                      host: Optional[str] = None,
                      refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
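            # Map the Kubernetes pod phase onto an orchestrator daemon
            # status; anything other than Running (or Succeeded, reported
            # as stopped) is surfaced as an error.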
            status = {
                'Pending': orchestrator.DaemonDescriptionStatus.error,
                'Running': orchestrator.DaemonDescriptionStatus.running,
                'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
                'Failed': orchestrator.DaemonDescriptionStatus.error,
                'Unknown': orchestrator.DaemonDescriptionStatus.error,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.container_image_id = p['container_image_id']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result

    @handle_orch_error
    def remove_service(self, service_name: str) -> str:
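        # service_name has the form '<service_type>.<service_id>', e.g. a
        # hypothetical 'mds.myfs'; the id part is the name of the backing
        # Rook custom resource to delete.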
        service_type, service_name = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_name)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_name)
        elif service_type == 'nfs':
            return self.rook_cluster.rm_service('cephnfses', service_name)
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')

    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return self.rook_cluster.update_mon_count(spec.placement.count)

    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        return self.rook_cluster.apply_filesystem(spec)

    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        return self.rook_cluster.apply_objectstore(spec)

    @handle_orch_error
    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> str
        return self.rook_cluster.apply_nfsgw(spec)

    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        return self.rook_cluster.remove_pods(names)

    @handle_orch_error
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        all_hosts = raise_if_exception(self.get_hosts())

        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)

        assert len(matching_hosts) == 1

        if not self.rook_cluster.node_exists(matching_hosts[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(matching_hosts[0]))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return self.rook_cluster.add_osds(drive_group, matching_hosts)

        # TODO: this was the code to update the progress reference:
        """
        @handle_orch_error
        def has_osds(matching_hosts: List[str]) -> bool:

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'
                    ))

            return found is not None
        """

    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        return self.rook_cluster.blink_light(ident_fault, on, locs)