# ceph/src/pybind/mgr/rook/module.py (Ceph 15.2.8)
import datetime
import threading
import functools
import os
import json

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage
    def names(self, names):
        self._names = names
    V1ContainerImage.names = V1ContainerImage.names.setter(names)
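    # The override above replaces the generated property setter so that a
    # node reporting a container image without a "names" field no longer
    # raises during deserialization (see the issue linked above).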

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return a completion
    object that runs the wrapped read when it is evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper
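
# Illustrative use of @deferred_read (a sketch, not executed here): the
# decorated method does not run at call time; the caller receives a
# RookCompletion and the read happens once the completion is evaluated,
# roughly:
#
#   completion = orch.get_hosts()   # nothing has been read yet
#   orch.process([completion])      # evaluate() -> finalize() runs the read
#   hosts = completion.result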


def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )
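
# Illustrative use of write_completion (a sketch, not executed here): wrap a
# mutating call in a completion that carries a progress message, optionally
# with a calc_percent callable that reports when the change is visible in
# Ceph, e.g.:
#
#   write_completion(
#       on_complete=lambda: self.rook_cluster.update_mon_count(3),
#       message="Updating mon count to 3",
#       mgr=self)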


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version
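        # In a default Rook deployment these typically resolve to (assumed
        # values, for illustration only): namespace='rook-ceph',
        # cluster_name='rook-ceph', operator_namespace='rook-ceph',
        # api_name='ceph.rook.io/v1'.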

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase process: first the write is sent to the k8s
    API (fast), then we wait for the corresponding change to appear in
    the Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None

        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            dev_list = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    dev_list.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    dev_list.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(dev_list)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())
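        # The CephCluster CRD fields read here look roughly like this
        # (illustrative snippet; values are assumptions, not taken from a
        # real cluster):
        #
        #   spec:
        #     cephVersion:
        #       image: ceph/ceph:v15.2.8
        #     mon:
        #       count: 3
        #     crashCollector:
        #       disable: false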

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    service_type='mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1
        return [v for k, v in spec.items()]

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']
            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
            )

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts[0]))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
                calc_percent=lambda: has_osds(matching_hosts)
            )

        @deferred_read
        def has_osds(matching_hosts):

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                field_selector="spec.nodeName={0}".format(
                                                    matching_hosts[0]
                                                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'
                    ))

            # True once at least one of the requested OSDs appears in the osdmap
            return bool(found)

        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )