# ceph/src/pybind/mgr/rook/module.py

import datetime
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

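    # (Added note: per the issue linked below, some kubelets report node
    # container images whose 'names' field is null; the generated client's
    # V1ContainerImage setter rejects that, so it is replaced with a
    # permissive one.)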
    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return
    a completion object that executes itself.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

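# Illustrative usage (a sketch, using names from this module): a decorated
# method returns a RookCompletion immediately; the wrapped read only runs
# when the completion is evaluated, e.g. via RookOrchestrator.process():
#
#   completion = orch.get_hosts()   # returns a RookCompletion, no k8s call yet
#   orch.process([completion])      # evaluate() fires the deferred read
#   hosts = completion.result       # -> List[orchestrator.HostSpec]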

def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )

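# Illustrative usage (a sketch mirroring apply_mon() below; the mon count is
# an assumption): the write fires when the completion chain runs, and
# progress is tracked via the optional calc_percent callback.
#
#   c = write_completion(
#       on_complete=lambda: self.rook_cluster.update_mon_count(3),
#       message="Updating mon count to 3",
#       mgr=self)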

class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ

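# Illustrative defaults: with none of the variables above set, RookEnv()
# resolves to namespace='rook-ceph', cluster_name='rook-ceph',
# operator_namespace='rook-ceph' and api_name='ceph.rook.io/v1'.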

class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase operation: first we send
    the write to the k8s API (fast), then we wait
    for the corresponding change to appear in the
    Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

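    # (Clarifying note: write_completion() above implements the fast first
    # phase; the ProgressReference objects polled in serve() track the slow
    # second phase until the change appears in the Ceph cluster.)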

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None

        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    service_type='mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1
        return list(spec.values())

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
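            # (Clarifying note: the mapping above collapses k8s pod phases
            # onto orchestrator status codes: 1 = running, 0 = stopped
            # (pod succeeded), -1 = error or unknown.)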
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
            )

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
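        # Illustrative drive group file for the command above (hostname and
        # device path are assumptions):
        #
        #   service_type: osd
        #   service_id: example_drive_group
        #   placement:
        #     host_pattern: 'node1'
        #   data_devices:
        #     paths:
        #       - /dev/sdb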

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            all_hosts = [h.hostname for h in all_hosts_]
            matching_hosts = drive_group.placement.filter_matching_hosts(
                lambda label=None, as_hostspec=None: all_hosts)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts[0]))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
                calc_percent=lambda: has_osds(matching_hosts)
            )

        @deferred_read
        def has_osds(matching_hosts):
            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'
                    ))

            return len(found) > 0

        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )