# ceph/src/pybind/mgr/rook/module.py (ceph 15.2.8)
import datetime
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
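    # Without the patch above, deserializing a node whose status reports
    # container images with no names fails with "Invalid value for `names`,
    # must not be `None`" (see the issue linked above); the permissive setter
    # avoids that.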

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return a completion
    object that runs the wrapped method when it is evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

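# Usage sketch (illustrative): a method wrapped with @deferred_read does no
# work when called; it returns a RookCompletion whose on_complete callback
# runs the real body only once the completion is evaluated, e.g.
#
#     c = rook_orchestrator.get_hosts()   # no Kubernetes I/O yet
#     rook_orchestrator.process([c])      # evaluate() fires, get_hosts() runs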

def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)

        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first the write is sent to the
    Kubernetes API (fast), then we wait for the corresponding change
    to appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """
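
    # Illustrative sketch of the two phases (not a real caller; check_mons is
    # hypothetical): phase one issues the Kubernetes write, phase two polls
    # Ceph for the result through calc_percent:
    #
    #     write_completion(
    #         on_complete=lambda: self.rook_cluster.update_mon_count(3),  # k8s write (fast)
    #         message="Updating mon count to 3",
    #         mgr=self,
    #         calc_percent=lambda: check_mons(),  # hypothetical check of the Ceph cluster (slow)
    #     )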

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None

        if completions:
            self.log.info("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config).

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    service_type='mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1
        return [v for k, v in spec.items()]

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
            )

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drive group file must contain only one spec at a time.
        """

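        # Illustrative drive group file for the command above (hostname and
        # device path are examples, not values taken from this module):
        #
        #     service_type: osd
        #     service_id: example_drive_group
        #     placement:
        #       hosts: ['node1']
        #     data_devices:
        #       paths: ['/dev/sdb']
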
        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            matching_hosts = drive_group.placement.filter_matching_hosts(
                lambda label=None, as_hostspec=None: all_hosts_)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
                calc_percent=lambda: has_osds(matching_hosts)
            )

        @deferred_read
        def has_osds(matching_hosts):

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'
                    ))
            # Report whether any of the requested OSDs have appeared yet
            return bool(found)

        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )