import datetime
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any, Tuple
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
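    # Why the setter override above is needed (an inference from the linked
    # kubernetes-client issue, not something asserted by this module): some
    # kubelets report container images with an empty 'names' field, and the
    # generated model's validating setter would otherwise raise ValueError
    # while deserializing node objects, breaking node listing.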

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule, Option
import orchestrator
from orchestrator import handle_orch_error, OrchResult, raise_if_exception

from .rook_cluster import RookCluster


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version
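        # With no overriding environment variables this resolves to the
        # defaults hard-coded above, e.g. (illustrative):
        #   RookEnv().namespace == 'rook-ceph'
        #   RookEnv().api_name  == 'ceph.rook.io/v1'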

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first we send the write to the
    k8s API (fast), then we wait for the corresponding change to
    appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS: List[Option] = [
        # TODO: configure k8s API addr instead of assuming local
    ]

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found", {}
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster", {}

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e), {}
        else:
            return True, "", {}
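    # Note: can_run() only checks static preconditions (the kubernetes client
    # import and the CRD version), while available() additionally probes the
    # live Kubernetes API; the string returned alongside False is intended as
    # the human-readable reason the orchestrator layer can surface to the user.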

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

    def shutdown(self) -> None:
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster
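    # Both properties above block on self._initialized, so any caller
    # implicitly waits until serve() has finished constructing the Kubernetes
    # clients and the RookCluster wrapper before touching them.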

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False
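            # Disabling TLS verification is only tolerable on this
            # development-only, out-of-cluster path; the in-cluster path above
            # keeps the client defaults.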

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            self._shutdown.wait(5)
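        # serve() is the module's main thread: it idles in the loop above,
        # waking every five seconds, until shutdown() sets the event.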

    @handle_orch_error
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

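        # get_discovered_devices() returns a mapping of hostname -> list of
        # device dicts; entries that carry a 'cephVolumeData' blob appear to
        # embed ceph-volume inventory JSON, which Device.from_json() consumes
        # directly below.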
        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @handle_orch_error
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

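        # Everything below is synthesized on the fly from the Rook custom
        # resources (CephCluster, CephFilesystem, CephObjectStore, CephNFS);
        # there is no cached state, so the `refresh` flag has no effect here.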
        spec = {}
        if service_type == 'mon' or service_type is None:
            spec['mon'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mon',
                    placement=PlacementSpec(
                        count=cl['spec'].get('mon', {}).get('count', 1),
                    ),
                ),
                size=cl['spec'].get('mon', {}).get('count', 1),
                container_image_name=image_name,
                last_refresh=now,
            )
        if service_type == 'mgr' or service_type is None:
            spec['mgr'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'mgr',
                    placement=PlacementSpec.from_string('count:1'),
                ),
                size=1,
                container_image_name=image_name,
                last_refresh=now,
            )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        if service_type == 'mds' or service_type is None:
            # CephFilesystems
            all_fs = self.rook_cluster.rook_api_get(
                "cephfilesystems/")
            self.log.debug('CephFilesystems %s' % all_fs)
            for fs in all_fs.get('items', []):
                svc = 'mds.' + fs['metadata']['name']
                if svc in spec:
                    continue
                # FIXME: we are conflating active (+ standby) with count
                active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
                total_mds = active
                if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                    total_mds = active * 2
                spec[svc] = orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type='mds',
                        service_id=fs['metadata']['name'],
                        placement=PlacementSpec(count=active),
                    ),
                    size=total_mds,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'rgw' or service_type is None:
            # CephObjectstores
            all_zones = self.rook_cluster.rook_api_get(
                "cephobjectstores/")
            self.log.debug('CephObjectstores %s' % all_zones)
            for zone in all_zones.get('items', []):
                rgw_realm = zone['metadata']['name']
                rgw_zone = rgw_realm
                svc = 'rgw.' + rgw_realm + '.' + rgw_zone
                if svc in spec:
                    continue
                active = zone['spec']['gateway']['instances']
                if 'securePort' in zone['spec']['gateway']:
                    ssl = True
                    port = zone['spec']['gateway']['securePort']
                else:
                    ssl = False
                    port = zone['spec']['gateway']['port'] or 80
                spec[svc] = orchestrator.ServiceDescription(
                    spec=RGWSpec(
                        service_id=rgw_realm + '.' + rgw_zone,
                        rgw_realm=rgw_realm,
                        rgw_zone=rgw_zone,
                        ssl=ssl,
                        rgw_frontend_port=port,
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    container_image_name=image_name,
                    last_refresh=now,
                )

        if service_type == 'nfs' or service_type is None:
            # CephNFSes
            all_nfs = self.rook_cluster.rook_api_get(
                "cephnfses/")
            self.log.warning('CephNFS %s' % all_nfs)
            for nfs in all_nfs.get('items', []):
                nfs_name = nfs['metadata']['name']
                svc = 'nfs.' + nfs_name
                if svc in spec:
                    continue
                active = nfs['spec'].get('server', {}).get('active')
                spec[svc] = orchestrator.ServiceDescription(
                    spec=NFSServiceSpec(
                        service_id=nfs_name,
                        pool=nfs['spec']['rados']['pool'],
                        namespace=nfs['spec']['rados'].get('namespace', None),
                        placement=PlacementSpec(count=active),
                    ),
                    size=active,
                    last_refresh=now,
                )

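        # Fold the live daemon list into the descriptions assembled above:
        # bump the running count, take image details from the first daemon
        # that reports them, and track the oldest created/last_refresh
        # timestamps across a service's daemons.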
        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            service = spec[dd.service_name()]
            service.running += 1
            if not service.container_image_id:
                service.container_image_id = dd.container_image_id
            if not service.container_image_name:
                service.container_image_name = dd.container_image_name
            if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
                service.last_refresh = dd.last_refresh
            if not service.created or dd.created < service.created:
                service.created = dd.created

        return [v for k, v in spec.items()]

    @handle_orch_error
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': orchestrator.DaemonDescriptionStatus.error,
                'Running': orchestrator.DaemonDescriptionStatus.running,
                'Succeeded': orchestrator.DaemonDescriptionStatus.stopped,
                'Failed': orchestrator.DaemonDescriptionStatus.error,
                'Unknown': orchestrator.DaemonDescriptionStatus.error,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']
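            # Kubernetes pod phases are collapsed onto the orchestrator's
            # status enum; Pending and Unknown have no direct equivalent and
            # are reported as errors, with the raw phase kept in status_desc.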

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.container_image_id = p['container_image_id']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result

    @handle_orch_error
    def remove_service(self, service_name: str) -> str:
        service_type, service_name = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_name)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_name)
        elif service_type == 'nfs':
            return self.rook_cluster.rm_service('cephnfses', service_name)
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
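    # Illustrative example (hypothetical service name): remove_service('nfs.mynfs')
    # splits into ('nfs', 'mynfs') and deletes the corresponding CephNFS
    # custom resource via rm_service('cephnfses', 'mynfs').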

    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return self.rook_cluster.update_mon_count(spec.placement.count)

    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
        return self.rook_cluster.apply_filesystem(spec)

    @handle_orch_error
    def apply_rgw(self, spec):
        # type: (RGWSpec) -> str
        return self.rook_cluster.apply_objectstore(spec)

    @handle_orch_error
    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> str
        return self.rook_cluster.apply_nfsgw(spec)

    @handle_orch_error
    def remove_daemons(self, names: List[str]) -> List[str]:
        return self.rook_cluster.remove_pods(names)

    @handle_orch_error
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        all_hosts = raise_if_exception(self.get_hosts())

        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)

        assert len(matching_hosts) == 1
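        # This backend only handles drive groups whose placement resolves to a
        # single host, hence the assertion above.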

        if not self.rook_cluster.node_exists(matching_hosts[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(matching_hosts[0]))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        return self.rook_cluster.add_osds(drive_group, matching_hosts)

        # TODO: this was the code to update the progress reference:
        """
        @handle_orch_error
        def has_osds(matching_hosts: List[str]) -> bool:

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'
                    ))

            return found is not None
        """

    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        return self.rook_cluster.blink_light(ident_fault, on, locs)