1"""
2This module wrap's Rook + Kubernetes APIs to expose the calls
3needed to implement an orchestrator module. While the orchestrator
4module exposes an async API, this module simply exposes blocking API
5call methods.
6
7This module is runnable outside of ceph-mgr, useful for testing.
8"""
import datetime
import threading
import logging
import json
from contextlib import contextmanager

import jsonpatch
from six.moves.urllib.parse import urljoin  # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
from urllib3.exceptions import ProtocolError

from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec
from mgr_util import merge_dicts

try:
    from typing import Optional
except ImportError:
    pass  # just for type annotations

try:
    from kubernetes.client.rest import ApiException
    from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
    from kubernetes import watch
except ImportError:
    class ApiException(Exception):  # type: ignore
        status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl


import orchestrator


try:
    from rook.module import RookEnv
    from typing import List, Dict
except ImportError:
    pass  # just used for type checking.

log = logging.getLogger(__name__)


def _urllib3_supports_read_chunked():
    # CentOS 7 ships a urllib3 that is older than the version required
    # by kubernetes-client, which needs HTTPResponse.read_chunked.
    try:
        from urllib3.response import HTTPResponse
        return hasattr(HTTPResponse, 'read_chunked')
    except ImportError:
        return False


# Evaluate once at import time; the name now holds the boolean result.
_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()


class ApplyException(orchestrator.OrchestratorError):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


def threaded(f):
    # Note: a function wrapped with @threaded returns the started
    # ``threading.Thread``, not the function's own return value.
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=f, args=args, kwargs=kwargs)
        t.start()
        return t

    return wrapper


class KubernetesResource(object):
    def __init__(self, api_func, **kwargs):
        """
        Generic kubernetes Resource parent class

        The api fetch and watch methods should be common across resource
        types. Exceptions in the runner thread are propagated to the caller.

        :param api_func: kubernetes client api function that is passed to the watcher
        :param filter_func: signature: ``(Item) -> bool``.
        """
        self.kwargs = kwargs
        self.api_func = api_func

        # ``_items`` is accessed by different threads. I assume assignment is atomic.
        self._items = dict()
        self.thread = None  # type: Optional[threading.Thread]
        self.exception = None
        if not _urllib3_supports_read_chunked:
            logging.info('urllib3 is too old. Fallback to full fetches')

    def _fetch(self):
        """ Execute the requested api method as a one-off fetch"""
        response = self.api_func(**self.kwargs)
        # metadata is a V1ListMeta object type
        metadata = response.metadata  # type: V1ListMeta
        self._items = {item.metadata.name: item for item in response.items}
        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
        return metadata.resource_version

    @property
    def items(self):
        """
        Returns the items of the request.
        Creates the watcher as a side effect.
        :return:
        """
        if self.exception:
            e = self.exception
            self.exception = None
            raise e  # Propagate the exception to the user.
        if not self.thread or not self.thread.is_alive():
            resource_version = self._fetch()
            if _urllib3_supports_read_chunked:
                # Start a thread which will use the kubernetes watch client against a resource
                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
                self.thread = self._watch(resource_version)

        return self._items.values()

    @threaded
    def _watch(self, res_ver):
        """ worker thread that runs the kubernetes watch """

        self.exception = None

        w = watch.Watch()

        try:
            # execute generator to continually watch resource for changes
            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
                                  **self.kwargs):
                self.health = ''
                item = event['object']
                try:
                    name = item.metadata.name
                except AttributeError:
                    raise AttributeError(
                        "{} doesn't contain a metadata.name. Unable to track changes".format(
                            self.api_func))

                log.info('{} event: {}'.format(event['type'], name))

                if event['type'] in ('ADDED', 'MODIFIED'):
                    self._items = merge_dicts(self._items, {name: item})
                elif event['type'] == 'DELETED':
                    self._items = {k: v for k, v in self._items.items() if k != name}
                elif event['type'] == 'BOOKMARK':
                    pass
                elif event['type'] == 'ERROR':
                    raise ApiException(str(event))
                else:
                    raise KeyError('Unknown watch event {}'.format(event['type']))
        except ProtocolError as e:
            if 'Connection broken' in str(e):
                log.info('Connection reset.')
                return
            raise
        except ApiException as e:
            log.exception('K8s API failed. {}'.format(self.api_func))
            self.exception = e
            raise
        except Exception as e:
            log.exception("Watcher failed. ({})".format(self.api_func))
            self.exception = e
            raise


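# A hedged usage sketch for KubernetesResource; the api function and
# label selector below are illustrative (RookCluster wires up the real
# ones in its __init__):
#
#     pods = KubernetesResource(k8s.list_namespaced_pod,
#                               namespace='rook-ceph',
#                               label_selector='app=rook-ceph-mgr')
#     for p in pods.items:  # first access does a full fetch, then attaches a watch
#         print(p.metadata.name)
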
class RookCluster(object):
    def __init__(self, k8s, rook_env):
        self.rook_env = rook_env  # type: RookEnv
        self.k8s = k8s  # type: CoreV1Api

        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
                                                 namespace=self.rook_env.operator_namespace,
                                                 label_selector="app=rook-discover")

        self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
                                            namespace=self.rook_env.namespace,
                                            label_selector="rook_cluster={0}".format(
                                                self.rook_env.cluster_name))
        self.nodes = KubernetesResource(self.k8s.list_node)

    def rook_url(self, path):
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

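    # For illustration, with a hypothetical CRD version 'v1' and
    # namespace 'rook-ceph':
    #
    #     rook_url("cephnfses/mynfs")
    #     # -> "/apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs"
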
    def rook_api_call(self, verb, path, **kwargs):
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.k8s.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path, **kwargs):
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path):
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path, **kwargs):
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path, **kwargs):
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames=None):
        def predicate(item):
            if nodenames is not None:
                return item.metadata.labels['rook.io/node'] in nodenames
            else:
                return True

        try:
            result = [i for i in self.inventory_maps.items if predicate(i)]
        except ApiException:
            log.exception("Failed to fetch device metadata")
            raise

        nodename_to_devices = {}
        for i in result:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

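    # The returned mapping is keyed by node name; the device entries are
    # whatever rook-discover wrote into the config map, e.g. (illustrative):
    #
    #     {'node1': [{'name': '/dev/sdb', ...}, ...]}
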
    def get_nfs_conf_url(self, nfs_cluster, instance):
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

    def describe_pods(self, service_type, service_id, nodename):
        """
        Go query the k8s API about deployments and containers related to
        this service.

        Example Rook Pod labels for a mgr daemon:
          Labels:  app=rook-ceph-mgr
                   pod-template-hash=2171958073
                   rook_cluster=rook
        MDS containers additionally have a `rook_filesystem` label.

        Label filter is rook_cluster=<cluster name>
                        rook_file_system=<self.fs_name>
        """
        def predicate(item):
            # type: (V1Pod) -> bool
            metadata = item.metadata
            if service_type is not None:
                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
                    return False

            if service_id is not None:
                try:
                    k, v = {
                        "mds": ("rook_file_system", service_id),
                        "osd": ("ceph-osd-id", service_id),
                        "mon": ("mon", service_id),
                        "mgr": ("mgr", service_id),
                        "ceph_nfs": ("ceph_nfs", service_id),
                        "rgw": ("ceph_rgw", service_id),
                    }[service_type]
                except KeyError:
                    raise orchestrator.OrchestratorValidationError(
                        '{} not supported'.format(service_type))
                if metadata.labels[k] != v:
                    return False

            if nodename is not None:
                if item.spec.node_name != nodename:
                    return False
            return True

        refreshed = datetime.datetime.utcnow()
        pods = [i for i in self.rook_pods.items if predicate(i)]

        pods_summary = []

        for p in pods:
            d = p.to_dict()

            image_name = None
            for c in d['spec']['containers']:
                # look at the first listed container in the pod...
                image_name = c['image']
                break

            s = {
                "name": d['metadata']['name'],
                "hostname": d['spec']['node_name'],
                "labels": d['metadata']['labels'],
                'phase': d['status']['phase'],
                'container_image_name': image_name,
                'refreshed': refreshed,
                # these may get set below...
                'started': None,
                'created': None,
            }

            # note: we want UTC but no tzinfo
            if d['metadata'].get('creation_timestamp', None):
                s['created'] = d['metadata']['creation_timestamp'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)
            if d['status'].get('start_time', None):
                s['started'] = d['status']['start_time'].astimezone(
                    tz=datetime.timezone.utc).replace(tzinfo=None)

            pods_summary.append(s)

        return pods_summary

    def remove_pods(self, names):
        # ``names`` are daemon names of the form "<type>.<id>", e.g. "mgr.a".
        pods = [i for i in self.rook_pods.items]
        num = 0
        for p in pods:
            d = p.to_dict()
            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-', '')
            daemon_id = d['metadata']['labels']['ceph_daemon_id']
            name = daemon_type + '.' + daemon_id
            if name in names:
                self.k8s.delete_namespaced_pod(
                    d['metadata']['name'],
                    self.rook_env.namespace,
                    body=V1DeleteOptions()
                )
                num += 1
        return "Removed %d pods" % num

    def get_node_names(self):
        return [i.metadata.name for i in self.nodes.items]

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

    def apply_filesystem(self, spec):
        # type: (ServiceSpec) -> None
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to act on.
        def _update_fs(current, new):
            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
            new.spec.metadataServer.activeCount = spec.placement.count or 1
            return new

        def _create_fs():
            # type: () -> cfs.CephFilesystem
            return cfs.CephFilesystem(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=spec.service_id,
                    namespace=self.rook_env.namespace,
                ),
                spec=cfs.Spec(
                    metadataServer=cfs.MetadataServer(
                        activeCount=spec.placement.count or 1,
                        activeStandby=True
                    )
                )
            )
        return self._create_or_patch(
            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
            _update_fs, _create_fs)

    def apply_objectstore(self, spec):

        # FIXME: service_id is $realm.$zone, but rook uses realm
        # $crname and zone $crname. The '.' will confuse kubernetes.
        # For now, assert that realm==zone.
        (realm, zone) = spec.service_id.split('.', 1)
        assert realm == zone
        assert spec.subcluster is None
        name = realm

        def _create_zone():
            # type: () -> cos.CephObjectStore
            port = None
            secure_port = None
            if spec.ssl:
                secure_port = spec.get_port()
            else:
                port = spec.get_port()
            return cos.CephObjectStore(
                apiVersion=self.rook_env.api_name,
                metadata=dict(
                    name=name,
                    namespace=self.rook_env.namespace
                ),
                spec=cos.Spec(
                    gateway=cos.Gateway(
                        type='s3',
                        port=port,
                        securePort=secure_port,
                        instances=spec.placement.count or 1,
                    )
                )
            )

        def _update_zone(current, new):
            new.spec.gateway.instances = spec.placement.count or 1
            return new

        return self._create_or_patch(
            cos.CephObjectStore, 'cephobjectstores', name,
            _update_zone, _create_zone)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to act on.

        rook_nfsgw = cnfs.CephNFS(
            apiVersion=self.rook_env.api_name,
            metadata=dict(
                name=spec.service_id,
                namespace=self.rook_env.namespace,
            ),
            spec=cnfs.Spec(
                rados=cnfs.Rados(
                    pool=spec.pool
                ),
                server=cnfs.Server(
                    active=spec.placement.count
                )
            )
        )

        if spec.namespace:
            rook_nfsgw.spec.rados.namespace = spec.namespace

        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.service_id)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())

    def rm_service(self, rooktype, service_id):
        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        return node_name in self.get_node_names()

    def update_mon_count(self, newcount):
        def _update_mon_count(current, new):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
            new.spec.mon.count = newcount
            return new
        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

    def update_nfs_count(self, svc_id, newcount):
        def _update_nfs_count(current, new):
            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
            new.spec.server.active = newcount
            return new
        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)

    def add_osds(self, drive_group, all_hosts):
        # type: (DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else []
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        def _add_osds(current_cluster, new_cluster):
            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

            # FIXME: this is all not really atomic, because jsonpatch doesn't
            # let us do "test" operations that would check if items with
            # matching names were in existing lists.

            if not hasattr(new_cluster.spec.storage, 'nodes'):
                new_cluster.spec.storage.nodes = ccl.NodesList()

            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
            matching_host = drive_group.placement.pattern_matches_hosts(all_hosts)[0]

            if matching_host not in [n.name for n in current_nodes]:
                pd = ccl.NodesItem(
                    name=matching_host,
                    config=ccl.Config(
                        storeType=drive_group.objectstore
                    )
                )

                if block_devices:
                    pd.devices = ccl.DevicesList(
                        ccl.DevicesItem(name=d.path) for d in block_devices
                    )
                if directories:
                    pd.directories = ccl.DirectoriesList(
                        ccl.DirectoriesItem(path=p) for p in directories
                    )
                new_cluster.spec.storage.nodes.append(pd)
            else:
                for _node in new_cluster.spec.storage.nodes:
                    current_node = _node  # type: ccl.NodesItem
                    if current_node.name == matching_host:
                        if block_devices:
                            if not hasattr(current_node, 'devices'):
                                current_node.devices = ccl.DevicesList()
                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
                            current_node.devices.extend(
                                ccl.DevicesItem(name=n.path) for n in new_devices
                            )

                        if directories:
                            if not hasattr(current_node, 'directories'):
                                current_node.directories = ccl.DirectoriesList()
                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
                            current_node.directories.extend(
                                ccl.DirectoriesItem(path=n) for n in new_dirs
                            )
            return new_cluster

        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

    def _patch(self, crd, crd_name, cr_name, func):
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)
        )

        current = crd.from_json(current_json)
        new = crd.from_json(current_json)  # no deepcopy.

        new = func(current, new)

        patch = list(jsonpatch.make_patch(current_json, new.to_json()))

        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "{}/{}".format(crd_name, cr_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))

        return "Success"

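    # For illustration, jsonpatch.make_patch yields an RFC 6902 operation
    # list; a mon count change might look like (values hypothetical):
    #
    #     [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]
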
    def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
        try:
            current_json = self.rook_api_get(
                "{}/{}".format(crd_name, cr_name)
            )
        except ApiException as e:
            if e.status == 404:
                current_json = None
            else:
                raise

        if current_json:
            current = crd.from_json(current_json)
            new = crd.from_json(current_json)  # no deepcopy.

            new = update_func(current, new)

            patch = list(jsonpatch.make_patch(current_json, new.to_json()))

            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

            if len(patch) == 0:
                return "No change"

            try:
                self.rook_api_patch(
                    "{}/{}".format(crd_name, cr_name),
                    body=patch)
            except ApiException as e:
                log.exception("API exception: {0}".format(e))
                raise ApplyException(
                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
            return "Updated"
        else:
            new = create_func()
            with self.ignore_409("{} {} already exists".format(crd_name,
                                                               cr_name)):
                self.rook_api_post("{}/".format(crd_name),
                                   body=new.to_json())
            return "Created"