import threading
import functools
import os
import uuid
try:
    from typing import List
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True
except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


all_completions = []


class RookReadCompletion(orchestrator.ReadCompletion):
    """
    All reads are simply API calls: avoid spawning
    huge numbers of threads by just running them
    inline when someone calls wait()
    """

    def __init__(self, cb):
        super(RookReadCompletion, self).__init__()
        self.cb = cb
        self._result = None
        self._complete = False

        self.message = "<read op>"

        # XXX hacky global
        global all_completions
        all_completions.append(self)

    @property
    def result(self):
        return self._result

    @property
    def is_complete(self):
        return self._complete

    def execute(self):
        self._result = self.cb()
        self._complete = True

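# A minimal usage sketch (illustrative only, with a toy callback; not
# part of the module's API):
#
#     c = RookReadCompletion(lambda: 42)
#     assert not c.is_complete
#     c.execute()    # runs the callback inline
#     assert c.is_complete and c.result == 42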

class RookWriteCompletion(orchestrator.WriteCompletion):
    """
    Writes are a two-phase thing: first we send the write to the
    k8s API (fast), then we wait for the corresponding change to
    appear in the Ceph cluster (slow).
    """
    # XXX the kubernetes bindings' call_api usefully has a
    # completion= param that uses threads.  Maybe just use that?
    def __init__(self, execute_cb, complete_cb, message):
        super(RookWriteCompletion, self).__init__()
        self.execute_cb = execute_cb
        self.complete_cb = complete_cb

        # "executed" means the k8s API call was made; it may or
        # may not have succeeded
        self.executed = False

        # Result of the k8s API call; this is set if executed==True
        self._result = None

        self.effective = False

        self.id = str(uuid.uuid4())

        self.message = message

        self.error = None

        # XXX hacky global
        global all_completions
        all_completions.append(self)

    def __str__(self):
        return self.message

    @property
    def result(self):
        return self._result

    @property
    def is_persistent(self):
        return (not self.is_errored) and self.executed

    @property
    def is_effective(self):
        return self.effective

    @property
    def is_errored(self):
        return self.error is not None

    def execute(self):
        if not self.executed:
            self._result = self.execute_cb()
            self.executed = True

        if not self.effective:
            # TODO: check self.result for API errors
            if self.complete_cb is None:
                self.effective = True
            else:
                self.effective = self.complete_cb()

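# A minimal sketch of the two-phase lifecycle (illustrative only; the
# callbacks are stand-ins, not real Rook/k8s calls):
#
#     c = RookWriteCompletion(
#         execute_cb=lambda: "api-response",   # phase 1: k8s API call
#         complete_cb=lambda: True,            # phase 2: change visible in Ceph?
#         message="example write")
#     c.execute()
#     assert c.is_persistent    # API call made, no error recorded
#     assert c.is_effective     # complete_cb reported the change landed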

def deferred_read(f):
    """
    Decorator to make RookOrchestrator methods return
    a completion object that executes them when the
    caller waits on it.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookReadCompletion(lambda: f(*args, **kwargs))

    return wrapper

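# Illustrative usage, assuming a hypothetical decorated method (not one
# defined in this module):
#
#     @deferred_read
#     def get_something(self):
#         return some_api_call()
#
#     completion = orch.get_something()   # returns immediately
#     orch.wait([completion])             # executes the call inline
#     print(completion.result)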

class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists in Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")

        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ


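# For illustration, a Rook-deployed mgr pod might see an environment
# like the following (assumed values, not read from a real deployment):
#
#     POD_NAMESPACE=rook-ceph
#     ROOK_CEPH_CLUSTER_CRD_NAME=rook-ceph
#     ROOK_OPERATOR_NAMESPACE=rook-ceph-system
#
# which yields RookEnv().api_name == "ceph.rook.io/v1".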
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]

    def wait(self, completions):
        self.log.info("wait: completions={0}".format(completions))

        incomplete = False

        # Our `wait` implementation is very simple because everything's
        # just an API call.
        for c in completions:
            if not isinstance(c, RookReadCompletion) and \
                    not isinstance(c, RookWriteCompletion):
                raise TypeError(
                    "wait() requires list of completions, not {0}".format(
                        c.__class__
                    ))

            if c.is_complete:
                continue

            try:
                c.execute()
            except Exception as e:
                if not isinstance(e, orchestrator.OrchestratorError):
                    self.log.exception("Completion {0} threw an exception:".format(
                        c.message
                    ))
                c.exception = e
                c._complete = True

            if not c.is_complete:
                incomplete = True

        return not incomplete

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        self._initialized.wait()
        return self._k8s

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config)

        if self._rook_env.cluster_name:
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s = client.CoreV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            global all_completions
            self.wait(all_completions)
            all_completions = [c for c in all_completions if not c.is_complete]

            self._shutdown.wait(5)

        # TODO: watch Rook for config changes to complain/update if
        # things look a bit out of sync?

    @deferred_read
    def get_inventory(self, node_filter=None, refresh=False):
        node_list = None
        if node_filter and node_filter.nodes:
            # Explicit node list
            node_list = node_filter.nodes
        elif node_filter and node_filter.labels:
            # TODO: query k8s API to resolve to node list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(node_list)

        result = []
        for node_name, node_devs in devs.items():
            node_result = []
            for d in node_devs:
                dev = orchestrator.InventoryDevice()

                # XXX CAUTION! https://github.com/rook/rook/issues/1716
                # Passing this through for the sake of completeness but it
                # is not trustworthy!
                dev.blank = d['empty']
                dev.type = 'hdd' if d['rotational'] else 'ssd'
                dev.id = d['name']
                dev.size = d['size']

                if d['filesystem'] == "" and not d['rotational']:
                    # Empty or partitioned SSD
                    partitioned_space = sum(
                        [p['size'] for p in d['Partitions']])
                    dev.metadata_space_free = max(
                        0, d['size'] - partitioned_space)

                node_result.append(dev)

            result.append(orchestrator.InventoryNode(node_name, node_result))

        return result

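    # Shape of the result, for illustration (values are made up; the
    # InventoryDevice fields are the ones populated above):
    #
    #     [InventoryNode('node-a', [InventoryDevice(id='sdb', type='hdd',
    #                                               size=..., blank=...)]),
    #      ...]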
    @deferred_read
    def describe_service(self, service_type=None, service_id=None, node_name=None):

        if service_type not in ("mds", "osd", "mgr", "mon", "nfs", "rgw", None):
            raise orchestrator.OrchestratorValidationError(service_type + " unsupported")

        pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)

        result = []
        for p in pods:
            sd = orchestrator.ServiceDescription()
            sd.nodename = p['nodename']
            sd.container_id = p['name']
            sd.service_type = p['labels']['app'].replace('rook-ceph-', '')

            if sd.service_type == "osd":
                sd.service_instance = "%s" % p['labels']["ceph-osd-id"]
            elif sd.service_type == "mds":
                sd.service = p['labels']['rook_file_system']
                pfx = "{0}-".format(sd.service)
                sd.service_instance = p['labels']['ceph_daemon_id'].replace(pfx, '', 1)
            elif sd.service_type == "mon":
                sd.service_instance = p['labels']["mon"]
            elif sd.service_type == "mgr":
                sd.service_instance = p['labels']["mgr"]
            elif sd.service_type == "nfs":
                sd.service = p['labels']['ceph_nfs']
                sd.service_instance = p['labels']['instance']
                sd.rados_config_location = self.rook_cluster.get_nfs_conf_url(sd.service, sd.service_instance)
            elif sd.service_type == "rgw":
                sd.service = p['labels']['rgw']
                sd.service_instance = p['labels']['ceph_daemon_id']
            else:
                # Unknown type -- skip it
                continue

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return RookWriteCompletion(lambda: func(spec), None,
                                   "Creating {0} services for {1}".format(typename, spec.name))

    def add_stateless_service(self, service_type, spec):
        # assert isinstance(spec, orchestrator.StatelessServiceSpec)
        if service_type == "mds":
            return self._service_add_decorate("Filesystem", spec,
                                              self.rook_cluster.add_filesystem)
        elif service_type == "rgw":
            return self._service_add_decorate("RGW", spec,
                                              self.rook_cluster.add_objectstore)
        elif service_type == "nfs":
            return self._service_add_decorate("NFS", spec,
                                              self.rook_cluster.add_nfsgw)
        else:
            raise NotImplementedError(service_type)

    def remove_stateless_service(self, service_type, service_id):
        return RookWriteCompletion(
            lambda: self.rook_cluster.rm_service(service_type, service_id), None,
            "Removing {0} services for {1}".format(service_type, service_id))

    def update_mons(self, num, hosts):
        if hosts:
            raise RuntimeError("Host list is not supported by rook.")

        return RookWriteCompletion(
            lambda: self.rook_cluster.update_mon_count(num), None,
            "Updating mon count to {0}".format(num))

    def update_stateless_service(self, svc_type, spec):
        # only nfs is currently supported
        if svc_type != "nfs":
            raise NotImplementedError(svc_type)

        num = spec.count
        return RookWriteCompletion(
            lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
            "Updating NFS server count in {0} to {1}".format(spec.name, num))

    def create_osds(self, drive_group, all_hosts):
        # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion

        assert len(drive_group.hosts(all_hosts)) == 1
        targets = []
        if drive_group.data_devices:
            targets += drive_group.data_devices.paths
        if drive_group.data_directories:
            targets += drive_group.data_directories

        if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(drive_group.hosts(all_hosts)[0]))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        def execute():
            return self.rook_cluster.add_osds(drive_group, all_hosts)

        def is_complete():
            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self._k8s.list_namespaced_pod(
                self._rook_env.namespace,
                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                field_selector="spec.nodeName={0}".format(
                    drive_group.hosts(all_hosts)[0]
                )).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'
                    ))

            # The write is effective once at least one matching OSD has
            # registered in the osdmap.
            return len(found) > 0

        return RookWriteCompletion(execute, is_complete,
                                   "Creating OSD on {0}:{1}".format(
                                       drive_group.hosts(all_hosts)[0], targets
                                   ))
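
# Illustrative call sequence for OSD creation (names and the spec are
# hypothetical sketches, not taken from this module):
#
#     spec = orchestrator.DriveGroupSpec(...)   # e.g. targeting /dev/sdb
#     completion = orch.create_osds(spec, all_hosts=['node-a'])
#     orch.wait([completion])   # phase 1: Rook cluster CRD patched via k8s
#     # serve() keeps kicking completions until a matching OSD appears in
#     # the osdmap; completion.is_effective then becomes True.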