]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | # Integrate with the kubernetes events API. |
2 | # This module sends events to Kubernetes, and also captures/tracks all events | |
3 | # in the rook-ceph namespace so kubernetes activity like pod restarts, | |
4 | # imagepulls etc can be seen from within the ceph cluster itself. | |
5 | # | |
6 | # To interact with the events API, the mgr service to access needs to be | |
7 | # granted additional permissions | |
8 | # e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules | |
9 | # | |
10 | # These are the changes needed; | |
11 | # - apiGroups: | |
12 | # - "" | |
13 | # resources: | |
14 | # - events | |
15 | # verbs: | |
16 | # - create | |
17 | # - patch | |
18 | # - list | |
19 | # - get | |
20 | # - watch | |
21 | ||
22 | ||
23 | import os | |
24 | import re | |
25 | import sys | |
26 | import time | |
27 | import json | |
28 | import yaml | |
29 | import errno | |
30 | import socket | |
31 | import base64 | |
32 | import logging | |
33 | import tempfile | |
34 | import threading | |
35 | ||
f67539c2 | 36 | from urllib.parse import urlparse |
eafe8130 TL |
37 | from datetime import tzinfo, datetime, timedelta |
38 | ||
39 | from urllib3.exceptions import MaxRetryError,ProtocolError | |
40 | from collections import OrderedDict | |
41 | ||
42 | import rados | |
43 | from mgr_module import MgrModule | |
44 | from mgr_util import verify_cacrt, ServerConfigException | |
45 | ||
46 | try: | |
47 | import queue | |
48 | except ImportError: | |
49 | # python 2.7.5 | |
50 | import Queue as queue | |
51 | finally: | |
52 | # python 2.7.15 or python3 | |
53 | event_queue = queue.Queue() | |
54 | ||
try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    # kubernetes package unavailable - record the fact and null the API
    # entry points so the module can report a meaningful error later
    # instead of failing at import time
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        # substitute a minimal object reference when the API hands us None
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    # monkey-patch the property setter so every V1Event gets the safe behaviour
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)
76 | ||
77 | log = logging.getLogger(__name__) | |
78 | ||
79 | # use a simple local class to represent UTC | |
80 | # datetime pkg modules vary between python2 and 3 and pytz is not available on older | |
81 | # ceph container images, so taking a pragmatic approach! | |
class UTC(tzinfo):
    """Concrete tzinfo implementation for UTC.

    datetime pkg modules vary between python2 and 3 and pytz is not
    available on older ceph container images, so a local class is the
    pragmatic choice.
    """

    def utcoffset(self, dt):
        """UTC is, by definition, at zero offset."""
        return timedelta(0)

    def tzname(self, dt):
        """Return the name of this timezone."""
        return "UTC"

    def dst(self, dt):
        """UTC never observes daylight saving time."""
        return timedelta(0)
91 | ||
92 | ||
def text_suffix(num):
    """Return a pluralisation suffix: '' for exactly one, otherwise 's'."""
    return 's' if num != 1 else ''
96 | ||
97 | ||
def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file holding the given content.

    The file is written to the system temp directory as ``fname + suffix``.
    When ``content`` is None nothing is written and None is returned - made
    explicit here (it was an implicit fall-through before).

    Args:
        fname: base name of the temporary file (no directory component)
        content: text to write, or None to skip creating a file
        suffix: filename suffix appended to fname (default ".tmp")

    Returns:
        str: full path to the temporary file, or None when content is None

    Raises:
        OSError: problems creating or writing the file
    """

    if content is None:
        # nothing to persist
        return None

    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

    try:
        with open(file_name, "w") as f:
            f.write(content)
    except OSError as e:
        # re-raise with context, chaining the original error as the cause
        # so the underlying errno/strerror isn't lost from the traceback
        raise OSError("Unable to create temporary file : {}".format(str(e))) from e

    return file_name
121 | ||
122 | ||
class HealthCheck(object):
    """Decompose a ceph healthcheck message into its component parts."""

    def __init__(self, msg, msg_level):
        """Parse a healthcheck log message.

        Expected formats:
          Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
          Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
          Health check failed: nodown flag(s) set (OSDMAP_FLAGS)

        Args:
            msg: raw log message text
            msg_level: severity of the message ('INF', 'WRN' or 'ERR')
        """
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        # anything that doesn't lead with 'health check' isn't for us
        if not msg.lower().startswith('health check'):
            return

        self.valid = True
        self.msg = msg
        tokens = msg.split()

        if msg_level == 'INF':
            # cleared message: the check name is the 4th token
            self.name = tokens[3]
            self.text = ' '.join(tokens[3:])
        else:
            # WRN or ERR: the name is the parenthesised final token
            self.name = tokens[-1][1:-1]
            self.text = ' '.join(tokens[3:-1])
151 | ||
152 | ||
class LogEntry(object):
    """Generic representation of a single ceph log/audit record."""

    # ceph log channel -> kubernetes event 'reason'
    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat":"Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):
        """
        Args:
            source: origin of the record (e.g. 'cluster', 'self')
            msg: raw message text
            msg_type: channel - audit/cluster/config/heartbeat/startup
            level: severity string (INF/WRN/ERR)
            tstamp: optional timestamp of the record
        """
        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp

        # healthcheck records get parsed further into a HealthCheck object
        self.healthcheck = HealthCheck(msg, level) if 'health check ' in msg.lower() else None

    def __str__(self):
        return ("source={}, msg_type={}, msg={}, level={}, tstamp={}"
                .format(self.source,
                        self.msg_type,
                        self.msg,
                        self.level,
                        self.tstamp))

    @property
    def cmd(self):
        """Extract the command text from an audit message.

        Audit messages look like:
        from='client.205306 ' entity='client.admin' cmd='[{"prefix": "osd set", "key": "nodown"}]': finished
        """
        if self.msg_type != 'audit':
            return None

        # drop the trailing "': finished", strip quote chars, then keep the
        # json list that follows 'cmd='
        raw_cmd = self.msg[:-10].replace("\'", "").split("cmd=")[1]
        cmds_list = json.loads('{' + '"cmd":{}'.format(raw_cmd) + '}')['cmd']

        # TODO. Assuming only one command was issued for now
        first_cmd = cmds_list[0]
        return "{} {}".format(first_cmd['prefix'], first_cmd.get('key', ''))

    @property
    def event_type(self):
        """kubernetes event type: INF maps to Normal, everything else Warning."""
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        """kubernetes event 'reason' derived from the message channel."""
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
        """Derive a kubernetes event name for this record, or None."""
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        if self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        if self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        if self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        if self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        return None

    @property
    def event_entity(self):
        """Entity that issued an audit command (audit messages only)."""
        if self.msg_type != 'audit':
            return None
        return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]

    @property
    def event_msg(self):
        """Human readable message body for the kubernetes event."""
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)
        if self.healthcheck:
            return self.healthcheck.text
        return self.msg
239 | ||
240 | ||
class BaseThread(threading.Thread):
    """Common base for this module's worker threads.

    Class-level defaults shared by all workers: 'health' holds a human
    readable status string, 'reported' tracks whether a failure has already
    been surfaced, and daemon=True ensures the thread dies with the mgr.
    """
    health = 'OK'
    reported = False
    daemon = True
245 | ||
246 | ||
f6b5b4d7 TL |
def clean_event(event):
    """Normalise an event record before it is cached.

    Events returned by the kubernetes API can arrive without first/last
    timestamps or a count, so backfill sensible defaults to keep downstream
    consumers (sorting, display) safe.

    Args:
        event: a V1Event-like object (mutated in place)

    Returns:
        the same event object, with first_timestamp/last_timestamp/count set
    """
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            # BUGFIX: was datetime.datetime.now(), but 'datetime' here is the
            # class from 'from datetime import datetime', so that raised
            # AttributeError instead of producing a timestamp
            event.first_timestamp = datetime.now()

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    if not event.count:
        event.count = 1

    return event
266 | ||
267 | ||
eafe8130 TL |
class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        """
        Args:
            api_client_config: optional ApiClient used for the CoreV1Api calls
            namespace: namespace whose events should be tracked
        """
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()     # event name -> event object
        self.lock = threading.Lock()    # guards self.events
        self.active = None              # None until fetch() has run
        self.resource_version = None    # version to resume the watch from

    def fetch(self):
        """(Re)load the event cache and record the resource version to watch from."""
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        except Exception:
            # FIX: was a bare 'except:' which also swallowed SystemExit and
            # KeyboardInterrupt; narrowed to Exception
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = clean_event(item)
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        """Stream event changes for the namespace, maintaining self.events."""
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if self.active:
            log.info("Namespace event watcher started")

            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = clean_event(obj)

                            elif item['type'] == 'DELETED':
                                # FIX: use pop with a default - a DELETED
                                # notification for an event we never cached
                                # previously raised KeyError, which fell into
                                # the generic handler below and killed the
                                # watcher thread
                                self.events.pop(obj.metadata.name, None)

                # TODO test the exception for auth problem (403?)

                # Attribute error is generated when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except ProtocolError as e:
                    log.warning("Namespace watcher hit protocolerror ({}) - restarting".format(e))
                    self.fetch()

                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")
365 | ||
366 | ||
class KubernetesEvent(object):
    """Represent a LogEntry as a kubernetes event and manage its lifecycle.

    Wraps create (write) and update (patch) calls against the kubernetes
    events API, recording the last HTTP status in api_status so callers can
    check success through the api_success property.
    """

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):
        """
        Args:
            log_entry: LogEntry supplying name/message/type/reason
            unique_name: True -> fixed event name that can be patched later;
                False -> let kubernetes generate a unique name suffix
            api_client_config: optional ApiClient for the CoreV1Api calls
            namespace: namespace the event is written to
        """

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        # host the mgr is running on, used as the event's source host
        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        # last HTTP-style status from the events API (200 = healthy)
        self.api_status = 200
        # occurrence counter carried into the V1Event body
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        """Build the V1Event payload for create/patch calls."""
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            # let the apiserver append a random suffix to the generate_name
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted event are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):
        """Create the event in kubernetes, patching it if it already exists.

        Sets api_status: 200 on success, 400 when the API server is
        unreachable, 403 after repeated connection failures, otherwise the
        status returned by the API.
        """

        now=datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach to the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not been defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403 # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and then the k8sevent module
                # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                # 'response' is a V1Event carrying (among other fields) the
                # existing event's count, first_timestamp/last_timestamp,
                # involved_object, metadata (name/namespace/resource_version),
                # reason, source and type.

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        # True when the last API interaction completed cleanly
        return self.api_status == 200

    def update(self, log_entry):
        """Refresh this event from a new LogEntry and patch it in kubernetes.

        If the patch hits a 404 the event's TTL has expired and it was pruned
        by the kube-apiserver, so fall back to recreating it.
        """
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200
541 | ||
542 | ||
class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    # flipped to False by the module to request a clean shutdown of run()
    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        """
        Args:
            config_watcher: CephConfigWatcher used for heartbeat summaries
            event_retention_days: age (days) after which cached events are pruned
            api_client_config: optional ApiClient for kubernetes API calls
            namespace: namespace events are written to
        """
        super(EventProcessor, self).__init__()

        # event_name -> KubernetesEvent cache for unique (patchable) events
        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        # writing the startup event doubles as a connectivity/permission check
        return self.startup()

    def prune_events(self):
        """Drop cached events older than the retention window."""
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        local_events = dict(self.events)

        # walk events oldest-first; stop at the first one inside the window
        for event_name in sorted(local_events,
                                 key = lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):
        """Turn a LogEntry into a kubernetes event create/update, or ignore it."""

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                # seen this unique event before - patch the existing one
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        """Poll the global event_queue, processing entries until stopped."""
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    # record the failure in 'health' so the module can report it
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")
679 | ||
680 | ||
class ListDiff(object):
    """Compare two iterables as sets and expose the membership changes."""

    def __init__(self, before, after):
        """
        Args:
            before: iterable describing the prior state
            after: iterable describing the current state
        """
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        """Items present before that are now gone."""
        return list(self.before.difference(self.after))

    @property
    def added(self):
        """Items present now that weren't before."""
        return list(self.after.difference(self.before))

    @property
    def is_equal(self):
        """True when membership hasn't changed."""
        return self.before == self.after
697 | ||
698 | ||
699 | class CephConfigWatcher(BaseThread): | |
700 | """Detect configuration changes within the cluster and generate human readable events""" | |
701 | ||
    def __init__(self, mgr):
        """
        Args:
            mgr: the MgrModule instance, used for all cluster state queries
        """
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        self.server_map = dict()    # hostname -> {service type: [ids]}
        self.osd_map = dict()       # osd id (str) -> deviceclass/capacity/hostname
        self.pool_map = dict()      # pool name -> settings of interest
        self.service_map = dict()   # (service type, id) -> hostname

        # interval (secs) between configuration change scans
        self.config_check_secs = mgr.config_check_secs
711 | ||
712 | @property | |
713 | def raw_capacity(self): | |
714 | # Note. if the osd's are not online the capacity field will be 0 | |
715 | return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map]) | |
716 | ||
717 | @property | |
718 | def num_servers(self): | |
719 | return len(self.server_map.keys()) | |
720 | ||
721 | @property | |
722 | def num_osds(self): | |
723 | return len(self.osd_map.keys()) | |
724 | ||
725 | @property | |
726 | def num_pools(self): | |
727 | return len(self.pool_map.keys()) | |
728 | ||
729 | def __str__(self): | |
730 | s = '' | |
731 | ||
732 | s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format( | |
733 | json.loads(self.mgr.get('health')['json'])['status'], | |
734 | self.num_servers, | |
735 | text_suffix(self.num_servers), | |
736 | self.num_pools, | |
737 | text_suffix(self.num_pools), | |
738 | self.num_osds, | |
739 | MgrModule.to_pretty_iec(self.raw_capacity)) | |
740 | return s | |
741 | ||
742 | def fetch_servers(self): | |
743 | """Return a server summary, and service summary""" | |
744 | servers = self.mgr.list_servers() | |
745 | server_map = dict() # host -> services | |
746 | service_map = dict() # service -> host | |
747 | for server_info in servers: | |
748 | services = dict() | |
749 | for svc in server_info['services']: | |
750 | if svc.get('type') in services.keys(): | |
751 | services[svc.get('type')].append(svc.get('id')) | |
752 | else: | |
753 | services[svc.get('type')] = list([svc.get('id')]) | |
754 | # maintain the service xref map service -> host and version | |
755 | service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '') | |
756 | server_map[server_info.get('hostname')] = services | |
757 | ||
758 | return server_map, service_map | |
759 | ||
760 | def fetch_pools(self): | |
761 | interesting = ["type", "size", "min_size"] | |
762 | # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool', | |
763 | # 'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn', | |
764 | # 'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100, | |
765 | # 'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0', | |
766 | # 'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0, | |
767 | # 'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1, | |
768 | # 'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0, | |
769 | # 'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000, | |
770 | # 'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0, | |
771 | # 'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0, | |
772 | # 'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0, | |
773 | # 'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0, | |
774 | # 'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}}, | |
775 | # 'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0', | |
776 | # 'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}] | |
777 | pools = self.mgr.get('osd_map')['pools'] | |
778 | pool_map = dict() | |
779 | for pool in pools: | |
780 | pool_map[pool.get('pool_name')] = {k:pool.get(k) for k in interesting} | |
781 | return pool_map | |
782 | ||
783 | ||
784 | def fetch_osd_map(self, service_map): | |
785 | """Create an osd map""" | |
786 | stats = self.mgr.get('osd_stats') | |
787 | ||
788 | osd_map = dict() | |
789 | ||
790 | devices = self.mgr.get('osd_map_crush')['devices'] | |
791 | for dev in devices: | |
792 | osd_id = str(dev['id']) | |
793 | osd_map[osd_id] = dict( | |
794 | deviceclass=dev.get('class'), | |
795 | capacity=0, | |
796 | hostname=service_map['osd', osd_id] | |
797 | ) | |
798 | ||
799 | for osd_stat in stats['osd_stats']: | |
800 | osd_id = str(osd_stat.get('osd')) | |
801 | osd_map[osd_id]['capacity'] = osd_stat['statfs']['total'] | |
802 | ||
803 | return osd_map | |
804 | ||
805 | def push_events(self, changes): | |
806 | """Add config change to the global queue to generate an event in kubernetes""" | |
807 | log.debug("{} events will be generated") | |
808 | for change in changes: | |
809 | event_queue.put(change) | |
810 | ||
811 | def _generate_config_logentry(self, msg): | |
812 | return LogEntry( | |
813 | source="config", | |
814 | msg_type="config", | |
815 | msg=msg, | |
816 | level='INF', | |
817 | tstamp=None | |
818 | ) | |
819 | ||
820 | def _check_hosts(self, server_map): | |
821 | log.debug("K8sevents checking host membership") | |
822 | changes = list() | |
823 | servers = ListDiff(self.server_map.keys(), server_map.keys()) | |
824 | if servers.is_equal: | |
825 | # no hosts have been added or removed | |
826 | pass | |
827 | else: | |
828 | # host changes detected, find out what | |
829 | host_msg = "Host '{}' has been {} the cluster" | |
830 | for new_server in servers.added: | |
831 | changes.append(self._generate_config_logentry( | |
832 | msg=host_msg.format(new_server, 'added to')) | |
833 | ) | |
834 | ||
835 | for removed_server in servers.removed: | |
836 | changes.append(self._generate_config_logentry( | |
837 | msg=host_msg.format(removed_server, 'removed from')) | |
838 | ) | |
839 | ||
840 | return changes | |
841 | ||
842 | def _check_osds(self,server_map, osd_map): | |
843 | log.debug("K8sevents checking OSD configuration") | |
844 | changes = list() | |
845 | before_osds = list() | |
846 | for svr in self.server_map: | |
847 | before_osds.extend(self.server_map[svr].get('osd',[])) | |
848 | ||
849 | after_osds = list() | |
850 | for svr in server_map: | |
851 | after_osds.extend(server_map[svr].get('osd',[])) | |
852 | ||
853 | if set(before_osds) == set(after_osds): | |
854 | # no change in osd id's | |
855 | pass | |
856 | else: | |
857 | # osd changes detected | |
858 | osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}" | |
859 | ||
860 | osds = ListDiff(before_osds, after_osds) | |
861 | for new_osd in osds.added: | |
862 | changes.append(self._generate_config_logentry( | |
863 | msg=osd_msg.format( | |
864 | new_osd, | |
865 | osd_map[new_osd]['deviceclass'], | |
866 | MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']), | |
867 | 'added to', | |
868 | osd_map[new_osd]['hostname'])) | |
869 | ) | |
870 | ||
871 | for removed_osd in osds.removed: | |
872 | changes.append(self._generate_config_logentry( | |
873 | msg=osd_msg.format( | |
874 | removed_osd, | |
875 | osd_map[removed_osd]['deviceclass'], | |
876 | MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']), | |
877 | 'removed from', | |
878 | osd_map[removed_osd]['hostname'])) | |
879 | ) | |
880 | ||
881 | return changes | |
882 | ||
883 | def _check_pools(self, pool_map): | |
884 | changes = list() | |
885 | log.debug("K8sevents checking pool configurations") | |
886 | if self.pool_map.keys() == pool_map.keys(): | |
887 | # no pools added/removed | |
888 | pass | |
889 | else: | |
890 | # Pool changes | |
891 | pools = ListDiff(self.pool_map.keys(), pool_map.keys()) | |
892 | pool_msg = "Pool '{}' has been {} the cluster" | |
893 | for new_pool in pools.added: | |
894 | changes.append(self._generate_config_logentry( | |
895 | msg=pool_msg.format(new_pool, 'added to')) | |
896 | ) | |
897 | ||
898 | for removed_pool in pools.removed: | |
899 | changes.append(self._generate_config_logentry( | |
900 | msg=pool_msg.format(removed_pool, 'removed from')) | |
901 | ) | |
902 | ||
903 | # check pool configuration changes | |
904 | for pool_name in pool_map: | |
905 | if not self.pool_map.get(pool_name, dict()): | |
906 | # pool didn't exist before so just skip the checks | |
907 | continue | |
908 | ||
909 | if pool_map[pool_name] == self.pool_map[pool_name]: | |
910 | # no changes - dicts match in key and value | |
911 | continue | |
912 | else: | |
913 | # determine the change and add it to the change list | |
914 | size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size'] | |
915 | if size_diff != 0: | |
916 | if size_diff < 0: | |
917 | msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name, | |
918 | pool_map[pool_name]['size']) | |
919 | level = 'WRN' | |
920 | else: | |
921 | msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name, | |
922 | pool_map[pool_name]['size']) | |
923 | level = 'INF' | |
924 | ||
925 | changes.append(LogEntry(source="config", | |
926 | msg_type="config", | |
927 | msg=msg, | |
928 | level=level, | |
929 | tstamp=None) | |
930 | ) | |
931 | ||
932 | if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']: | |
933 | changes.append(LogEntry(source="config", | |
934 | msg_type="config", | |
935 | msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name), | |
936 | level='WRN', | |
937 | tstamp=None) | |
938 | ) | |
939 | ||
940 | return changes | |
941 | ||
942 | def get_changes(self, server_map, osd_map, pool_map): | |
943 | """Detect changes in maps between current observation and the last""" | |
944 | ||
945 | changes = list() | |
946 | ||
947 | changes.extend(self._check_hosts(server_map)) | |
948 | changes.extend(self._check_osds(server_map, osd_map)) | |
949 | changes.extend(self._check_pools(pool_map)) | |
950 | ||
951 | # FUTURE | |
952 | # Could generate an event if a ceph daemon has moved hosts | |
953 | # (assumes the ceph metadata host information is valid though!) | |
954 | ||
955 | return changes | |
956 | ||
    def run(self):
        """Main thread loop: poll cluster maps and emit events on changes.

        Every config_check_secs, re-fetch the server/pool/osd maps, diff them
        against the previous snapshot via get_changes(), and push any detected
        changes onto the global event queue.  An unhandled exception records
        the failure in self.health and terminates the thread.
        """
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        # take the initial snapshot that later iterations will diff against
        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                # the current observation becomes the baseline for the next pass
                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    # doubling (rather than incrementing) backs off quickly if the
                    # cluster is large/slow to query
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                # record the failure so 'k8sevents status' can report it, then
                # stop the watcher thread
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")
1008 | ||
1009 | ||
class Module(MgrModule):
    """ceph-mgr module that exchanges events with the kubernetes events API.

    Sends ceph cluster/audit log activity to kubernetes and tracks events
    from the ceph namespace so they can be inspected from the ceph CLI.
    """

    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            # typo fix: "Kuberenetes" -> "Kubernetes"
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            # typo fix: "paramters" -> "parameters"
            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]
1054 | ||
    def __init__(self, *args, **kwargs):
        # main-loop control flag, cleared by shutdown()
        self.run = True
        # POD_NAME is only present in the environment when running inside
        # kubernetes (e.g. under rook-ceph)
        self.kubernetes_control = 'POD_NAME' in os.environ
        # worker threads, created in serve()
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        # threads whose health is reported by 'k8sevents status'
        self.trackers = list()
        # populated with a reason string when the module cannot start/serve
        self.error_msg = None
        # kubernetes API client settings (None -> in-cluster configuration)
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        # connection settings for an external kubernetes cluster, loaded
        # from the KV store by config_notify()
        self.k8s_config = dict(
            cacrt = None,
            token = None,
            server = None,
            namespace = None
        )

        super(Module, self).__init__(*args, **kwargs)
1078 | ||
1079 | def k8s_ready(self): | |
1080 | """Validate the k8s_config dict | |
1081 | ||
1082 | Returns: | |
1083 | - bool .... indicating whether the config is ready to use | |
1084 | - string .. variables that need to be defined before the module will function | |
1085 | ||
1086 | """ | |
1087 | missing = list() | |
1088 | ready = True | |
1089 | for k in self.k8s_config: | |
1090 | if not self.k8s_config[k]: | |
1091 | missing.append(k) | |
1092 | ready = False | |
1093 | return ready, missing | |
1094 | ||
1095 | def config_notify(self): | |
1096 | """Apply runtime module options, and defaults from the modules KV store""" | |
1097 | self.log.debug("applying runtime module option settings") | |
1098 | for opt in self.MODULE_OPTIONS: | |
1099 | setattr(self, | |
1100 | opt['name'], | |
9f95a23c | 1101 | self.get_module_option(opt['name'])) |
eafe8130 TL |
1102 | |
1103 | if not self.kubernetes_control: | |
1104 | # Populate the config | |
1105 | self.log.debug("loading config from KV store") | |
1106 | for k in self.k8s_config: | |
1107 | self.k8s_config[k] = self.get_store(k, default=None) | |
1108 | ||
    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module

        :param limit: optional cap on the number of events returned
        :raises NotImplementedError: always - placeholder for future use
        """
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError
1113 | ||
1114 | def process_clog(self, log_message): | |
1115 | """Add log message to the event queue | |
1116 | ||
1117 | :param log_message: dict from the cluster log (audit/cluster channels) | |
1118 | """ | |
1119 | required_fields = ['channel', 'message', 'priority', 'stamp'] | |
1120 | _message_attrs = log_message.keys() | |
1121 | if all(_field in _message_attrs for _field in required_fields): | |
1122 | self.log.debug("clog entry received - adding to the queue") | |
1123 | if log_message.get('message').startswith('overall HEALTH'): | |
1124 | m_type = 'heartbeat' | |
1125 | else: | |
1126 | m_type = log_message.get('channel') | |
1127 | ||
1128 | event_queue.put( | |
1129 | LogEntry( | |
1130 | source='log', | |
1131 | msg_type=m_type, | |
1132 | msg=log_message.get('message'), | |
1133 | level=log_message.get('priority')[1:-1], | |
1134 | tstamp=log_message.get('stamp') | |
1135 | ) | |
1136 | ) | |
1137 | ||
1138 | else: | |
1139 | self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) | |
1140 | ||
1141 | def notify(self, notify_type, notify_id): | |
1142 | """ | |
1143 | Called by the ceph-mgr service to notify the Python plugin | |
1144 | that new state is available. | |
1145 | ||
1146 | :param notify_type: string indicating what kind of notification, | |
1147 | such as osd_map, mon_map, fs_map, mon_status, | |
1148 | health, pg_summary, command, service_map | |
1149 | :param notify_id: string (may be empty) that optionally specifies | |
1150 | which entity is being notified about. With | |
1151 | "command" notifications this is set to the tag | |
1152 | ``from send_command``. | |
1153 | """ | |
1154 | ||
1155 | # only interested in cluster log (clog) messages for now | |
1156 | if notify_type == 'clog': | |
1157 | self.log.debug("received a clog entry from mgr.notify") | |
1158 | if isinstance(notify_id, dict): | |
1159 | # create a log object to process | |
1160 | self.process_clog(notify_id) | |
1161 | else: | |
1162 | self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type))) | |
1163 | ||
1164 | def _show_events(self, events): | |
1165 | ||
1166 | max_msg_length = max([len(events[k].message) for k in events]) | |
1167 | fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n" | |
1168 | s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name") | |
1169 | ||
1170 | for event_name in sorted(events, | |
1171 | key = lambda name: events[name].last_timestamp, | |
1172 | reverse=True): | |
1173 | ||
1174 | event = events[event_name] | |
1175 | ||
1176 | s += fmt.format( | |
1177 | datetime.strftime(event.last_timestamp,"%Y/%m/%d %H:%M:%S"), | |
f6b5b4d7 TL |
1178 | str(event.type), |
1179 | str(event.count), | |
1180 | str(event.message), | |
1181 | str(event_name) | |
eafe8130 TL |
1182 | ) |
1183 | s += "Total : {:>3}\n".format(len(events)) | |
1184 | return s | |
1185 | ||
1186 | def show_events(self, events): | |
1187 | """Show events we're holding from the ceph namespace - most recent 1st""" | |
1188 | ||
1189 | if len(events): | |
1190 | return 0, "", self._show_events(events) | |
1191 | else: | |
1192 | return 0, "", "No events emitted yet, local cache is empty" | |
1193 | ||
1194 | def show_status(self): | |
1195 | s = "Kubernetes\n" | |
1196 | s += "- Hostname : {}\n".format(self.k8s_config['server']) | |
1197 | s += "- Namespace : {}\n".format(self._namespace) | |
1198 | s += "Tracker Health\n" | |
1199 | for t in self.trackers: | |
1200 | s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health) | |
1201 | s += "Tracked Events\n" | |
1202 | s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events)) | |
1203 | s += "- ceph events : {:>3}\n".format(len(self.event_processor.events)) | |
1204 | return 0, "", s | |
1205 | ||
1206 | def _valid_server(self, server): | |
1207 | # must be a valid server url format | |
1208 | server = server.strip() | |
1209 | ||
1210 | res = urlparse(server) | |
1211 | port = res.netloc.split(":")[-1] | |
1212 | ||
1213 | if res.scheme != 'https': | |
1214 | return False, "Server URL must use https" | |
1215 | ||
1216 | elif not res.hostname: | |
1217 | return False, "Invalid server URL format" | |
1218 | ||
1219 | elif res.hostname: | |
1220 | try: | |
1221 | socket.gethostbyname(res.hostname) | |
1222 | except socket.gaierror: | |
1223 | return False, "Unresolvable server URL" | |
1224 | ||
1225 | if not port.isdigit(): | |
1226 | return False, "Server URL must end in a port number" | |
1227 | ||
1228 | return True, "" | |
1229 | ||
1230 | def _valid_cacrt(self, cacrt_data): | |
1231 | """use mgr_util.verify_cacrt to validate the CA file""" | |
1232 | ||
1233 | cacrt_fname = create_temp_file("ca_file", cacrt_data) | |
1234 | ||
1235 | try: | |
1236 | verify_cacrt(cacrt_fname) | |
1237 | except ServerConfigException as e: | |
1238 | return False, "Invalid CA certificate: {}".format(str(e)) | |
1239 | else: | |
1240 | return True, "" | |
1241 | ||
1242 | def _valid_token(self, token_data): | |
1243 | """basic checks on the token""" | |
1244 | if not token_data: | |
1245 | return False, "Token file is empty" | |
1246 | ||
1247 | pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$") | |
1248 | if not pattern.match(token_data): | |
1249 | return False, "Token contains invalid characters" | |
1250 | ||
1251 | return True, "" | |
1252 | ||
1253 | def _valid_namespace(self, namespace): | |
1254 | # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols | |
1255 | ||
1256 | if len(namespace) > 253: | |
1257 | return False, "Name too long" | |
1258 | if namespace.isdigit(): | |
1259 | return False, "Invalid name - must be alphanumeric" | |
1260 | ||
1261 | pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$") | |
1262 | if not pattern.match(namespace): | |
1263 | return False, "Invalid characters in the name" | |
1264 | ||
1265 | return True, "" | |
1266 | ||
1267 | def _config_set(self, key, val): | |
1268 | """Attempt to validate the content, then save to KV store""" | |
1269 | ||
1270 | val = val.rstrip() # remove any trailing whitespace/newline | |
1271 | ||
1272 | try: | |
1273 | checker = getattr(self, "_valid_" + key) | |
1274 | except AttributeError: | |
1275 | # no checker available, just let it pass | |
1276 | self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key)) | |
1277 | valid = True | |
1278 | else: | |
1279 | valid, reason = checker(val) | |
1280 | ||
1281 | if valid: | |
1282 | self.set_store(key, val) | |
1283 | self.log.info("Updated config KV Store item: " + key) | |
1284 | return 0, "", "Config updated for parameter '{}'".format(key) | |
1285 | else: | |
1286 | return -22, "", "Invalid value for '{}' :{}".format(key, reason) | |
1287 | ||
1288 | def clear_config_settings(self): | |
1289 | for k in self.k8s_config: | |
1290 | self.set_store(k, None) | |
1291 | return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys())) | |
1292 | ||
1293 | def handle_command(self, inbuf, cmd): | |
1294 | ||
1295 | access_options = ['cacrt', 'token'] | |
1296 | config_options = ['server', 'namespace'] | |
1297 | ||
1298 | if cmd['prefix'] == 'k8sevents clear-config': | |
1299 | return self.clear_config_settings() | |
1300 | ||
1301 | if cmd['prefix'] == 'k8sevents set-access': | |
1302 | if cmd['key'] not in access_options: | |
1303 | return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options)) | |
1304 | ||
1305 | if inbuf: | |
1306 | return self._config_set(cmd['key'], inbuf) | |
1307 | else: | |
1308 | return -errno.EINVAL, "", "Command must specify -i <filename>" | |
1309 | ||
1310 | if cmd['prefix'] == 'k8sevents set-config': | |
1311 | ||
1312 | if cmd['key'] not in config_options: | |
1313 | return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options)) | |
1314 | ||
1315 | return self._config_set(cmd['key'], cmd['value']) | |
1316 | ||
1317 | # At this point the command is trying to interact with k8sevents, so intercept if the configuration is | |
1318 | # not ready | |
1319 | if self.error_msg: | |
1320 | _msg = "k8sevents unavailable: " + self.error_msg | |
1321 | ready, _ = self.k8s_ready() | |
1322 | if not self.kubernetes_control and not ready: | |
1323 | _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect" | |
1324 | return -errno.ENODATA, "", _msg | |
1325 | ||
1326 | if cmd["prefix"] == "k8sevents status": | |
1327 | return self.show_status() | |
1328 | ||
1329 | elif cmd["prefix"] == "k8sevents ls": | |
1330 | return self.show_events(self.ns_watcher.events) | |
1331 | ||
1332 | elif cmd["prefix"] == "k8sevents ceph": | |
1333 | return self.show_events(self.event_processor.events) | |
1334 | ||
1335 | else: | |
1336 | raise NotImplementedError(cmd["prefix"]) | |
1337 | ||
1338 | @staticmethod | |
1339 | def can_run(): | |
1340 | """Determine whether the pre-reqs for the module are in place""" | |
1341 | ||
1342 | if not kubernetes_imported: | |
1343 | return False, "kubernetes python client is not available" | |
1344 | return True, "" | |
1345 | ||
1346 | def load_kubernetes_config(self): | |
1347 | """Load configuration for remote kubernetes API using KV store values | |
1348 | ||
1349 | Attempt to create an API client configuration from settings stored in | |
1350 | KV store. | |
1351 | ||
1352 | Returns: | |
1353 | client.ApiClient: kubernetes API client object | |
1354 | ||
1355 | Raises: | |
1356 | OSError: unable to create the cacrt file | |
1357 | """ | |
1358 | ||
1359 | # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a | |
1360 | # temporary file containing the cert for the client to load from | |
1361 | try: | |
1362 | ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt']) | |
1363 | except OSError as e: | |
1364 | self.log.error("Unable to create file to hold cacrt: {}".format(str(e))) | |
1365 | raise OSError(str(e)) | |
1366 | else: | |
1367 | self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file)) | |
1368 | ||
1369 | configuration = client.Configuration() | |
1370 | configuration.host = self.k8s_config['server'] | |
1371 | configuration.ssl_ca_cert = ca_crt_file | |
1372 | configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] } | |
1373 | api_client = client.ApiClient(configuration) | |
1374 | self.log.info("API client created for remote kubernetes access using cacrt and token from KV store") | |
1375 | ||
1376 | return api_client | |
1377 | ||
    def serve(self):
        """Module entry point: configure kubernetes access, start the worker
        threads and supervise them until shutdown or failure.

        Two modes are supported: in-cluster (POD_NAME present, settings taken
        from the pod environment) and external (settings taken from the KV
        store via set-access/set-config).  If configuration is incomplete the
        module logs the reason into self.error_msg and returns.
        """
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                # None here means the trackers use the in-cluster client config
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        # event_processor.ok reflects whether the kubernetes API was reachable
        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()       # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            while True:
                # stay alive
                time.sleep(1)

                # report (once) any tracker thread that has died
                trackers = self.trackers
                for t in trackers:
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False
1447 | ||
    def shutdown(self):
        """Called by the mgr to stop the module: halt threads and disconnect."""
        self.run = False
        log.info("Shutting down k8sevents module")
        # signal the event processor thread to leave its consumer loop
        self.event_processor.can_run = False

        if self._rados:
            self._rados.shutdown()