# ceph/src/pybind/mgr/rook/module.py -- Rook orchestrator module
# (Ceph 15.2.1 "Octopus" source, via the git.proxmox.com mirror of ceph.git)
import datetime
import threading
import functools
import os

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
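    # Added note: the setter patch above works around a kubernetes-client
    # deserialization failure (see the linked issue) that occurs when a node
    # reports container images without names; without it, API calls returning
    # such objects can raise unexpectedly.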

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return a completion
    object that runs the wrapped method when it is evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

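# Illustrative sketch (added commentary; `orch` and `completion` are
# hypothetical names): a @deferred_read method does not run when called -- it
# returns a RookCompletion whose on_complete callback invokes the wrapped
# method once the completion is evaluated, e.g. by RookOrchestrator.process():
#
#   completion = orch.get_hosts()   # no Kubernetes call has happened yet
#   orch.process([completion])      # evaluate() -> finalize(None) -> get_hosts() runs
#   hosts = completion.result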

def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )

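# Hedged usage sketch (added; the call below is illustrative only and is not
# invoked anywhere in this module): write_completion wraps a synchronous write
# into a progress-reporting RookCompletion, and the callable runs only when
# the completion is evaluated.
#
#   c = write_completion(
#       on_complete=lambda: rook_cluster.update_mon_count(3),
#       message="Updating mon count to 3",
#       mgr=mgr_module)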


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ

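# Worked example (added; values are illustrative, assuming POD_NAMESPACE is set
# to 'rook-ceph' and no other overrides are present):
#
#   env = RookEnv()
#   env.namespace            # 'rook-ceph'
#   env.cluster_name         # 'rook-ceph' (falls back to the namespace)
#   env.operator_namespace   # 'rook-ceph'
#   env.api_name             # 'ceph.rook.io/v1'
#   env.api_version_match()  # True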

class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase operation: first we send the write to the
    k8s API (fast), then we wait for the corresponding change to
    appear in the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None

        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

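    # Added note: can_run() is a static pre-flight check (kubernetes module
    # importable, supported CRD version), while available() additionally
    # requires POD_NAMESPACE to be set and probes the live Kubernetes API via
    # list_namespaced_pod(), so it can fail at runtime even when can_run()
    # succeeded.
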
    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

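    # Added note: both properties above block on self._initialized, so callers
    # on other threads wait until serve() has constructed the Kubernetes API
    # clients and the RookCluster wrapper before proceeding.
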
    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            devs = []
            for d in host_devs:
                dev = inventory.Device(
                    path='/dev/' + d['name'],
                    sys_api=dict(
                        rotational='1' if d['rotational'] else '0',
                        size=d['size']
                    ),
                    available=d['empty'],
                    rejected_reasons=[] if d['empty'] else ['not empty'],
                )
                devs.append(dev)

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

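    # Hedged illustration (added; the device dict shape is inferred from the
    # loop above, the values are hypothetical): an entry such as
    #
    #   {'name': 'sdb', 'rotational': True, 'size': 4000787030016, 'empty': True}
    #
    # is reported as inventory.Device(path='/dev/sdb', available=True,
    # sys_api={'rotational': '1', 'size': 4000787030016}, rejected_reasons=[]).
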
    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            service_name='mon',
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            service_name='mgr',
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                service_name='crash',
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                service_name=svc,
                spec=ServiceSpec(
                    svc,
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                service_name=svc,
                spec=RGWSpec(
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1
        return [v for k, v in spec.items()]

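    # Added note: describe_service() keys its spec dict by service name
    # ('mon', 'mgr', 'crash', 'mds.<fs>', 'rgw.<realm>.<zone>') and then fills
    # in running counts from _list_daemons(); for CephObjectStore CRs the realm
    # and zone are both taken from the CR name, so the service name becomes
    # 'rgw.<name>.<name>'.
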
    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result

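    # Hedged example (added; label values are hypothetical): a pod with labels
    # {'app': 'rook-ceph-osd', 'ceph-osd-id': '3'} in phase 'Running' maps to a
    # DaemonDescription with daemon_type='osd', daemon_id='3', status=1 and
    # status_desc='Running'; pods carrying neither 'ceph_daemon_id' nor
    # 'ceph-osd-id' labels are skipped.
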
    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
            )

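    # Added note: remove_service() maps orchestrator service types onto Rook
    # CRD plural names ('mds' -> cephfilesystems, 'rgw' -> cephobjectstores,
    # 'nfs' -> cephnfses); any other service_type falls through and returns
    # None rather than raising.
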
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

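    # Added note (illustrative): a mon spec with PlacementSpec(count=3) results
    # in self.rook_cluster.update_mon_count(3); explicit host lists or labels
    # are rejected above because Rook schedules the mons itself.
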
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drive group file must only contain one spec at a time.
        """

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            all_hosts = [h.hostname for h in all_hosts_]
            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, all_hosts),
                calc_percent=lambda: has_osds(all_hosts)
            )

        @deferred_read
        def has_osds(all_hosts):
            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'
                    ))

            # 'found' is a list; report whether any matching OSDs exist rather
            # than comparing the list itself against None
            return bool(found)

        c = self.get_hosts().then(execute)
        return c

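    # Hedged example (added; the YAML below is an illustrative DriveGroupSpec
    # with a hypothetical service_id, matching the single-host assumption
    # asserted in execute() above):
    #
    #   service_type: osd
    #   service_id: example_osd_spec
    #   placement:
    #     host_pattern: 'node1'
    #   data_devices:
    #     paths:
    #       - /dev/sdb
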
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )
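    # Added note (values hypothetical): each DeviceLightLoc contributes
    # "<host>:<dev>" to the progress message, e.g. on=True for a device 'sdb'
    # on host 'node1' yields "Switching <True> identification light in
    # node1:sdb".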