# ceph/src/pybind/mgr/rook/module.py (Ceph 15.2.4)
import datetime
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
46 """
47 Decorator to make RookOrchestrator methods return
48 a completion object that executes themselves.
49 """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

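# Illustrative use of @deferred_read (a sketch, not executed here): calling a
# decorated method such as get_hosts() queries nothing immediately; it returns
# a RookCompletion whose on_complete callback runs the original method body,
# so the read happens only once the completion is evaluated via
# RookOrchestrator.process():
#
#   completion = orch.get_hosts()   # 'orch' stands in for the module instance
#   orch.process([completion])      # evaluate() -> runs the wrapped method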

def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ

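# Illustrative defaults (a sketch, not executed): running outside a Rook pod
# with none of the environment variables above set, RookEnv resolves to:
#
#   env = RookEnv()
#   env.namespace           # 'rook-ceph'
#   env.cluster_name        # 'rook-ceph' (falls back to the namespace)
#   env.operator_namespace  # 'rook-ceph'
#   env.api_name            # 'ceph.rook.io/v1'
#   env.has_namespace()     # False -> available() reports "not running in Rook cluster"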

class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first the write is sent to the
    k8s API (fast), then we wait for the corresponding change to
    appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None

        if completions:
            self.log.info("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references
                                            if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
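            # Worked example (for clarity): activeCount=2 with
            # activeStandby=true yields total_mds=4, i.e. one standby
            # per active MDS is counted into the service size.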
            spec[svc] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    service_type='mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1
        return [v for k, v in spec.items()]

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
            )

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            all_hosts = [h.hostname for h in all_hosts_]
            matching_hosts = drive_group.placement.filter_matching_hosts(
                lambda label=None, as_hostspec=None: all_hosts)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
                calc_percent=lambda: has_osds(matching_hosts)
            )

        @deferred_read
        def has_osds(matching_hosts):

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(
                self._rook_env.namespace,
                label_selector="rook_cluster={},app=rook-ceph-osd".format(
                    self._rook_env.cluster_name),
                field_selector="spec.nodeName={0}".format(
                    matching_hosts[0]
                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'
                    ))

            # `found` is a list; report whether any matching OSDs exist yet
            return bool(found)

        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool,
                           locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )