# Integrate with the kubernetes events API.
# This module sends events to Kubernetes, and also captures/tracks all events
# in the rook-ceph namespace so kubernetes activity like pod restarts,
# image pulls etc. can be seen from within the ceph cluster itself.
#
# To interact with the events API, the service account used by the mgr needs to
# be granted additional permissions,
# e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules
#
# These are the changes needed:
# - apiGroups:
#   - ""
#   resources:
#   - events
#   verbs:
#   - create
#   - patch
#   - list
#   - get
#   - watch


import os
import re
import sys
import time
import json
import yaml
import errno
import socket
import base64
import logging
import tempfile
import threading

from urllib.parse import urlparse
from datetime import tzinfo, datetime, timedelta

from urllib3.exceptions import MaxRetryError, ProtocolError
from collections import OrderedDict

import rados
from mgr_module import MgrModule
from mgr_util import verify_cacrt, ServerConfigException

try:
    import queue
except ImportError:
    # python 2.7.5
    import Queue as queue
finally:
    # python 2.7.15 or python3
    event_queue = queue.Queue()

try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)

log = logging.getLogger(__name__)

# use a simple local class to represent UTC
# datetime pkg modules vary between python2 and 3 and pytz is not available on older
# ceph container images, so taking a pragmatic approach!
class UTC(tzinfo):
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)

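# Illustrative usage (not part of the original module): timestamps built with this
# helper are timezone-aware, so they can be compared safely against other UTC-aware
# timestamps, such as those returned by the kubernetes client.
#   >>> datetime.now(UTC()).tzname()
#   'UTC'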

def text_suffix(num):
    """Return a plural suffix based on a count, e.g. to turn 'host' into 'hosts'"""
    return '' if num == 1 else 's'


def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file

    Attempt to create a temporary file containing the given content

    Returns:
        str .. full path to the temporary file

    Raises:
        OSError: problems creating the file

    """

    if content is not None:
        file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

        try:
            with open(file_name, "w") as f:
                f.write(content)
        except OSError as e:
            raise OSError("Unable to create temporary file : {}".format(str(e)))

        return file_name


class HealthCheck(object):
    """Transform a healthcheck msg into its component parts"""

    def __init__(self, msg, msg_level):

        # msg looks like
        #
        # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        # Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        #
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        if msg.lower().startswith('health check'):

            self.valid = True
            self.msg = msg
            msg_tokens = self.msg.split()

            if msg_level == 'INF':
                self.text = ' '.join(msg_tokens[3:])
                self.name = msg_tokens[3]  # health check name e.g. OSDMAP_FLAGS
            else:  # WRN or ERR
                self.text = ' '.join(msg_tokens[3:-1])
                self.name = msg_tokens[-1][1:-1]

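# Illustrative example (not part of the original module) of how the parsing above
# decomposes a cluster log message:
#   >>> hc = HealthCheck("Health check failed: nodown flag(s) set (OSDMAP_FLAGS)", "WRN")
#   >>> hc.name
#   'OSDMAP_FLAGS'
#   >>> hc.text
#   'nodown flag(s) set'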

class LogEntry(object):
    """Generic 'log' object"""

    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):

        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp
        self.healthcheck = None

        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)

    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
                                                                            self.msg_type,
                                                                            self.msg,
                                                                            self.level,
                                                                            self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""

        # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
        if self.msg_type != 'audit':
            return None
        else:
            _m = self.msg[:-10].replace("\'", "").split("cmd=")
            _s = '"cmd":{}'.format(_m[1])
            cmds_list = json.loads('{' + _s + '}')['cmd']

            # TODO. Assuming only one command was issued for now
            _c = cmds_list[0]
            return "{} {}".format(_c['prefix'], _c.get('key', ''))

    @property
    def event_type(self):
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        elif self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        elif self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        elif self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        elif self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        else:
            return None

    @property
    def event_entity(self):
        if self.msg_type == 'audit':
            return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]
        else:
            return None

    @property
    def event_msg(self):
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)

        elif self.healthcheck:
            return self.healthcheck.text
        else:
            return self.msg

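# Illustrative example (not part of the original module), based on the audit message
# format shown in LogEntry.cmd above:
#   >>> entry = LogEntry(source='log', msg_type='audit', level='INF',
#   ...                  msg="from='client.205306 ' entity='client.admin' "
#   ...                      "cmd='[{\"prefix\": \"osd set\", \"key\": \"nodown\"}]': finished")
#   >>> entry.cmd
#   'osd set nodown'
#   >>> entry.event_name
#   'mgr.audit.osd_set_nodown'
#   >>> entry.event_entity
#   'client.admin'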

class BaseThread(threading.Thread):
    health = 'OK'
    reported = False
    daemon = True


def clean_event(event):
    """Clean an event record"""
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            event.first_timestamp = datetime.now(UTC())

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    if not event.count:
        event.count = 1

    return event


class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()
        self.lock = threading.Lock()
        self.active = None
        self.resource_version = None

    def fetch(self):
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for an auth problem to be more specific in the except clause?
        except Exception:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = clean_event(item)
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if self.active:
            log.info("Namespace event watcher started")

            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = clean_event(obj)

                            elif item['type'] == 'DELETED':
                                del self.events[obj.metadata.name]

                # TODO test the exception for auth problem (403?)

                # AttributeError is generated when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except ProtocolError as e:
                    log.warning("Namespace watcher hit ProtocolError ({}) - restarting".format(e))
                    self.fetch()

                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")


class KubernetesEvent(object):

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        self.api_status = 200
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the NamespaceWatcher when
        # deleted events are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):

        now = datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not been defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403  # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (a V1Event object is returned)
                # this could happen if the event has been created, and then the k8sevents module was
                # disabled and re-enabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict - the event already exists, so read it back,
                # then update count, last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        return self.api_status == 200

    def update(self, log_entry):
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have expired and
                # the event been pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200


class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        super(EventProcessor, self).__init__()

        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        return self.startup()

    def prune_events(self):
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        local_events = dict(self.events)

        for event_name in sorted(local_events,
                                 key=lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")


class ListDiff(object):
    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        return list(self.before - self.after)

    @property
    def added(self):
        return list(self.after - self.before)

    @property
    def is_equal(self):
        return self.before == self.after

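# Illustrative example (not part of the original module) of the set arithmetic
# that ListDiff provides:
#   >>> diff = ListDiff(['host-a', 'host-b'], ['host-b', 'host-c'])
#   >>> diff.added
#   ['host-c']
#   >>> diff.removed
#   ['host-a']
#   >>> diff.is_equal
#   False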

class CephConfigWatcher(BaseThread):
    """Detect configuration changes within the cluster and generate human readable events"""

    def __init__(self, mgr):
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        self.server_map = dict()
        self.osd_map = dict()
        self.pool_map = dict()
        self.service_map = dict()

        self.config_check_secs = mgr.config_check_secs

    @property
    def raw_capacity(self):
        # Note: if the OSDs are not online the capacity field will be 0
        return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map])

    @property
    def num_servers(self):
        return len(self.server_map.keys())

    @property
    def num_osds(self):
        return len(self.osd_map.keys())

    @property
    def num_pools(self):
        return len(self.pool_map.keys())

    def __str__(self):
        s = ''

        s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
            json.loads(self.mgr.get('health')['json'])['status'],
            self.num_servers,
            text_suffix(self.num_servers),
            self.num_pools,
            text_suffix(self.num_pools),
            self.num_osds,
            MgrModule.to_pretty_iec(self.raw_capacity))
        return s

    def fetch_servers(self):
        """Return a server summary, and service summary"""
        servers = self.mgr.list_servers()
        server_map = dict()   # host -> services
        service_map = dict()  # service -> host
        for server_info in servers:
            services = dict()
            for svc in server_info['services']:
                if svc.get('type') in services.keys():
                    services[svc.get('type')].append(svc.get('id'))
                else:
                    services[svc.get('type')] = list([svc.get('id')])
                # maintain the service xref map service -> host and version
                service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '')
            server_map[server_info.get('hostname')] = services

        return server_map, service_map

    def fetch_pools(self):
        interesting = ["type", "size", "min_size"]
        # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool',
        #   'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn',
        #   'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100,
        #   'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0',
        #   'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0,
        #   'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1,
        #   'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0,
        #   'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000,
        #   'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0,
        #   'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0,
        #   'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0,
        #   'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0,
        #   'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}},
        #   'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0',
        #   'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}]
        pools = self.mgr.get('osd_map')['pools']
        pool_map = dict()
        for pool in pools:
            pool_map[pool.get('pool_name')] = {k: pool.get(k) for k in interesting}
        return pool_map


    def fetch_osd_map(self, service_map):
        """Create an osd map"""
        stats = self.mgr.get('osd_stats')

        osd_map = dict()

        devices = self.mgr.get('osd_map_crush')['devices']
        for dev in devices:
            osd_id = str(dev['id'])
            osd_map[osd_id] = dict(
                deviceclass=dev.get('class'),
                capacity=0,
                hostname=service_map['osd', osd_id]
            )

        for osd_stat in stats['osd_stats']:
            osd_id = str(osd_stat.get('osd'))
            osd_map[osd_id]['capacity'] = osd_stat['statfs']['total']

        return osd_map

    def push_events(self, changes):
        """Add config changes to the global queue to generate events in kubernetes"""
        log.debug("{} events will be generated".format(len(changes)))
        for change in changes:
            event_queue.put(change)

    def _generate_config_logentry(self, msg):
        return LogEntry(
            source="config",
            msg_type="config",
            msg=msg,
            level='INF',
            tstamp=None
        )

    def _check_hosts(self, server_map):
        log.debug("K8sevents checking host membership")
        changes = list()
        servers = ListDiff(self.server_map.keys(), server_map.keys())
        if servers.is_equal:
            # no hosts have been added or removed
            pass
        else:
            # host changes detected, find out what
            host_msg = "Host '{}' has been {} the cluster"
            for new_server in servers.added:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(new_server, 'added to'))
                )

            for removed_server in servers.removed:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(removed_server, 'removed from'))
                )

        return changes

    def _check_osds(self, server_map, osd_map):
        log.debug("K8sevents checking OSD configuration")
        changes = list()
        before_osds = list()
        for svr in self.server_map:
            before_osds.extend(self.server_map[svr].get('osd', []))

        after_osds = list()
        for svr in server_map:
            after_osds.extend(server_map[svr].get('osd', []))

        if set(before_osds) == set(after_osds):
            # no change in OSD ids
            pass
        else:
            # OSD changes detected
            osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}"

            osds = ListDiff(before_osds, after_osds)
            for new_osd in osds.added:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        new_osd,
                        osd_map[new_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']),
                        'added to',
                        osd_map[new_osd]['hostname']))
                )

            for removed_osd in osds.removed:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        removed_osd,
                        osd_map[removed_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']),
                        'removed from',
                        osd_map[removed_osd]['hostname']))
                )

        return changes

    def _check_pools(self, pool_map):
        changes = list()
        log.debug("K8sevents checking pool configurations")
        if self.pool_map.keys() == pool_map.keys():
            # no pools added/removed
            pass
        else:
            # pool changes detected
            pools = ListDiff(self.pool_map.keys(), pool_map.keys())
            pool_msg = "Pool '{}' has been {} the cluster"
            for new_pool in pools.added:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(new_pool, 'added to'))
                )

            for removed_pool in pools.removed:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(removed_pool, 'removed from'))
                )

        # check pool configuration changes
        for pool_name in pool_map:
            if not self.pool_map.get(pool_name, dict()):
                # pool didn't exist before so just skip the checks
                continue

            if pool_map[pool_name] == self.pool_map[pool_name]:
                # no changes - dicts match in key and value
                continue
            else:
                # determine the change and add it to the change list
                size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size']
                if size_diff != 0:
                    if size_diff < 0:
                        msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
                                                                                               pool_map[pool_name]['size'])
                        level = 'WRN'
                    else:
                        msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
                                                                                                 pool_map[pool_name]['size'])
                        level = 'INF'

                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg=msg,
                                            level=level,
                                            tstamp=None)
                    )

                if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']:
                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name),
                                            level='WRN',
                                            tstamp=None)
                    )

        return changes

    def get_changes(self, server_map, osd_map, pool_map):
        """Detect changes in maps between the current observation and the last"""

        changes = list()

        changes.extend(self._check_hosts(server_map))
        changes.extend(self._check_osds(server_map, osd_map))
        changes.extend(self._check_pools(pool_map))

        # FUTURE
        # Could generate an event if a ceph daemon has moved hosts
        # (assumes the ceph metadata host information is valid though!)

        return changes

    def run(self):
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]

    def __init__(self, *args, **kwargs):
        self.run = True
        self.kubernetes_control = 'POD_NAME' in os.environ
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        self.trackers = list()
        self.error_msg = None
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        self.k8s_config = dict(
            cacrt=None,
            token=None,
            server=None,
            namespace=None
        )

        super(Module, self).__init__(*args, **kwargs)

    def k8s_ready(self):
        """Validate the k8s_config dict

        Returns:
            - bool .... indicating whether the config is ready to use
            - list .... variables that need to be defined before the module will function

        """
        missing = list()
        ready = True
        for k in self.k8s_config:
            if not self.k8s_config[k]:
                missing.append(k)
                ready = False
        return ready, missing

    def config_notify(self):
        """Apply runtime module options, and defaults from the module's KV store"""
        self.log.debug("applying runtime module option settings")
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))

        if not self.kubernetes_control:
            # Populate the config
            self.log.debug("loading config from KV store")
            for k in self.k8s_config:
                self.k8s_config[k] = self.get_store(k, default=None)

    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module"""
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError

    def process_clog(self, log_message):
        """Add log message to the event queue

        :param log_message: dict from the cluster log (audit/cluster channels)
        """
        required_fields = ['channel', 'message', 'priority', 'stamp']
        _message_attrs = log_message.keys()
        if all(_field in _message_attrs for _field in required_fields):
            self.log.debug("clog entry received - adding to the queue")
            if log_message.get('message').startswith('overall HEALTH'):
                m_type = 'heartbeat'
            else:
                m_type = log_message.get('channel')

            event_queue.put(
                LogEntry(
                    source='log',
                    msg_type=m_type,
                    msg=log_message.get('message'),
                    level=log_message.get('priority')[1:-1],
                    tstamp=log_message.get('stamp')
                )
            )

        else:
            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

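    # Illustrative clog record (not part of the original module) of the kind
    # process_clog above expects; the field values are assumptions based on how
    # the method parses them:
    #   {'channel': 'audit',
    #    'message': "from='client.admin' entity='client.admin' cmd='[...]': finished",
    #    'priority': '[INF]',
    #    'stamp': '2019-07-18 05:24:59.121330'}
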
    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.

        :param notify_type: string indicating what kind of notification,
                            such as osd_map, mon_map, fs_map, mon_status,
                            health, pg_summary, command, service_map
        :param notify_id: string (may be empty) that optionally specifies
                          which entity is being notified about. With
                          "command" notifications this is set to the tag
                          from ``send_command``.
        """

        # only interested in cluster log (clog) messages for now
        if notify_type == 'clog':
            self.log.debug("received a clog entry from mgr.notify")
            if isinstance(notify_id, dict):
                # create a log object to process
                self.process_clog(notify_id)
            else:
                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_id)))

    def _show_events(self, events):

        max_msg_length = max([len(events[k].message) for k in events])
        fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n"
        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")

        for event_name in sorted(events,
                                 key=lambda name: events[name].last_timestamp,
                                 reverse=True):

            event = events[event_name]

            s += fmt.format(
                datetime.strftime(event.last_timestamp, "%Y/%m/%d %H:%M:%S"),
                str(event.type),
                str(event.count),
                str(event.message),
                str(event_name)
            )
        s += "Total : {:>3}\n".format(len(events))
        return s

    def show_events(self, events):
        """Show events we're holding from the ceph namespace - most recent first"""

        if len(events):
            return 0, "", self._show_events(events)
        else:
            return 0, "", "No events emitted yet, local cache is empty"

    def show_status(self):
        s = "Kubernetes\n"
        s += "- Hostname : {}\n".format(self.k8s_config['server'])
        s += "- Namespace : {}\n".format(self._namespace)
        s += "Tracker Health\n"
        for t in self.trackers:
            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
        s += "Tracked Events\n"
        s += "- namespace   : {:>3}\n".format(len(self.ns_watcher.events))
        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
        return 0, "", s

    def _valid_server(self, server):
        # must be a valid server url format
        server = server.strip()

        res = urlparse(server)
        port = res.netloc.split(":")[-1]

        if res.scheme != 'https':
            return False, "Server URL must use https"

        elif not res.hostname:
            return False, "Invalid server URL format"

        elif res.hostname:
            try:
                socket.gethostbyname(res.hostname)
            except socket.gaierror:
                return False, "Unresolvable server URL"

        if not port.isdigit():
            return False, "Server URL must end in a port number"

        return True, ""
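
    # Illustrative outcomes (not part of the original module) for _valid_server above,
    # assuming the example hostname resolves in DNS:
    #   _valid_server("https://kube-api.example.com:6443") -> (True, "")
    #   _valid_server("http://kube-api.example.com:6443")  -> (False, "Server URL must use https")
    #   _valid_server("https://kube-api.example.com")      -> (False, "Server URL must end in a port number")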

    def _valid_cacrt(self, cacrt_data):
        """use mgr_util.verify_cacrt to validate the CA file"""

        cacrt_fname = create_temp_file("ca_file", cacrt_data)

        try:
            verify_cacrt(cacrt_fname)
        except ServerConfigException as e:
            return False, "Invalid CA certificate: {}".format(str(e))
        else:
            return True, ""

    def _valid_token(self, token_data):
        """basic checks on the token"""
        if not token_data:
            return False, "Token file is empty"

        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
        if not pattern.match(token_data):
            return False, "Token contains invalid characters"

        return True, ""

    def _valid_namespace(self, namespace):
        # Simple check - name must be a string <= 253 chars in length, alphanumeric with '.' and '-' symbols

        if len(namespace) > 253:
            return False, "Name too long"
        if namespace.isdigit():
            return False, "Invalid name - must be alphanumeric"

        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
        if not pattern.match(namespace):
            return False, "Invalid characters in the name"

        return True, ""

    def _config_set(self, key, val):
        """Attempt to validate the content, then save to KV store"""

        val = val.rstrip()  # remove any trailing whitespace/newline

        try:
            checker = getattr(self, "_valid_" + key)
        except AttributeError:
            # no checker available, just let it pass
            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
            valid = True
        else:
            valid, reason = checker(val)

        if valid:
            self.set_store(key, val)
            self.log.info("Updated config KV Store item: " + key)
            return 0, "", "Config updated for parameter '{}'".format(key)
        else:
            return -errno.EINVAL, "", "Invalid value for '{}': {}".format(key, reason)

    def clear_config_settings(self):
        for k in self.k8s_config:
            self.set_store(k, None)
        return 0, "", "{} configuration keys removed".format(len(self.k8s_config.keys()))

    def handle_command(self, inbuf, cmd):

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of: {}".format(','.join(access_options))

            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of: {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept it if the
        # configuration is not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])

    @staticmethod
    def can_run():
        """Determine whether the pre-reqs for the module are in place"""

        if not kubernetes_imported:
            return False, "kubernetes python client is not available"
        return True, ""

    def load_kubernetes_config(self):
        """Load configuration for remote kubernetes API using KV store values

        Attempt to create an API client configuration from settings stored in
        the KV store.

        Returns:
            client.ApiClient: kubernetes API client object

        Raises:
            OSError: unable to create the cacrt file
        """

        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
        # temporary file containing the cert for the client to load from
        try:
            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
        except OSError as e:
            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
            raise OSError(str(e))
        else:
            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))

        configuration = client.Configuration()
        configuration.host = self.k8s_config['server']
        configuration.ssl_ca_cert = ca_crt_file
        configuration.api_key = {"authorization": "Bearer " + self.k8s_config['token']}
        api_client = client.ApiClient(configuration)
        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")

        return api_client

    def serve(self):
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()  # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False

    def shutdown(self):
        self.run = False
        log.info("Shutting down k8sevents module")
        if self.event_processor:
            self.event_processor.can_run = False

        if self._rados:
            self._rados.shutdown()