]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | # Integrate with the kubernetes events API. |
2 | # This module sends events to Kubernetes, and also captures/tracks all events | |
3 | # in the rook-ceph namespace so kubernetes activity like pod restarts, | |
4 | # imagepulls etc can be seen from within the ceph cluster itself. | |
5 | # | |
6 | # To interact with the events API, the mgr service to access needs to be | |
7 | # granted additional permissions | |
8 | # e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules | |
9 | # | |
10 | # These are the changes needed; | |
11 | # - apiGroups: | |
12 | # - "" | |
13 | # resources: | |
14 | # - events | |
15 | # verbs: | |
16 | # - create | |
17 | # - patch | |
18 | # - list | |
19 | # - get | |
20 | # - watch | |
21 | ||
22 | ||
23 | import os | |
24 | import re | |
25 | import sys | |
26 | import time | |
27 | import json | |
28 | import yaml | |
29 | import errno | |
30 | import socket | |
31 | import base64 | |
32 | import logging | |
33 | import tempfile | |
34 | import threading | |
35 | ||
36 | try: | |
37 | # python 3 | |
38 | from urllib.parse import urlparse | |
39 | except ImportError: | |
40 | # python 2 fallback | |
41 | from urlparse import urlparse | |
42 | ||
43 | from datetime import tzinfo, datetime, timedelta | |
44 | ||
45 | from urllib3.exceptions import MaxRetryError,ProtocolError | |
46 | from collections import OrderedDict | |
47 | ||
48 | import rados | |
49 | from mgr_module import MgrModule | |
50 | from mgr_util import verify_cacrt, ServerConfigException | |
51 | ||
52 | try: | |
53 | import queue | |
54 | except ImportError: | |
55 | # python 2.7.5 | |
56 | import Queue as queue | |
57 | finally: | |
58 | # python 2.7.15 or python3 | |
59 | event_queue = queue.Queue() | |
60 | ||
try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    # kubernetes client library unavailable - remember that fact so the
    # module can report a meaningful error instead of failing at import time
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        # substitute a minimal object reference when the API hands back None,
        # so the V1Event setter no longer raises inside the watch generator
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)
82 | ||
83 | log = logging.getLogger(__name__) | |
84 | ||
85 | # use a simple local class to represent UTC | |
86 | # datetime pkg modules vary between python2 and 3 and pytz is not available on older | |
87 | # ceph container images, so taking a pragmatic approach! | |
class UTC(tzinfo):
    """Minimal concrete tzinfo representing UTC.

    datetime helpers differ between python2 and python3 and pytz is not
    shipped on older ceph container images, so a tiny local class is used.
    """

    _ZERO = timedelta(0)

    def utcoffset(self, dt):
        """UTC never deviates from itself."""
        return self._ZERO

    def tzname(self, dt):
        """Fixed display name for this zone."""
        return "UTC"

    def dst(self, dt):
        """UTC observes no daylight saving."""
        return self._ZERO
97 | ||
98 | ||
def text_suffix(num):
    """Return a pluralising suffix for *num*: '' when exactly 1, else 's'."""
    if num == 1:
        return ''
    return 's'
102 | ||
103 | ||
def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file holding the given content.

    The file is written to the system temp directory as <fname><suffix>.

    Args:
        fname: base name of the file (no directory component)
        content: text to write; when None no file is created
        suffix: appended to fname to form the file name (default ".tmp")

    Returns:
        str: full path to the temporary file, or None when content is None

    Raises:
        OSError: problems creating or writing the file
    """
    # Preserve historic behaviour: a None payload is a silent no-op. The
    # original implementation fell off the end of the function in this case;
    # make the None return explicit and documented.
    if content is None:
        return None

    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

    try:
        with open(file_name, "w") as f:
            f.write(content)
    except OSError as e:
        # re-raise with a clearer message for the caller
        raise OSError("Unable to create temporary file : {}".format(str(e)))

    return file_name
127 | ||
128 | ||
class HealthCheck(object):
    """Transform a healthcheck msg into it's component parts"""

    def __init__(self, msg, msg_level):
        """Parse a 'Health check ...' cluster log message.

        Sample messages:
          Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
          Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
          Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        """
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        # anything that is not a health check message is marked invalid
        if not msg.lower().startswith('health check'):
            return

        self.valid = True
        self.msg = msg
        tokens = msg.split()

        if msg_level == 'INF':
            # 'cleared' messages carry the check name right after the prefix
            self.name = tokens[3]
            self.text = ' '.join(tokens[3:])
        else:
            # WRN/ERR messages end with the check name in parentheses
            self.name = tokens[-1][1:-1]
            self.text = ' '.join(tokens[3:-1])
157 | ||
158 | ||
class LogEntry(object):
    """Generic 'log' object"""

    # msg_type -> kubernetes event 'reason' text
    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):
        """Capture a log message plus its metadata.

        Args:
            source: origin of the message (e.g. 'audit', 'cluster', 'self')
            msg: raw log message text
            msg_type: one of the reason_map keys
            level: severity string - 'INF', 'WRN' or 'ERR'
            tstamp: optional timestamp associated with the message
        """
        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp

        # health check messages get parsed into their component parts
        self.healthcheck = None
        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)

    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(
            self.source, self.msg_type, self.msg, self.level, self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""
        # audit msg format:
        # from='client.205306 ' entity='client.admin' cmd='[{"prefix": "osd set", "key": "nodown"}]': finished
        if self.msg_type != 'audit':
            return None

        # strip the trailing ": finished" text and the quote characters,
        # then pull out the JSON command list that follows "cmd="
        raw = self.msg[:-10].replace("'", "").split("cmd=")[1]
        cmds_list = json.loads('{{"cmd":{}}}'.format(raw))['cmd']

        # TODO. Assuming only one command was issued for now
        first = cmds_list[0]
        return "{} {}".format(first['prefix'], first.get('key', ''))

    @property
    def event_type(self):
        """Map ceph severity onto the kubernetes Normal/Warning scheme."""
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        """Kubernetes event 'reason' derived from the message type."""
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
        """Stable kubernetes event name for this log entry (or None)."""
        # NB ordering matters: a healthcheck beats the generic msg_type cases
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        if self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        if self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        if self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        if self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        return None

    @property
    def event_entity(self):
        """The client that issued an audit command (None for other types)."""
        if self.msg_type != 'audit':
            return None
        return self.msg.replace("'", "").split('entity=')[1].split(' ')[0]

    @property
    def event_msg(self):
        """Human readable text to place in the kubernetes event."""
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)
        if self.healthcheck:
            return self.healthcheck.text
        return self.msg
245 | ||
246 | ||
class BaseThread(threading.Thread):
    """Common base for the module's worker threads."""
    # human readable thread status - 'OK' until the thread hits a problem
    health = 'OK'
    # whether a health problem has already been reported
    reported = False
    # daemon threads, so they never block mgr shutdown
    daemon = True
251 | ||
252 | ||
f6b5b4d7 TL |
def clean_event(event):
    """Normalise an event record received from the kubernetes API.

    Events coming back from the API can be missing timestamps or a count,
    which breaks later sorting/pruning, so fill in sane defaults.

    Args:
        event: a V1Event-like object (attributes used: first_timestamp,
               last_timestamp, count, metadata.creation_timestamp)

    Returns:
        the same event object, with first_timestamp, last_timestamp and
        count guaranteed to be populated
    """
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            # BUGFIX: this module does 'from datetime import datetime', so the
            # previous datetime.datetime.now() raised AttributeError here
            event.first_timestamp = datetime.now()

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    if not event.count:
        event.count = 1

    return event
272 | ||
273 | ||
eafe8130 TL |
class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        """
        Args:
            api_client_config: optional kubernetes ApiClient; when None the
                default client configuration is used
            namespace: namespace whose events should be tracked
        """
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        # event cache: event name -> (cleaned) V1Event, insertion ordered
        self.events = OrderedDict()
        # guards self.events against concurrent readers
        self.lock = threading.Lock()
        # None = not started yet, True/False once fetch() has run
        self.active = None
        # resource_version from the last full list; used to resume the watch
        self.resource_version = None

    def fetch(self):
        """(Re)load all current events in the namespace into the cache."""
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt
        except:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = clean_event(item)
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        """Thread body: prime the cache, then follow the event stream."""
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if self.active:
            log.info("Namespace event watcher started")


            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = clean_event(obj)

                            elif item['type'] == 'DELETED':
                                # NOTE(review): raises KeyError if the event was
                                # never cached - presumably can't happen; verify
                                del self.events[obj.metadata.name]

                # TODO test the exception for auth problem (403?)

                # Attribute error is generated when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except ProtocolError as e:
                    # connection dropped mid-stream; refresh and keep going
                    log.warning("Namespace watcher hit protocolerror ({}) - restarting".format(e))
                    self.fetch()

                except Exception:
                    # unknown failure: record it in health and stop the thread
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")
371 | ||
372 | ||
class KubernetesEvent(object):
    """Create/update a kubernetes event that mirrors a ceph log entry."""

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):
        """
        Args:
            log_entry: LogEntry providing the event name/message/type/reason
            unique_name: when True the event has a fixed name (create/patch by
                name); when False kubernetes appends a unique suffix per event
            api_client_config: optional kubernetes ApiClient to use
            namespace: namespace the event is written to
        """
        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        # host reported as the event source; NODE_NAME first, then HOSTNAME
        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        # http style status of the last API interaction (200 = success)
        self.api_status = 200
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        """Build the V1Event payload used by the create/patch calls."""
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            # generate_name lets kubernetes append a unique suffix
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted events are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):
        """Create the event in kubernetes; on a 409 conflict fall back to a patch.

        Sets self.api_status to an http-style code describing the outcome.
        """
        now=datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach to the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not be defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403  # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and then the k8sevent module
                # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        """True when the last API call completed with a 200 status."""
        return self.api_status == 200

    def update(self, log_entry):
        """Patch the existing event with fresh message/count/timestamp.

        If the event has already been expired by the kube-apiserver (404),
        it is recreated instead.
        """
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200
547 | ||
548 | ||
class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    # cleared by the owner to ask the thread to exit its run loop
    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        """
        Args:
            config_watcher: CephConfigWatcher, stringified for heartbeat events
            event_retention_days: age limit for the local event cache
            api_client_config: optional kubernetes ApiClient to use
            namespace: namespace events are written to
        """
        super(EventProcessor, self).__init__()

        # cache of KubernetesEvent objects keyed by event name
        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        # readiness is probed by actually writing a startup event
        return self.startup()

    def prune_events(self):
        """Drop cached events older than the retention window."""
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        # iterate over a copy, since entries are deleted from self.events
        local_events = dict(self.events)

        # walk oldest -> newest and stop at the first event inside the window
        for event_name in sorted(local_events,
                                 key = lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):
        """Turn a LogEntry into a kubernetes event create/update, or ignore it."""

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                # already cached: patch the existing kubernetes event
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        """Thread body: drain the global event_queue until stopped or failed."""
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    # record the failure in health and stop the thread rather
                    # than dying with an unhandled exception
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")
685 | ||
686 | ||
class ListDiff(object):
    """Compute membership changes between two iterables."""

    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        """Items present before but missing after."""
        return list(self.before.difference(self.after))

    @property
    def added(self):
        """Items present after but missing before."""
        return list(self.after.difference(self.before))

    @property
    def is_equal(self):
        """True when membership is unchanged."""
        return self.before == self.after
703 | ||
704 | ||
705 | class CephConfigWatcher(BaseThread): | |
706 | """Detect configuration changes within the cluster and generate human readable events""" | |
707 | ||
    def __init__(self, mgr):
        """Capture the mgr handle and start with empty config snapshots."""
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        # last-seen snapshots, compared against fresh fetches to spot changes
        self.server_map = dict()   # hostname -> {service_type: [ids]}
        self.osd_map = dict()      # osd id (str) -> deviceclass/capacity/hostname
        self.pool_map = dict()     # pool name -> selected pool settings
        self.service_map = dict()  # (service_type, id) -> hostname

        # seconds between configuration checks (mgr module option)
        self.config_check_secs = mgr.config_check_secs
717 | ||
718 | @property | |
719 | def raw_capacity(self): | |
720 | # Note. if the osd's are not online the capacity field will be 0 | |
721 | return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map]) | |
722 | ||
723 | @property | |
724 | def num_servers(self): | |
725 | return len(self.server_map.keys()) | |
726 | ||
727 | @property | |
728 | def num_osds(self): | |
729 | return len(self.osd_map.keys()) | |
730 | ||
731 | @property | |
732 | def num_pools(self): | |
733 | return len(self.pool_map.keys()) | |
734 | ||
    def __str__(self):
        """Render a one-line summary of cluster status and configuration.

        Used as the message body of heartbeat events.
        """
        s = ''

        # overall status comes from the mgr health report (JSON encoded)
        s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
            json.loads(self.mgr.get('health')['json'])['status'],
            self.num_servers,
            text_suffix(self.num_servers),
            self.num_pools,
            text_suffix(self.num_pools),
            self.num_osds,
            MgrModule.to_pretty_iec(self.raw_capacity))
        return s
747 | ||
    def fetch_servers(self):
        """Return a server summary, and service summary

        Returns:
            tuple: (server_map, service_map) where server_map maps
            hostname -> {service_type: [service ids]} and service_map maps
            (service_type, str(service id)) -> hostname
        """
        servers = self.mgr.list_servers()
        server_map = dict()  # host -> services
        service_map = dict()  # service -> host
        for server_info in servers:
            services = dict()
            for svc in server_info['services']:
                # accumulate service ids per service type for this host
                if svc.get('type') in services.keys():
                    services[svc.get('type')].append(svc.get('id'))
                else:
                    services[svc.get('type')] = list([svc.get('id')])
                # maintain the service xref map service -> host and version
                service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '')
            server_map[server_info.get('hostname')] = services

        return server_map, service_map
765 | ||
766 | def fetch_pools(self): | |
767 | interesting = ["type", "size", "min_size"] | |
768 | # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool', | |
769 | # 'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn', | |
770 | # 'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100, | |
771 | # 'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0', | |
772 | # 'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0, | |
773 | # 'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1, | |
774 | # 'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0, | |
775 | # 'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000, | |
776 | # 'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0, | |
777 | # 'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0, | |
778 | # 'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0, | |
779 | # 'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0, | |
780 | # 'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}}, | |
781 | # 'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0', | |
782 | # 'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}] | |
783 | pools = self.mgr.get('osd_map')['pools'] | |
784 | pool_map = dict() | |
785 | for pool in pools: | |
786 | pool_map[pool.get('pool_name')] = {k:pool.get(k) for k in interesting} | |
787 | return pool_map | |
788 | ||
789 | ||
    def fetch_osd_map(self, service_map):
        """Create an osd map

        Args:
            service_map: (service_type, id) -> hostname map built by
                fetch_servers, used to attach each OSD to its host

        Returns:
            dict: osd id (str) -> {deviceclass, capacity, hostname}
        """
        stats = self.mgr.get('osd_stats')

        osd_map = dict()

        # seed the map from the crush device list (capacity filled in below)
        devices = self.mgr.get('osd_map_crush')['devices']
        for dev in devices:
            osd_id = str(dev['id'])
            osd_map[osd_id] = dict(
                deviceclass=dev.get('class'),
                capacity=0,
                hostname=service_map['osd', osd_id]
            )

        # overlay the reported size for OSDs that have stats
        # NOTE(review): assumes every osd in osd_stats is also in the crush
        # device list - a stats-only osd would raise KeyError here
        for osd_stat in stats['osd_stats']:
            osd_id = str(osd_stat.get('osd'))
            osd_map[osd_id]['capacity'] = osd_stat['statfs']['total']

        return osd_map
810 | ||
811 | def push_events(self, changes): | |
812 | """Add config change to the global queue to generate an event in kubernetes""" | |
813 | log.debug("{} events will be generated") | |
814 | for change in changes: | |
815 | event_queue.put(change) | |
816 | ||
817 | def _generate_config_logentry(self, msg): | |
818 | return LogEntry( | |
819 | source="config", | |
820 | msg_type="config", | |
821 | msg=msg, | |
822 | level='INF', | |
823 | tstamp=None | |
824 | ) | |
825 | ||
826 | def _check_hosts(self, server_map): | |
827 | log.debug("K8sevents checking host membership") | |
828 | changes = list() | |
829 | servers = ListDiff(self.server_map.keys(), server_map.keys()) | |
830 | if servers.is_equal: | |
831 | # no hosts have been added or removed | |
832 | pass | |
833 | else: | |
834 | # host changes detected, find out what | |
835 | host_msg = "Host '{}' has been {} the cluster" | |
836 | for new_server in servers.added: | |
837 | changes.append(self._generate_config_logentry( | |
838 | msg=host_msg.format(new_server, 'added to')) | |
839 | ) | |
840 | ||
841 | for removed_server in servers.removed: | |
842 | changes.append(self._generate_config_logentry( | |
843 | msg=host_msg.format(removed_server, 'removed from')) | |
844 | ) | |
845 | ||
846 | return changes | |
847 | ||
    def _check_osds(self,server_map, osd_map):
        """Return LogEntry changes for OSDs added to / removed from hosts.

        Args:
            server_map: fresh hostname -> services map from fetch_servers
            osd_map: fresh osd id -> details map from fetch_osd_map

        Returns:
            list: LogEntry objects, one per OSD added or removed
        """
        log.debug("K8sevents checking OSD configuration")
        changes = list()
        # osd ids known at the previous check
        before_osds = list()
        for svr in self.server_map:
            before_osds.extend(self.server_map[svr].get('osd',[]))

        # osd ids known now
        after_osds = list()
        for svr in server_map:
            after_osds.extend(server_map[svr].get('osd',[]))

        if set(before_osds) == set(after_osds):
            # no change in osd id's
            pass
        else:
            # osd changes detected
            osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}"

            osds = ListDiff(before_osds, after_osds)
            for new_osd in osds.added:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        new_osd,
                        osd_map[new_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']),
                        'added to',
                        osd_map[new_osd]['hostname']))
                )

            for removed_osd in osds.removed:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        removed_osd,
                        osd_map[removed_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']),
                        'removed from',
                        osd_map[removed_osd]['hostname']))
                )

        return changes
888 | ||
889 | def _check_pools(self, pool_map): | |
890 | changes = list() | |
891 | log.debug("K8sevents checking pool configurations") | |
892 | if self.pool_map.keys() == pool_map.keys(): | |
893 | # no pools added/removed | |
894 | pass | |
895 | else: | |
896 | # Pool changes | |
897 | pools = ListDiff(self.pool_map.keys(), pool_map.keys()) | |
898 | pool_msg = "Pool '{}' has been {} the cluster" | |
899 | for new_pool in pools.added: | |
900 | changes.append(self._generate_config_logentry( | |
901 | msg=pool_msg.format(new_pool, 'added to')) | |
902 | ) | |
903 | ||
904 | for removed_pool in pools.removed: | |
905 | changes.append(self._generate_config_logentry( | |
906 | msg=pool_msg.format(removed_pool, 'removed from')) | |
907 | ) | |
908 | ||
909 | # check pool configuration changes | |
910 | for pool_name in pool_map: | |
911 | if not self.pool_map.get(pool_name, dict()): | |
912 | # pool didn't exist before so just skip the checks | |
913 | continue | |
914 | ||
915 | if pool_map[pool_name] == self.pool_map[pool_name]: | |
916 | # no changes - dicts match in key and value | |
917 | continue | |
918 | else: | |
919 | # determine the change and add it to the change list | |
920 | size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size'] | |
921 | if size_diff != 0: | |
922 | if size_diff < 0: | |
923 | msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name, | |
924 | pool_map[pool_name]['size']) | |
925 | level = 'WRN' | |
926 | else: | |
927 | msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name, | |
928 | pool_map[pool_name]['size']) | |
929 | level = 'INF' | |
930 | ||
931 | changes.append(LogEntry(source="config", | |
932 | msg_type="config", | |
933 | msg=msg, | |
934 | level=level, | |
935 | tstamp=None) | |
936 | ) | |
937 | ||
938 | if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']: | |
939 | changes.append(LogEntry(source="config", | |
940 | msg_type="config", | |
941 | msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name), | |
942 | level='WRN', | |
943 | tstamp=None) | |
944 | ) | |
945 | ||
946 | return changes | |
947 | ||
948 | def get_changes(self, server_map, osd_map, pool_map): | |
949 | """Detect changes in maps between current observation and the last""" | |
950 | ||
951 | changes = list() | |
952 | ||
953 | changes.extend(self._check_hosts(server_map)) | |
954 | changes.extend(self._check_osds(server_map, osd_map)) | |
955 | changes.extend(self._check_pools(pool_map)) | |
956 | ||
957 | # FUTURE | |
958 | # Could generate an event if a ceph daemon has moved hosts | |
959 | # (assumes the ceph metadata host information is valid though!) | |
960 | ||
961 | return changes | |
962 | ||
    def run(self):
        """Poll the cluster maps and emit events for any detected changes.

        Establishes baseline server/pool/osd maps, then loops: fetch fresh
        maps, diff them against the baseline, queue any changes as events,
        and adopt the new maps as the next baseline. The loop runs until an
        unexpected exception occurs; the failure is recorded in self.health
        so 'k8sevents status' can report why the thread stopped.
        """
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        # establish the baseline observation that change detection compares against
        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                # the new observation becomes the baseline for the next pass
                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    # NOTE: the interval only ever grows (doubles); it is never
                    # shrunk back even if later checks run quickly
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                # catch-all: record the failure for status reporting and stop the thread
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")
1014 | ||
1015 | ||
class Module(MgrModule):
    # CLI commands exposed as "ceph k8sevents <cmd>".
    # Fixes two typos in user-facing help text: "Kuberenetes" -> "Kubernetes"
    # and "paramters" -> "parameters".
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    # options tunable at runtime via 'ceph config set mgr mgr/k8sevents/<name>'
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]
1060 | ||
    def __init__(self, *args, **kwargs):
        """Initialise module state before the mgr base class starts us."""
        # run flag - cleared by shutdown() to signal the module to stop
        self.run = True
        # POD_NAME is only set when running inside kubernetes (e.g. rook-ceph)
        self.kubernetes_control = 'POD_NAME' in os.environ
        # worker threads, created in serve() once configuration checks pass
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        # tracker threads whose liveness serve() monitors and reports on
        self.trackers = list()
        self.error_msg = None
        # kubernetes API client config; stays None when running in-cluster
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        # connection settings for an *external* kubernetes cluster; loaded
        # from the KV store by config_notify() when not running in-cluster
        self.k8s_config = dict(
            cacrt = None,
            token = None,
            server = None,
            namespace = None
        )

        super(Module, self).__init__(*args, **kwargs)
1084 | ||
1085 | def k8s_ready(self): | |
1086 | """Validate the k8s_config dict | |
1087 | ||
1088 | Returns: | |
1089 | - bool .... indicating whether the config is ready to use | |
1090 | - string .. variables that need to be defined before the module will function | |
1091 | ||
1092 | """ | |
1093 | missing = list() | |
1094 | ready = True | |
1095 | for k in self.k8s_config: | |
1096 | if not self.k8s_config[k]: | |
1097 | missing.append(k) | |
1098 | ready = False | |
1099 | return ready, missing | |
1100 | ||
1101 | def config_notify(self): | |
1102 | """Apply runtime module options, and defaults from the modules KV store""" | |
1103 | self.log.debug("applying runtime module option settings") | |
1104 | for opt in self.MODULE_OPTIONS: | |
1105 | setattr(self, | |
1106 | opt['name'], | |
9f95a23c | 1107 | self.get_module_option(opt['name'])) |
eafe8130 TL |
1108 | |
1109 | if not self.kubernetes_control: | |
1110 | # Populate the config | |
1111 | self.log.debug("loading config from KV store") | |
1112 | for k in self.k8s_config: | |
1113 | self.k8s_config[k] = self.get_store(k, default=None) | |
1114 | ||
    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module

        :param limit: optional cap on the number of events to return (unused)
        :raises NotImplementedError: always - this is a placeholder stub
        """
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError
1119 | ||
1120 | def process_clog(self, log_message): | |
1121 | """Add log message to the event queue | |
1122 | ||
1123 | :param log_message: dict from the cluster log (audit/cluster channels) | |
1124 | """ | |
1125 | required_fields = ['channel', 'message', 'priority', 'stamp'] | |
1126 | _message_attrs = log_message.keys() | |
1127 | if all(_field in _message_attrs for _field in required_fields): | |
1128 | self.log.debug("clog entry received - adding to the queue") | |
1129 | if log_message.get('message').startswith('overall HEALTH'): | |
1130 | m_type = 'heartbeat' | |
1131 | else: | |
1132 | m_type = log_message.get('channel') | |
1133 | ||
1134 | event_queue.put( | |
1135 | LogEntry( | |
1136 | source='log', | |
1137 | msg_type=m_type, | |
1138 | msg=log_message.get('message'), | |
1139 | level=log_message.get('priority')[1:-1], | |
1140 | tstamp=log_message.get('stamp') | |
1141 | ) | |
1142 | ) | |
1143 | ||
1144 | else: | |
1145 | self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) | |
1146 | ||
1147 | def notify(self, notify_type, notify_id): | |
1148 | """ | |
1149 | Called by the ceph-mgr service to notify the Python plugin | |
1150 | that new state is available. | |
1151 | ||
1152 | :param notify_type: string indicating what kind of notification, | |
1153 | such as osd_map, mon_map, fs_map, mon_status, | |
1154 | health, pg_summary, command, service_map | |
1155 | :param notify_id: string (may be empty) that optionally specifies | |
1156 | which entity is being notified about. With | |
1157 | "command" notifications this is set to the tag | |
1158 | ``from send_command``. | |
1159 | """ | |
1160 | ||
1161 | # only interested in cluster log (clog) messages for now | |
1162 | if notify_type == 'clog': | |
1163 | self.log.debug("received a clog entry from mgr.notify") | |
1164 | if isinstance(notify_id, dict): | |
1165 | # create a log object to process | |
1166 | self.process_clog(notify_id) | |
1167 | else: | |
1168 | self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type))) | |
1169 | ||
1170 | def _show_events(self, events): | |
1171 | ||
1172 | max_msg_length = max([len(events[k].message) for k in events]) | |
1173 | fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n" | |
1174 | s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name") | |
1175 | ||
1176 | for event_name in sorted(events, | |
1177 | key = lambda name: events[name].last_timestamp, | |
1178 | reverse=True): | |
1179 | ||
1180 | event = events[event_name] | |
1181 | ||
1182 | s += fmt.format( | |
1183 | datetime.strftime(event.last_timestamp,"%Y/%m/%d %H:%M:%S"), | |
f6b5b4d7 TL |
1184 | str(event.type), |
1185 | str(event.count), | |
1186 | str(event.message), | |
1187 | str(event_name) | |
eafe8130 TL |
1188 | ) |
1189 | s += "Total : {:>3}\n".format(len(events)) | |
1190 | return s | |
1191 | ||
1192 | def show_events(self, events): | |
1193 | """Show events we're holding from the ceph namespace - most recent 1st""" | |
1194 | ||
1195 | if len(events): | |
1196 | return 0, "", self._show_events(events) | |
1197 | else: | |
1198 | return 0, "", "No events emitted yet, local cache is empty" | |
1199 | ||
1200 | def show_status(self): | |
1201 | s = "Kubernetes\n" | |
1202 | s += "- Hostname : {}\n".format(self.k8s_config['server']) | |
1203 | s += "- Namespace : {}\n".format(self._namespace) | |
1204 | s += "Tracker Health\n" | |
1205 | for t in self.trackers: | |
1206 | s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health) | |
1207 | s += "Tracked Events\n" | |
1208 | s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events)) | |
1209 | s += "- ceph events : {:>3}\n".format(len(self.event_processor.events)) | |
1210 | return 0, "", s | |
1211 | ||
1212 | def _valid_server(self, server): | |
1213 | # must be a valid server url format | |
1214 | server = server.strip() | |
1215 | ||
1216 | res = urlparse(server) | |
1217 | port = res.netloc.split(":")[-1] | |
1218 | ||
1219 | if res.scheme != 'https': | |
1220 | return False, "Server URL must use https" | |
1221 | ||
1222 | elif not res.hostname: | |
1223 | return False, "Invalid server URL format" | |
1224 | ||
1225 | elif res.hostname: | |
1226 | try: | |
1227 | socket.gethostbyname(res.hostname) | |
1228 | except socket.gaierror: | |
1229 | return False, "Unresolvable server URL" | |
1230 | ||
1231 | if not port.isdigit(): | |
1232 | return False, "Server URL must end in a port number" | |
1233 | ||
1234 | return True, "" | |
1235 | ||
1236 | def _valid_cacrt(self, cacrt_data): | |
1237 | """use mgr_util.verify_cacrt to validate the CA file""" | |
1238 | ||
1239 | cacrt_fname = create_temp_file("ca_file", cacrt_data) | |
1240 | ||
1241 | try: | |
1242 | verify_cacrt(cacrt_fname) | |
1243 | except ServerConfigException as e: | |
1244 | return False, "Invalid CA certificate: {}".format(str(e)) | |
1245 | else: | |
1246 | return True, "" | |
1247 | ||
1248 | def _valid_token(self, token_data): | |
1249 | """basic checks on the token""" | |
1250 | if not token_data: | |
1251 | return False, "Token file is empty" | |
1252 | ||
1253 | pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$") | |
1254 | if not pattern.match(token_data): | |
1255 | return False, "Token contains invalid characters" | |
1256 | ||
1257 | return True, "" | |
1258 | ||
1259 | def _valid_namespace(self, namespace): | |
1260 | # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols | |
1261 | ||
1262 | if len(namespace) > 253: | |
1263 | return False, "Name too long" | |
1264 | if namespace.isdigit(): | |
1265 | return False, "Invalid name - must be alphanumeric" | |
1266 | ||
1267 | pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$") | |
1268 | if not pattern.match(namespace): | |
1269 | return False, "Invalid characters in the name" | |
1270 | ||
1271 | return True, "" | |
1272 | ||
1273 | def _config_set(self, key, val): | |
1274 | """Attempt to validate the content, then save to KV store""" | |
1275 | ||
1276 | val = val.rstrip() # remove any trailing whitespace/newline | |
1277 | ||
1278 | try: | |
1279 | checker = getattr(self, "_valid_" + key) | |
1280 | except AttributeError: | |
1281 | # no checker available, just let it pass | |
1282 | self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key)) | |
1283 | valid = True | |
1284 | else: | |
1285 | valid, reason = checker(val) | |
1286 | ||
1287 | if valid: | |
1288 | self.set_store(key, val) | |
1289 | self.log.info("Updated config KV Store item: " + key) | |
1290 | return 0, "", "Config updated for parameter '{}'".format(key) | |
1291 | else: | |
1292 | return -22, "", "Invalid value for '{}' :{}".format(key, reason) | |
1293 | ||
1294 | def clear_config_settings(self): | |
1295 | for k in self.k8s_config: | |
1296 | self.set_store(k, None) | |
1297 | return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys())) | |
1298 | ||
    def handle_command(self, inbuf, cmd):
        """Dispatch a k8sevents CLI command (see COMMANDS) to its handler.

        :param inbuf: (str) file content supplied via '-i <filename>' (set-access)
        :param cmd: (dict) parsed command - 'prefix' plus any declared parameters
        :return: (int, str, str) return code, stdout, status message
        :raises NotImplementedError: for a prefix declared but not handled here
        """

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        # config management commands work even when the module isn't fully up
        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options))

            # credentials are file based, so they must arrive via -i <filename>
            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept if the configuration is
        # not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])
1343 | ||
1344 | @staticmethod | |
1345 | def can_run(): | |
1346 | """Determine whether the pre-reqs for the module are in place""" | |
1347 | ||
1348 | if not kubernetes_imported: | |
1349 | return False, "kubernetes python client is not available" | |
1350 | return True, "" | |
1351 | ||
1352 | def load_kubernetes_config(self): | |
1353 | """Load configuration for remote kubernetes API using KV store values | |
1354 | ||
1355 | Attempt to create an API client configuration from settings stored in | |
1356 | KV store. | |
1357 | ||
1358 | Returns: | |
1359 | client.ApiClient: kubernetes API client object | |
1360 | ||
1361 | Raises: | |
1362 | OSError: unable to create the cacrt file | |
1363 | """ | |
1364 | ||
1365 | # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a | |
1366 | # temporary file containing the cert for the client to load from | |
1367 | try: | |
1368 | ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt']) | |
1369 | except OSError as e: | |
1370 | self.log.error("Unable to create file to hold cacrt: {}".format(str(e))) | |
1371 | raise OSError(str(e)) | |
1372 | else: | |
1373 | self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file)) | |
1374 | ||
1375 | configuration = client.Configuration() | |
1376 | configuration.host = self.k8s_config['server'] | |
1377 | configuration.ssl_ca_cert = ca_crt_file | |
1378 | configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] } | |
1379 | api_client = client.ApiClient(configuration) | |
1380 | self.log.info("API client created for remote kubernetes access using cacrt and token from KV store") | |
1381 | ||
1382 | return api_client | |
1383 | ||
    def serve(self):
        """Module entry point: validate kubernetes access, start the tracker
        threads and monitor their liveness.

        In-cluster (rook-ceph) the connection details come from the pod
        environment; externally they come from the KV store settings. Any
        configuration problem is recorded in self.error_msg and the module
        returns without starting threads.
        """
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()        # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            # NOTE(review): this loop never tests self.run, so shutdown()
            # clearing the flag does not terminate serve() - confirm intended
            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    # report a dead tracker thread once, then remember we did
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False
1453 | ||
1454 | def shutdown(self): | |
1455 | self.run = False | |
1456 | log.info("Shutting down k8sevents module") | |
1457 | self.event_processor.can_run = False | |
1458 | ||
1459 | if self._rados: | |
1460 | self._rados.shutdown() |