# Integrate with the kubernetes events API.
# This module sends events to Kubernetes, and also captures/tracks all events
# in the rook-ceph namespace so kubernetes activity like pod restarts,
# image pulls etc. can be seen from within the ceph cluster itself.
#
# To interact with the events API, the service account used by the mgr needs to
# be granted additional permissions,
# e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules
#
# These are the changes needed:
# - apiGroups:
#   - ""
#   resources:
#   - events
#   verbs:
#   - create
#   - patch
#   - list
#   - get
#   - watch
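#
# Once applied, access can be sanity checked with something like the following
# (the service account name may differ in your deployment):
#   kubectl -n rook-ceph auth can-i create events \
#     --as=system:serviceaccount:rook-ceph:rook-ceph-mgr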


import os
import re
import sys
import time
import json
import yaml
import errno
import socket
import base64
import logging
import tempfile
import threading

from urllib.parse import urlparse
from datetime import tzinfo, datetime, timedelta

from urllib3.exceptions import MaxRetryError, ProtocolError
from collections import OrderedDict

import rados
from mgr_module import MgrModule, NotifyType
from mgr_util import verify_cacrt, ServerConfigException

try:
    import queue
except ImportError:
    # python 2.7.5
    import Queue as queue
finally:
    # python 2.7.15 or python3
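    # shared queue: Module.notify()/CephConfigWatcher push LogEntry objects onto
    # it and the EventProcessor thread consumes them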
    event_queue = queue.Queue()

try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)

log = logging.getLogger(__name__)

# use a simple local class to represent UTC
# datetime pkg modules vary between python2 and 3 and pytz is not available on older
# ceph container images, so taking a pragmatic approach!
class UTC(tzinfo):
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)


def text_suffix(num):
    """Define a text suffix based on a value e.g. turn host into hosts"""
    return '' if num == 1 else 's'


def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file

    Attempt to create a temporary file containing the given content

    Returns:
        str .. full path to the temporary file

    Raises:
        OSError: problems creating the file

    """

    if content is not None:
        file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

        try:
            with open(file_name, "w") as f:
                f.write(content)
        except OSError as e:
            raise OSError("Unable to create temporary file : {}".format(str(e)))

        return file_name


class HealthCheck(object):
    """Transform a healthcheck msg into its component parts"""

    def __init__(self, msg, msg_level):

        # msg looks like
        #
        # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        # Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        #
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        if msg.lower().startswith('health check'):

            self.valid = True
            self.msg = msg
            msg_tokens = self.msg.split()

            if msg_level == 'INF':
                self.text = ' '.join(msg_tokens[3:])
                self.name = msg_tokens[3]   # health check name e.g. OSDMAP_FLAGS
            else:   # WRN or ERR
                self.text = ' '.join(msg_tokens[3:-1])
                self.name = msg_tokens[-1][1:-1]


class LogEntry(object):
    """Generic 'log' object"""

    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):

        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp
        self.healthcheck = None

        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)


    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
                                                                            self.msg_type,
                                                                            self.msg,
                                                                            self.level,
                                                                            self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""

        # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
        if self.msg_type != 'audit':
            return None
        else:
            _m = self.msg[:-10].replace("\'", "").split("cmd=")
            _s = '"cmd":{}'.format(_m[1])
            cmds_list = json.loads('{' + _s + '}')['cmd']

            # TODO. Assuming only one command was issued for now
            _c = cmds_list[0]
            return "{} {}".format(_c['prefix'], _c.get('key', ''))

    @property
    def event_type(self):
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        elif self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        elif self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        elif self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        elif self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        else:
            return None

    @property
    def event_entity(self):
        if self.msg_type == 'audit':
            return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]
        else:
            return None

    @property
    def event_msg(self):
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)

        elif self.healthcheck:
            return self.healthcheck.text
        else:
            return self.msg


class BaseThread(threading.Thread):
    health = 'OK'
    reported = False
    daemon = True


def clean_event(event):
    """ clean an event record """
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            event.first_timestamp = datetime.now(UTC())

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    if not event.count:
        event.count = 1

    return event


class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()
        self.lock = threading.Lock()
        self.active = None
        self.resource_version = None

    def fetch(self):
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        except:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = clean_event(item)
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if self.active:
            log.info("Namespace event watcher started")


            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
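                    # the stream resumes from the resource_version captured by fetch(),
                    # so events already held in the cache are not replayed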
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = clean_event(obj)

                            elif item['type'] == 'DELETED':
                                del self.events[obj.metadata.name]

                # TODO test the exception for auth problem (403?)

                # Attribute error is generated when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except ProtocolError as e:
                    log.warning("Namespace watcher hit ProtocolError ({}) - restarting".format(e))
                    self.fetch()

                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")


class KubernetesEvent(object):

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        self.api_status = 200
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted events are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):

        now = datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not been defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403   # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and then the k8sevent module
                # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        return self.api_status == 200

    def update(self, log_entry):
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200


class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        super(EventProcessor, self).__init__()

        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        return self.startup()

    def prune_events(self):
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        local_events = dict(self.events)

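        # events are walked in ascending last_timestamp order, so we can stop at the
        # first event that is still inside the retention window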
        for event_name in sorted(local_events,
                                 key=lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
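            # the raw heartbeat text is replaced with the configuration watcher's
            # one-line cluster summary (health, hosts, pools, OSDs, raw capacity)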
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")


class ListDiff(object):
    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        return list(self.before - self.after)

    @property
    def added(self):
        return list(self.after - self.before)

    @property
    def is_equal(self):
        return self.before == self.after


class CephConfigWatcher(BaseThread):
    """Detect configuration changes within the cluster and generate human readable events"""

    def __init__(self, mgr):
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        self.server_map = dict()
        self.osd_map = dict()
        self.pool_map = dict()
        self.service_map = dict()

        self.config_check_secs = mgr.config_check_secs

    @property
    def raw_capacity(self):
        # Note. if the OSDs are not online the capacity field will be 0
        return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map])

    @property
    def num_servers(self):
        return len(self.server_map.keys())

    @property
    def num_osds(self):
        return len(self.osd_map.keys())

    @property
    def num_pools(self):
        return len(self.pool_map.keys())

    def __str__(self):
        s = ''

        s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
            json.loads(self.mgr.get('health')['json'])['status'],
            self.num_servers,
            text_suffix(self.num_servers),
            self.num_pools,
            text_suffix(self.num_pools),
            self.num_osds,
            MgrModule.to_pretty_iec(self.raw_capacity))
        return s

    def fetch_servers(self):
        """Return a server summary, and service summary"""
        servers = self.mgr.list_servers()
        server_map = dict()     # host -> services
        service_map = dict()    # service -> host
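        # resulting shapes (illustrative values):
        #   server_map  : {'host-1': {'osd': ['0', '1'], 'mgr': ['a']}}
        #   service_map : {('osd', '0'): 'host-1', ('mgr', 'a'): 'host-1'}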
        for server_info in servers:
            services = dict()
            for svc in server_info['services']:
                if svc.get('type') in services.keys():
                    services[svc.get('type')].append(svc.get('id'))
                else:
                    services[svc.get('type')] = list([svc.get('id')])
                # maintain the service xref map service -> host and version
                service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '')
            server_map[server_info.get('hostname')] = services

        return server_map, service_map

    def fetch_pools(self):
        interesting = ["type", "size", "min_size"]
        # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool',
        #           'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn',
        #           'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100,
        #           'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0',
        #                                  'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0,
        #           'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1,
        #           'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0,
        #           'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000,
        #           'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0,
        #           'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0,
        #           'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0,
        #           'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0,
        #           'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}},
        #           'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0',
        #           'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}]
        pools = self.mgr.get('osd_map')['pools']
        pool_map = dict()
        for pool in pools:
            pool_map[pool.get('pool_name')] = {k: pool.get(k) for k in interesting}
        return pool_map


    def fetch_osd_map(self, service_map):
        """Create an osd map"""
        stats = self.mgr.get('osd_stats')

        osd_map = dict()

        devices = self.mgr.get('osd_map_crush')['devices']
        for dev in devices:
            osd_id = str(dev['id'])
            osd_map[osd_id] = dict(
                deviceclass=dev.get('class'),
                capacity=0,
                hostname=service_map['osd', osd_id]
            )

        for osd_stat in stats['osd_stats']:
            osd_id = str(osd_stat.get('osd'))
            osd_map[osd_id]['capacity'] = osd_stat['statfs']['total']

        return osd_map

    def push_events(self, changes):
        """Add config change to the global queue to generate an event in kubernetes"""
        log.debug("{} events will be generated".format(len(changes)))
        for change in changes:
            event_queue.put(change)

    def _generate_config_logentry(self, msg):
        return LogEntry(
            source="config",
            msg_type="config",
            msg=msg,
            level='INF',
            tstamp=None
        )

    def _check_hosts(self, server_map):
        log.debug("K8sevents checking host membership")
        changes = list()
        servers = ListDiff(self.server_map.keys(), server_map.keys())
        if servers.is_equal:
            # no hosts have been added or removed
            pass
        else:
            # host changes detected, find out what
            host_msg = "Host '{}' has been {} the cluster"
            for new_server in servers.added:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(new_server, 'added to'))
                )

            for removed_server in servers.removed:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(removed_server, 'removed from'))
                )

        return changes

    def _check_osds(self, server_map, osd_map):
        log.debug("K8sevents checking OSD configuration")
        changes = list()
        before_osds = list()
        for svr in self.server_map:
            before_osds.extend(self.server_map[svr].get('osd', []))

        after_osds = list()
        for svr in server_map:
            after_osds.extend(server_map[svr].get('osd', []))

        if set(before_osds) == set(after_osds):
            # no change in osd id's
            pass
        else:
            # osd changes detected
            osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}"

            osds = ListDiff(before_osds, after_osds)
            for new_osd in osds.added:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        new_osd,
                        osd_map[new_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']),
                        'added to',
                        osd_map[new_osd]['hostname']))
                )

            for removed_osd in osds.removed:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        removed_osd,
                        osd_map[removed_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']),
                        'removed from',
                        osd_map[removed_osd]['hostname']))
                )

        return changes

    def _check_pools(self, pool_map):
        changes = list()
        log.debug("K8sevents checking pool configurations")
        if self.pool_map.keys() == pool_map.keys():
            # no pools added/removed
            pass
        else:
            # Pool changes
            pools = ListDiff(self.pool_map.keys(), pool_map.keys())
            pool_msg = "Pool '{}' has been {} the cluster"
            for new_pool in pools.added:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(new_pool, 'added to'))
                )

            for removed_pool in pools.removed:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(removed_pool, 'removed from'))
                )

        # check pool configuration changes
        for pool_name in pool_map:
            if not self.pool_map.get(pool_name, dict()):
                # pool didn't exist before so just skip the checks
                continue

            if pool_map[pool_name] == self.pool_map[pool_name]:
                # no changes - dicts match in key and value
                continue
            else:
                # determine the change and add it to the change list
                size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size']
                if size_diff != 0:
                    if size_diff < 0:
                        msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
                                                                                               pool_map[pool_name]['size'])
                        level = 'WRN'
                    else:
                        msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
                                                                                                 pool_map[pool_name]['size'])
                        level = 'INF'

                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg=msg,
                                            level=level,
                                            tstamp=None)
                                   )

                if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']:
                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name),
                                            level='WRN',
                                            tstamp=None)
                                   )

        return changes

    def get_changes(self, server_map, osd_map, pool_map):
        """Detect changes in maps between current observation and the last"""

        changes = list()

        changes.extend(self._check_hosts(server_map))
        changes.extend(self._check_osds(server_map, osd_map))
        changes.extend(self._check_pools(pool_map))

        # FUTURE
        # Could generate an event if a ceph daemon has moved hosts
        # (assumes the ceph metadata host information is valid though!)

        return changes

    def run(self):
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax (e.g., ceph k8sevents set-access cacrt -i /root/ca.crt).",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config parameters. <key> must be server or namespace (e.g., ceph k8sevents set-config server https://localhost:30433).",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]
    NOTIFY_TYPES = [NotifyType.clog]

    def __init__(self, *args, **kwargs):
        self.run = True
        self.kubernetes_control = 'POD_NAME' in os.environ
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        self.trackers = list()
        self.error_msg = None
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        self.k8s_config = dict(
            cacrt=None,
            token=None,
            server=None,
            namespace=None
        )

        super(Module, self).__init__(*args, **kwargs)

    def k8s_ready(self):
        """Validate the k8s_config dict

        Returns:
          - bool .... indicating whether the config is ready to use
          - list .... variables that need to be defined before the module will function

        """
        missing = list()
        ready = True
        for k in self.k8s_config:
            if not self.k8s_config[k]:
                missing.append(k)
                ready = False
        return ready, missing

    def config_notify(self):
        """Apply runtime module options, and defaults from the modules KV store"""
        self.log.debug("applying runtime module option settings")
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))

        if not self.kubernetes_control:
            # Populate the config
            self.log.debug("loading config from KV store")
            for k in self.k8s_config:
                self.k8s_config[k] = self.get_store(k, default=None)

    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module"""
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError

    def process_clog(self, log_message):
        """Add log message to the event queue

        :param log_message: dict from the cluster log (audit/cluster channels)
        """
        required_fields = ['channel', 'message', 'priority', 'stamp']
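        # an incoming record typically carries these fields (illustrative values;
        # the priority arrives bracketed, hence the [1:-1] slice below):
        #   {'channel': 'audit', 'priority': '[INF]', 'stamp': <timestamp>, 'message': '...'}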
        _message_attrs = log_message.keys()
        if all(_field in _message_attrs for _field in required_fields):
            self.log.debug("clog entry received - adding to the queue")
            if log_message.get('message').startswith('overall HEALTH'):
                m_type = 'heartbeat'
            else:
                m_type = log_message.get('channel')

            event_queue.put(
                LogEntry(
                    source='log',
                    msg_type=m_type,
                    msg=log_message.get('message'),
                    level=log_message.get('priority')[1:-1],
                    tstamp=log_message.get('stamp')
                )
            )

        else:
            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

    def notify(self, notify_type: NotifyType, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.

        :param notify_type: string indicating what kind of notification,
                            such as osd_map, mon_map, fs_map, mon_status,
                            health, pg_summary, command, service_map
        :param notify_id:   string (may be empty) that optionally specifies
                            which entity is being notified about. With
                            "command" notifications this is set to the tag
                            from ``send_command``.
        """

        # only interested in cluster log (clog) messages for now
        if notify_type == NotifyType.clog:
            self.log.debug("received a clog entry from mgr.notify")
            if isinstance(notify_id, dict):
                # create a log object to process
                self.process_clog(notify_id)
            else:
                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_id)))

    def _show_events(self, events):

        max_msg_length = max([len(events[k].message) for k in events])
        fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n"
        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")

        for event_name in sorted(events,
                                 key=lambda name: events[name].last_timestamp,
                                 reverse=True):

            event = events[event_name]

            s += fmt.format(
                datetime.strftime(event.last_timestamp, "%Y/%m/%d %H:%M:%S"),
                str(event.type),
                str(event.count),
                str(event.message),
                str(event_name)
            )
        s += "Total : {:>3}\n".format(len(events))
        return s

    def show_events(self, events):
        """Show events we're holding from the ceph namespace - most recent 1st"""

        if len(events):
            return 0, "", self._show_events(events)
        else:
            return 0, "", "No events emitted yet, local cache is empty"

    def show_status(self):
        s = "Kubernetes\n"
        s += "- Hostname : {}\n".format(self.k8s_config['server'])
        s += "- Namespace : {}\n".format(self._namespace)
        s += "Tracker Health\n"
        for t in self.trackers:
            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
        s += "Tracked Events\n"
        s += "- namespace   : {:>3}\n".format(len(self.ns_watcher.events))
        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
        return 0, "", s

    def _valid_server(self, server):
        # must be a valid server url format
        server = server.strip()

        res = urlparse(server)
        port = res.netloc.split(":")[-1]

        if res.scheme != 'https':
            return False, "Server URL must use https"

        elif not res.hostname:
            return False, "Invalid server URL format"

        elif res.hostname:
            try:
                socket.gethostbyname(res.hostname)
            except socket.gaierror:
                return False, "Unresolvable server URL"

        if not port.isdigit():
            return False, "Server URL must end in a port number"

        return True, ""

    def _valid_cacrt(self, cacrt_data):
        """use mgr_util.verify_cacrt to validate the CA file"""

        cacrt_fname = create_temp_file("ca_file", cacrt_data)

        try:
            verify_cacrt(cacrt_fname)
        except ServerConfigException as e:
            return False, "Invalid CA certificate: {}".format(str(e))
        else:
            return True, ""

    def _valid_token(self, token_data):
        """basic checks on the token"""
        if not token_data:
            return False, "Token file is empty"

        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
        if not pattern.match(token_data):
            return False, "Token contains invalid characters"

        return True, ""

    def _valid_namespace(self, namespace):
        # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols

        if len(namespace) > 253:
            return False, "Name too long"
        if namespace.isdigit():
            return False, "Invalid name - must be alphanumeric"

        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
        if not pattern.match(namespace):
            return False, "Invalid characters in the name"

        return True, ""

    def _config_set(self, key, val):
        """Attempt to validate the content, then save to KV store"""

        val = val.rstrip()  # remove any trailing whitespace/newline

        try:
            checker = getattr(self, "_valid_" + key)
        except AttributeError:
            # no checker available, just let it pass
            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
            valid = True
        else:
            valid, reason = checker(val)

        if valid:
            self.set_store(key, val)
            self.log.info("Updated config KV Store item: " + key)
            return 0, "", "Config updated for parameter '{}'".format(key)
        else:
            return -22, "", "Invalid value for '{}' : {}".format(key, reason)

    def clear_config_settings(self):
        for k in self.k8s_config:
            self.set_store(k, None)
        return 0, "", "{} configuration keys removed".format(len(self.k8s_config.keys()))

    def handle_command(self, inbuf, cmd):

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of: {}".format(','.join(access_options))

            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of: {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept if the configuration is
        # not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])

    @staticmethod
    def can_run():
        """Determine whether the pre-reqs for the module are in place"""

        if not kubernetes_imported:
            return False, "kubernetes python client is not available"
        return True, ""

    def load_kubernetes_config(self):
        """Load configuration for remote kubernetes API using KV store values

        Attempt to create an API client configuration from settings stored in
        KV store.

        Returns:
            client.ApiClient: kubernetes API client object

        Raises:
            OSError: unable to create the cacrt file
        """

        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
        # temporary file containing the cert for the client to load from
        try:
            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
        except OSError as e:
            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
            raise OSError(str(e))
        else:
            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))

        configuration = client.Configuration()
        configuration.host = self.k8s_config['server']
        configuration.ssl_ca_cert = ca_crt_file
        configuration.api_key = {"authorization": "Bearer " + self.k8s_config['token']}
        api_client = client.ApiClient(configuration)
        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")

        return api_client

    def serve(self):
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()    # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False

    def shutdown(self):
        self.run = False
        log.info("Shutting down k8sevents module")
        self.event_processor.can_run = False

        if self._rados:
            self._rados.shutdown()