# Integrate with the kubernetes events API.
# This module sends events to Kubernetes, and also captures/tracks all events
# in the rook-ceph namespace so kubernetes activity like pod restarts,
# imagepulls etc can be seen from within the ceph cluster itself.
#
# To interact with the events API, the mgr service account needs to be
# granted additional permissions
# e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules
#
# These are the changes needed:
# - apiGroups:
#   - ""
#   resources:
#   - events
#   verbs:
#   - create
#   - patch
#   - list
#   - get
#   - watch
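#
# You can sanity-check the resulting permissions with impersonation, e.g.
# (illustrative - the service account name may differ in your deployment)
# kubectl -n rook-ceph auth can-i create events \
#     --as=system:serviceaccount:rook-ceph:rook-ceph-mgr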


import os
import re
import sys
import time
import json
import yaml
import errno
import socket
import base64
import logging
import tempfile
import threading

try:
    # python 3
    from urllib.parse import urlparse
except ImportError:
    # python 2 fallback
    from urlparse import urlparse

from datetime import tzinfo, datetime, timedelta

from urllib3.exceptions import MaxRetryError, ProtocolError
from collections import OrderedDict

import rados
from mgr_module import MgrModule
from mgr_util import verify_cacrt, ServerConfigException

try:
    import queue
except ImportError:
    # python 2.7.5
    import Queue as queue
finally:
    # python 2.7.15 or python3
    # global queue shared by the clog handler (producer) and the
    # EventProcessor thread (consumer)
    event_queue = queue.Queue()

try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376, and has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)

log = logging.getLogger(__name__)

# use a simple local class to represent UTC
# datetime pkg modules vary between python2 and 3 and pytz is not available on older
# ceph container images, so taking a pragmatic approach!
class UTC(tzinfo):
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)


def text_suffix(num):
    """Define a text suffix based on a value e.g. turn host into hosts"""
    return '' if num == 1 else 's'


def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file

    Attempt to create a temporary file containing the given content

    Returns:
        str .. full path to the temporary file

    Raises:
        OSError: problems creating the file

    """

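    # Example (illustrative): create_temp_file("ca_file", cert_text) would
    # typically return "/tmp/ca_file.tmp" (cert_text is a hypothetical variable)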
    if content is not None:
        file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

        try:
            with open(file_name, "w") as f:
                f.write(content)
        except OSError as e:
            raise OSError("Unable to create temporary file: {}".format(str(e)))

        return file_name


class HealthCheck(object):
    """Transform a healthcheck msg into its component parts"""

    def __init__(self, msg, msg_level):

        # msg looks like
        #
        # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        # Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        #
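        # and (for the first example above) parses to:
        #   name = 'PG_AVAILABILITY'
        #   text = 'Reduced data availability: 100 pgs inactive'
        #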
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        if msg.lower().startswith('health check'):

            self.valid = True
            self.msg = msg
            msg_tokens = self.msg.split()

            if msg_level == 'INF':
                self.text = ' '.join(msg_tokens[3:])
                self.name = msg_tokens[3]   # health check name e.g. OSDMAP_FLAGS
            else:                           # WRN or ERR
                self.text = ' '.join(msg_tokens[3:-1])
                self.name = msg_tokens[-1][1:-1]

class LogEntry(object):
    """Generic 'log' object"""

    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):

        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp
        self.healthcheck = None

        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)


    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
                                                                            self.msg_type,
                                                                            self.msg,
                                                                            self.level,
                                                                            self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""

        # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
        if self.msg_type != 'audit':
            return None
        else:
            _m = self.msg[:-10].replace("\'", "").split("cmd=")
            _s = '"cmd":{}'.format(_m[1])
            cmds_list = json.loads('{' + _s + '}')['cmd']

            # TODO. Assuming only one command was issued for now
            _c = cmds_list[0]
            # e.g. returns "osd set nodown" for the sample msg above
            return "{} {}".format(_c['prefix'], _c.get('key', ''))

    @property
    def event_type(self):
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
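        # e.g. an audit entry for 'osd set nodown' maps to 'mgr.audit.osd_set_nodown'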
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        elif self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        elif self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        elif self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        elif self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        else:
            return None

    @property
    def event_entity(self):
        if self.msg_type == 'audit':
            return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]
        else:
            return None

    @property
    def event_msg(self):
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)

        elif self.healthcheck:
            return self.healthcheck.text
        else:
            return self.msg


class BaseThread(threading.Thread):
    health = 'OK'
    reported = False
    daemon = True


class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """
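    # Note (assumption): the kube-apiserver expires events after its --event-ttl
    # setting (1h by default), so this cache normally only spans the last hour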

    def __init__(self, api_client_config, namespace=None):
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()
        self.lock = threading.Lock()
        self.active = None
        self.resource_version = None

    def fetch(self):
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        except Exception:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = item
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        self.fetch()
        func = self.api.list_namespaced_event

        if self.active:
            log.info("Namespace event watcher started")

            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = obj

                            elif item['type'] == 'DELETED':
                                del self.events[obj.metadata.name]

                # TODO test the exception for auth problem (403?)

                # AttributeError is raised when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")


class KubernetesEvent(object):

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        self.api_status = 200
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted events are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):

        now = datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not been defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403  # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and the k8sevents module is
                # then disabled and re-enabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        return self.api_status == 200

    def update(self, log_entry):
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200


class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        super(EventProcessor, self).__init__()

        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        return self.startup()

    def prune_events(self):
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        local_events = dict(self.events)

        # events are visited oldest first, so we can stop at the first event
        # that is still inside the retention window
        for event_name in sorted(local_events,
                                 key=lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")


class ListDiff(object):
    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)
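        # e.g. (illustrative) ListDiff(['a', 'b'], ['b', 'c']).added   -> ['c']
        #                     ListDiff(['a', 'b'], ['b', 'c']).removed -> ['a']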

    @property
    def removed(self):
        return list(self.before - self.after)

    @property
    def added(self):
        return list(self.after - self.before)

    @property
    def is_equal(self):
        return self.before == self.after


class CephConfigWatcher(BaseThread):
    """Detect configuration changes within the cluster and generate human readable events"""

    def __init__(self, mgr):
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        self.server_map = dict()
        self.osd_map = dict()
        self.pool_map = dict()
        self.service_map = dict()

        self.config_check_secs = mgr.config_check_secs

    @property
    def raw_capacity(self):
        # Note. if the OSDs are not online the capacity field will be 0
        return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map])

    @property
    def num_servers(self):
        return len(self.server_map.keys())

    @property
    def num_osds(self):
        return len(self.osd_map.keys())

    @property
    def num_pools(self):
        return len(self.pool_map.keys())

    def __str__(self):
        s = ''

        s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
            json.loads(self.mgr.get('health')['json'])['status'],
            self.num_servers,
            text_suffix(self.num_servers),
            self.num_pools,
            text_suffix(self.num_pools),
            self.num_osds,
            MgrModule.to_pretty_iec(self.raw_capacity))
        return s

    def fetch_servers(self):
        """Return a server summary, and service summary"""
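        # Returns structures shaped like (illustrative):
        #   server_map  -> {'host-1': {'osd': [0, 1], 'mon': ['a']}}
        #   service_map -> {('osd', '0'): 'host-1', ('mon', 'a'): 'host-1'}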
        servers = self.mgr.list_servers()
        server_map = dict()   # host -> services
        service_map = dict()  # service -> host
        for server_info in servers:
            services = dict()
            for svc in server_info['services']:
                if svc.get('type') in services.keys():
                    services[svc.get('type')].append(svc.get('id'))
                else:
                    services[svc.get('type')] = list([svc.get('id')])
                # maintain the service xref map : (service type, id) -> host
                service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '')
            server_map[server_info.get('hostname')] = services

        return server_map, service_map

    def fetch_pools(self):
        interesting = ["type", "size", "min_size"]
        # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool',
        #           'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn',
        #           'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100,
        #           'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0',
        #                                  'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0,
        #           'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1,
        #           'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0,
        #           'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000,
        #           'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0,
        #           'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0,
        #           'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0,
        #           'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0,
        #           'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}},
        #           'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0',
        #           'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}]
        pools = self.mgr.get('osd_map')['pools']
        pool_map = dict()
        for pool in pools:
            pool_map[pool.get('pool_name')] = {k: pool.get(k) for k in interesting}
        return pool_map

    def fetch_osd_map(self, service_map):
        """Create an osd map"""
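        # Builds a dict keyed by osd id (illustrative):
        #   {'0': {'deviceclass': 'hdd', 'capacity': 4000787030016, 'hostname': 'host-1'}}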
        stats = self.mgr.get('osd_stats')

        osd_map = dict()

        devices = self.mgr.get('osd_map_crush')['devices']
        for dev in devices:
            osd_id = str(dev['id'])
            osd_map[osd_id] = dict(
                deviceclass=dev.get('class'),
                capacity=0,
                hostname=service_map['osd', osd_id]
            )

        for osd_stat in stats['osd_stats']:
            osd_id = str(osd_stat.get('osd'))
            osd_map[osd_id]['capacity'] = osd_stat['statfs']['total']

        return osd_map

    def push_events(self, changes):
        """Add config change to the global queue to generate an event in kubernetes"""
        log.debug("{} events will be generated".format(len(changes)))
        for change in changes:
            event_queue.put(change)

    def _generate_config_logentry(self, msg):
        return LogEntry(
            source="config",
            msg_type="config",
            msg=msg,
            level='INF',
            tstamp=None
        )

    def _check_hosts(self, server_map):
        log.debug("K8sevents checking host membership")
        changes = list()
        servers = ListDiff(self.server_map.keys(), server_map.keys())
        if servers.is_equal:
            # no hosts have been added or removed
            pass
        else:
            # host changes detected, find out what
            host_msg = "Host '{}' has been {} the cluster"
            for new_server in servers.added:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(new_server, 'added to'))
                )

            for removed_server in servers.removed:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(removed_server, 'removed from'))
                )

        return changes

    def _check_osds(self, server_map, osd_map):
        log.debug("K8sevents checking OSD configuration")
        changes = list()
        before_osds = list()
        for svr in self.server_map:
            before_osds.extend(self.server_map[svr].get('osd', []))

        after_osds = list()
        for svr in server_map:
            after_osds.extend(server_map[svr].get('osd', []))

        if set(before_osds) == set(after_osds):
            # no change in osd ids
            pass
        else:
            # osd changes detected
            osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}"

            osds = ListDiff(before_osds, after_osds)
            for new_osd in osds.added:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        new_osd,
                        osd_map[new_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']),
                        'added to',
                        osd_map[new_osd]['hostname']))
                )

            for removed_osd in osds.removed:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        removed_osd,
                        osd_map[removed_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']),
                        'removed from',
                        osd_map[removed_osd]['hostname']))
                )

        return changes

    def _check_pools(self, pool_map):
        changes = list()
        log.debug("K8sevents checking pool configurations")
        if self.pool_map.keys() == pool_map.keys():
            # no pools added/removed
            pass
        else:
            # Pool changes
            pools = ListDiff(self.pool_map.keys(), pool_map.keys())
            pool_msg = "Pool '{}' has been {} the cluster"
            for new_pool in pools.added:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(new_pool, 'added to'))
                )

            for removed_pool in pools.removed:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(removed_pool, 'removed from'))
                )

        # check pool configuration changes
        for pool_name in pool_map:
            if not self.pool_map.get(pool_name, dict()):
                # pool didn't exist before so just skip the checks
                continue

            if pool_map[pool_name] == self.pool_map[pool_name]:
                # no changes - dicts match in key and value
                continue
            else:
                # determine the change and add it to the change list
                size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size']
                if size_diff != 0:
                    if size_diff < 0:
                        msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
                                                                                               pool_map[pool_name]['size'])
                        level = 'WRN'
                    else:
                        msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
                                                                                                 pool_map[pool_name]['size'])
                        level = 'INF'

                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg=msg,
                                            level=level,
                                            tstamp=None)
                    )

                if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']:
                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name),
                                            level='WRN',
                                            tstamp=None)
                    )

        return changes

    def get_changes(self, server_map, osd_map, pool_map):
        """Detect changes in maps between current observation and the last"""

        changes = list()

        changes.extend(self._check_hosts(server_map))
        changes.extend(self._check_osds(server_map, osd_map))
        changes.extend(self._check_pools(pool_map))

        # FUTURE
        # Could generate an event if a ceph daemon has moved hosts
        # (assumes the ceph metadata host information is valid though!)

        return changes

    def run(self):
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # if the checks took longer than the current interval, double
                # the interval and emit a warning to record the change
                if checks_duration > self.config_check_secs:
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]
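    # Module options use the standard mgr option mechanism, e.g. (illustrative):
    #   ceph config set mgr mgr/k8sevents/config_check_secs 30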

    def __init__(self, *args, **kwargs):
        self.run = True
        self.kubernetes_control = 'POD_NAME' in os.environ
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        self.trackers = list()
        self.error_msg = None
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        self.k8s_config = dict(
            cacrt=None,
            token=None,
            server=None,
            namespace=None
        )

        super(Module, self).__init__(*args, **kwargs)

    def k8s_ready(self):
        """Validate the k8s_config dict

        Returns:
            - bool .... indicating whether the config is ready to use
            - list .... variables that need to be defined before the module will function

        """
        missing = list()
        ready = True
        for k in self.k8s_config:
            if not self.k8s_config[k]:
                missing.append(k)
                ready = False
        return ready, missing

    def config_notify(self):
        """Apply runtime module options, and defaults from the module's KV store"""
        self.log.debug("applying runtime module option settings")
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))

        if not self.kubernetes_control:
            # Populate the config
            self.log.debug("loading config from KV store")
            for k in self.k8s_config:
                self.k8s_config[k] = self.get_store(k, default=None)

    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module"""
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError

    def process_clog(self, log_message):
        """Add log message to the event queue

        :param log_message: dict from the cluster log (audit/cluster channels)
        """
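        # a typical clog record looks like (illustrative):
        #   {'channel': 'audit', 'priority': '[INF]', 'stamp': <timestamp>,
        #    'message': "from='client.205306' entity='client.admin' ..."}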
        required_fields = ['channel', 'message', 'priority', 'stamp']
        _message_attrs = log_message.keys()
        if all(_field in _message_attrs for _field in required_fields):
            self.log.debug("clog entry received - adding to the queue")
            if log_message.get('message').startswith('overall HEALTH'):
                m_type = 'heartbeat'
            else:
                m_type = log_message.get('channel')

            event_queue.put(
                LogEntry(
                    source='log',
                    msg_type=m_type,
                    msg=log_message.get('message'),
                    level=log_message.get('priority')[1:-1],
                    tstamp=log_message.get('stamp')
                )
            )

        else:
            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.

        :param notify_type: string indicating what kind of notification,
                            such as osd_map, mon_map, fs_map, mon_status,
                            health, pg_summary, command, service_map
        :param notify_id: string (may be empty) that optionally specifies
                          which entity is being notified about. With
                          "command" notifications this is set to the tag
                          from ``send_command``.
        """

        # only interested in cluster log (clog) messages for now
        if notify_type == 'clog':
            self.log.debug("received a clog entry from mgr.notify")
            if isinstance(notify_id, dict):
                # create a log object to process
                self.process_clog(notify_id)
            else:
                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_id)))

    def _show_events(self, events):

        max_msg_length = max([len(events[k].message) for k in events])
        fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n"
        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")

        for event_name in sorted(events,
                                 key=lambda name: events[name].last_timestamp,
                                 reverse=True):

            event = events[event_name]

            s += fmt.format(
                datetime.strftime(event.last_timestamp, "%Y/%m/%d %H:%M:%S"),
                event.type,
                event.count,
                event.message,
                event_name
            )
        s += "Total : {:>3}\n".format(len(events))
        return s

    def show_events(self, events):
        """Show events we're holding from the ceph namespace - most recent 1st"""

        if len(events):
            return 0, "", self._show_events(events)
        else:
            return 0, "", "No events emitted yet, local cache is empty"

    def show_status(self):
        s = "Kubernetes\n"
        s += "- Hostname : {}\n".format(self.k8s_config['server'])
        s += "- Namespace : {}\n".format(self._namespace)
        s += "Tracker Health\n"
        for t in self.trackers:
            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
        s += "Tracked Events\n"
        s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events))
        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
        return 0, "", s

    def _valid_server(self, server):
        # must be a valid server url format
        server = server.strip()

        res = urlparse(server)
        port = res.netloc.split(":")[-1]

        if res.scheme != 'https':
            return False, "Server URL must use https"

        elif not res.hostname:
            return False, "Invalid server URL format"

        elif res.hostname:
            try:
                socket.gethostbyname(res.hostname)
            except socket.gaierror:
                return False, "Unresolvable server URL"

        if not port.isdigit():
            return False, "Server URL must end in a port number"

        return True, ""

    def _valid_cacrt(self, cacrt_data):
        """use mgr_util.verify_cacrt to validate the CA file"""

        cacrt_fname = create_temp_file("ca_file", cacrt_data)

        try:
            verify_cacrt(cacrt_fname)
        except ServerConfigException as e:
            return False, "Invalid CA certificate: {}".format(str(e))
        else:
            return True, ""

    def _valid_token(self, token_data):
        """basic checks on the token"""
        if not token_data:
            return False, "Token file is empty"

        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
        if not pattern.match(token_data):
            return False, "Token contains invalid characters"

        return True, ""

    def _valid_namespace(self, namespace):
        # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols

        if len(namespace) > 253:
            return False, "Name too long"
        if namespace.isdigit():
            return False, "Invalid name - must be alphanumeric"

        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
        if not pattern.match(namespace):
            return False, "Invalid characters in the name"

        return True, ""

    def _config_set(self, key, val):
        """Attempt to validate the content, then save to KV store"""

        val = val.rstrip()  # remove any trailing whitespace/newline

        try:
            checker = getattr(self, "_valid_" + key)
        except AttributeError:
            # no checker available, just let it pass
            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
            valid = True
        else:
            valid, reason = checker(val)

        if valid:
            self.set_store(key, val)
            self.log.info("Updated config KV Store item: " + key)
            return 0, "", "Config updated for parameter '{}'".format(key)
        else:
            return -22, "", "Invalid value for '{}' : {}".format(key, reason)

    def clear_config_settings(self):
        for k in self.k8s_config:
            self.set_store(k, None)
        return 0, "", "{} configuration keys removed".format(len(self.k8s_config.keys()))

    def handle_command(self, inbuf, cmd):

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of: {}".format(','.join(access_options))

            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of: {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept it if the
        # configuration is not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])

    @staticmethod
    def can_run():
        """Determine whether the pre-reqs for the module are in place"""

        if not kubernetes_imported:
            return False, "kubernetes python client is not available"
        return True, ""

    def load_kubernetes_config(self):
        """Load configuration for remote kubernetes API using KV store values

        Attempt to create an API client configuration from settings stored in
        the KV store.

        Returns:
            client.ApiClient: kubernetes API client object

        Raises:
            OSError: unable to create the cacrt file
        """

        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
        # temporary file containing the cert for the client to load from
        try:
            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
        except OSError as e:
            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
            raise OSError(str(e))
        else:
            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))

        configuration = client.Configuration()
        configuration.host = self.k8s_config['server']
        configuration.ssl_ca_cert = ca_crt_file
        configuration.api_key = {"authorization": "Bearer " + self.k8s_config['token']}
        api_client = client.ApiClient(configuration)
        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")

        return api_client

    def serve(self):
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()  # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False

    def shutdown(self):
        self.run = False
        log.info("Shutting down k8sevents module")
        self.event_processor.can_run = False

        if self._rados:
            self._rados.shutdown()