]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | # Integrate with the kubernetes events API. |
2 | # This module sends events to Kubernetes, and also captures/tracks all events | |
3 | # in the rook-ceph namespace so kubernetes activity like pod restarts, | |
4 | # imagepulls etc can be seen from within the ceph cluster itself. | |
5 | # | |
6 | # To interact with the events API, the mgr service account needs to be | |
7 | # granted additional permissions | |
8 | # e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules | |
9 | # | |
10 | # These are the changes needed; | |
11 | # - apiGroups: | |
12 | # - "" | |
13 | # resources: | |
14 | # - events | |
15 | # verbs: | |
16 | # - create | |
17 | # - patch | |
18 | # - list | |
19 | # - get | |
20 | # - watch | |
21 | ||
22 | ||
23 | import os | |
24 | import re | |
25 | import sys | |
26 | import time | |
27 | import json | |
28 | import yaml | |
29 | import errno | |
30 | import socket | |
31 | import base64 | |
32 | import logging | |
33 | import tempfile | |
34 | import threading | |
35 | ||
36 | try: | |
37 | # python 3 | |
38 | from urllib.parse import urlparse | |
39 | except ImportError: | |
40 | # python 2 fallback | |
41 | from urlparse import urlparse | |
42 | ||
43 | from datetime import tzinfo, datetime, timedelta | |
44 | ||
45 | from urllib3.exceptions import MaxRetryError,ProtocolError | |
46 | from collections import OrderedDict | |
47 | ||
48 | import rados | |
49 | from mgr_module import MgrModule | |
50 | from mgr_util import verify_cacrt, ServerConfigException | |
51 | ||
52 | try: | |
53 | import queue | |
54 | except ImportError: | |
55 | # python 2.7.5 | |
56 | import Queue as queue | |
57 | finally: | |
58 | # python 2.7.15 or python3 | |
59 | event_queue = queue.Queue() | |
60 | ||
try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    # kubernetes client library is not installed: null the names so the module
    # can still be loaded; callers are expected to check kubernetes_imported
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        # substitute a minimal object reference when the stream hands us None,
        # so the generated V1Event deserialises without raising
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)
82 | ||
83 | log = logging.getLogger(__name__) | |
84 | ||
85 | # use a simple local class to represent UTC | |
86 | # datetime pkg modules vary between python2 and 3 and pytz is not available on older | |
87 | # ceph container images, so taking a pragmatic approach! | |
class UTC(tzinfo):
    """Fixed-offset tzinfo for UTC.

    datetime pkg modules vary between python2 and 3 and pytz is not available
    on older ceph container images, so a minimal local implementation is used.
    """

    _ZERO = timedelta(0)

    def utcoffset(self, dt):
        return UTC._ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return UTC._ZERO
97 | ||
98 | ||
def text_suffix(num):
    """Define a text suffix based on a value i.e. turn host into hosts"""
    return 's' if num != 1 else ''
102 | ||
103 | ||
def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file holding the given content.

    The file is written to the system temp directory as fname + suffix.

    Returns:
        str .. full path to the temporary file (None when content is None)

    Raises:
        OSError: problems creating the file
    """
    # nothing to write -> nothing created (mirrors the caller contract)
    if content is None:
        return None

    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)
    try:
        with open(file_name, "w") as f:
            f.write(content)
    except OSError as e:
        raise OSError("Unable to create temporary file : {}".format(str(e)))

    return file_name
127 | ||
128 | ||
class HealthCheck(object):
    """Transform a healthcheck msg into it's component parts"""

    def __init__(self, msg, msg_level):
        # Messages this parser understands look like:
        #   Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        #   Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        #   Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        if not msg.lower().startswith('health check'):
            # not a healthcheck transition - leave the instance marked invalid
            return

        self.valid = True
        self.msg = msg
        tokens = msg.split()

        if msg_level == 'INF':
            # 'cleared' messages carry the check name straight after the prefix
            self.text = ' '.join(tokens[3:])
            self.name = tokens[3]  # health check name e.g. OSDMAP_FLAGS
        else:
            # WRN or ERR: the name is the trailing '(NAME)' token
            self.text = ' '.join(tokens[3:-1])
            self.name = tokens[-1][1:-1]
157 | ||
158 | ||
class LogEntry(object):
    """Generic 'log' object"""

    # msg_type -> kubernetes event 'reason' string
    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):
        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp

        # cluster log messages describing a healthcheck transition carry a
        # parsed HealthCheck payload; everything else gets None
        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)
        else:
            self.healthcheck = None

    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(
            self.source, self.msg_type, self.msg, self.level, self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""
        # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
        if self.msg_type != 'audit':
            return None

        # strip the trailing ": finished" and all single quotes, then grab the
        # JSON list that follows 'cmd='
        stripped = self.msg[:-10].replace("\'", "")
        cmd_json = stripped.split("cmd=")[1]
        cmds_list = json.loads('{' + '"cmd":{}'.format(cmd_json) + '}')['cmd']

        # TODO. Assuming only one command was issued for now
        first_cmd = cmds_list[0]
        return "{} {}".format(first_cmd['prefix'], first_cmd.get('key', ''))

    @property
    def event_type(self):
        if self.level == 'INF':
            return 'Normal'
        return 'Warning'

    @property
    def event_reason(self):
        return LogEntry.reason_map[self.msg_type]

    @property
    def event_name(self):
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        if self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        if self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        if self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        if self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        return None

    @property
    def event_entity(self):
        if self.msg_type != 'audit':
            return None
        # text between "entity=" and the following space, quotes removed
        return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]

    @property
    def event_msg(self):
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)
        if self.healthcheck:
            return self.healthcheck.text
        return self.msg
245 | ||
246 | ||
class BaseThread(threading.Thread):
    # Common base for the module's worker threads.
    # health : human readable state - overwritten with an error description
    #          when a thread hits a problem (see NamespaceWatcher/EventProcessor)
    # reported : NOTE(review) presumably tracks whether a bad health state has
    #            already been surfaced; its consumer is not in this chunk
    # daemon : daemon threads never block mgr/interpreter shutdown
    health = 'OK'
    reported = False
    daemon = True
251 | ||
252 | ||
class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        super(NamespaceWatcher, self).__init__()

        # an explicit api client configuration overrides the default client
        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()   # event name -> V1Event, guarded by self.lock
        self.lock = threading.Lock()
        self.active = None            # None=not started, True=watching, False=failed
        self.resource_version = None  # resume point for the watch stream

    def fetch(self):
        """Prime (or re-prime) the event cache with a full listing.

        Sets self.active accordingly and records the collection's
        resource_version so the subsequent watch resumes from this listing.
        """
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        # FIX: was a bare 'except:' which also swallowed SystemExit/KeyboardInterrupt
        except Exception:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = item
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        """Stream event changes into self.events until an unrecoverable error."""
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if not self.active:
            # initial listing failed (fetch already logged why); starting the
            # watch would just spin on the same failure, so bail out
            log.warning("Namespace event watcher stopped")
            return

        log.info("Namespace event watcher started")

        while True:

            try:
                w = watch.Watch()
                # execute generator to continually watch resource for changes
                for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                    obj = item['object']

                    with self.lock:

                        if item['type'] in ['ADDED', 'MODIFIED']:
                            self.events[obj.metadata.name] = obj

                        elif item['type'] == 'DELETED':
                            # FIX: the event may already be absent from our cache
                            # (e.g. cleared by a fetch) - don't let a stale DELETE
                            # raise KeyError and kill the watcher
                            self.events.pop(obj.metadata.name, None)

            # TODO test the exception for auth problem (403?)

            # Attribute error is generated when urllib3 on the system is old and doesn't have a
            # read_chunked method
            except AttributeError as e:
                self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                               "urllib3 too old? ({})".format(self.namespace, e))
                self.active = False
                log.warning(self.health)
                break

            except ApiException as e:
                # refresh the resource_version & watcher
                log.warning("API exception caught in watcher ({})".format(e))
                log.warning("Restarting namespace watcher")
                self.fetch()
                if not self.active:
                    # FIX: the refresh failed as well - stop rather than retrying
                    # in a tight warning-spamming loop
                    break

            except Exception:
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                self.active = False
                break

        log.warning("Namespace event watcher stopped")
346 | ||
347 | ||
class KubernetesEvent(object):
    """Represent a ceph LogEntry as a kubernetes event and manage its lifecycle.

    Provides create (write) and update (patch) semantics against the events
    API; api_status records the HTTP-style outcome of the last interaction
    (200 = success).
    """

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):

        # an explicit api client configuration overrides the default client
        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        # unique_name=False events use generate_name in event_body, so each
        # write produces a new event object rather than updating an old one
        self.unique_name = unique_name

        # NODE_NAME is typically set in k8s pod specs; fall back to HOSTNAME
        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        self.api_status = 200    # HTTP-style status of the last API call
        self.count = 1           # times this event has been observed
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        """Build the V1Event payload from the current instance state."""
        # a fixed name means a create conflict (409) identifies an existing
        # event that should be patched instead of re-created
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted event are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):
        """Create the event in kubernetes, patching instead on a 409 conflict.

        self.api_status is updated with the outcome (200 on success).
        """

        now=datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach to the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not be defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403 # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and then the k8sevent module
                # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        # convenience flag: did the last API interaction succeed?
        return self.api_status == 200

    def update(self, log_entry):
        """Patch the existing event with new message/type/count/timestamp.

        If the event's TTL expired and the kube-apiserver pruned it, the patch
        gets a 404 and the event is recreated instead.
        """
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200
522 | ||
523 | ||
class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    # cleared by the module to ask the thread to stop cleanly
    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        super(EventProcessor, self).__init__()

        self.events = dict()    # event_name -> KubernetesEvent (unique events only)
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        # writing the startup event doubles as an access check to the API
        return self.startup()

    def prune_events(self):
        """Drop cached events older than the configured retention window."""
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        # iterate a copy so the cache can be mutated while scanning
        local_events = dict(self.events)

        # oldest first: the first young-enough event ends the scan
        for event_name in sorted(local_events,
                                 key = lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):
        """Turn one queued LogEntry into a kubernetes event create/update."""

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False     # should this entry produce a kubernetes event?
        unique_name = True    # unique events are cached and updated in place

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            # replace the msg with the config watcher's cluster summary
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        """Main loop: poll the global event_queue until stopped or an error hits."""
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                # non-blocking get so can_run is re-checked every cycle
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    # abort the thread on any processing error; health carries
                    # the failure description for the module to report
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")
660 | ||
661 | ||
class ListDiff(object):
    """Compare two iterables (as sets) to expose membership changes."""

    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        # members present before, but gone now
        return list(self.before.difference(self.after))

    @property
    def added(self):
        # members present now, but absent before
        return list(self.after.difference(self.before))

    @property
    def is_equal(self):
        # True when membership is unchanged
        return self.before == self.after
678 | ||
679 | ||
680 | class CephConfigWatcher(BaseThread): | |
681 | """Detect configuration changes within the cluster and generate human readable events""" | |
682 | ||
    def __init__(self, mgr):
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr                # MgrModule instance, used for cluster queries
        self.server_map = dict()      # hostname -> {service type: [service ids]}
        self.osd_map = dict()         # osd id (str) -> deviceclass/capacity/hostname
        self.pool_map = dict()        # pool name -> {'type', 'size', 'min_size'}
        self.service_map = dict()     # (service type, id) -> hostname

        # interval between config comparisons - presumably seconds, per the
        # name; the polling loop itself is not in this chunk
        self.config_check_secs = mgr.config_check_secs
692 | ||
693 | @property | |
694 | def raw_capacity(self): | |
695 | # Note. if the osd's are not online the capacity field will be 0 | |
696 | return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map]) | |
697 | ||
698 | @property | |
699 | def num_servers(self): | |
700 | return len(self.server_map.keys()) | |
701 | ||
702 | @property | |
703 | def num_osds(self): | |
704 | return len(self.osd_map.keys()) | |
705 | ||
706 | @property | |
707 | def num_pools(self): | |
708 | return len(self.pool_map.keys()) | |
709 | ||
710 | def __str__(self): | |
711 | s = '' | |
712 | ||
713 | s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format( | |
714 | json.loads(self.mgr.get('health')['json'])['status'], | |
715 | self.num_servers, | |
716 | text_suffix(self.num_servers), | |
717 | self.num_pools, | |
718 | text_suffix(self.num_pools), | |
719 | self.num_osds, | |
720 | MgrModule.to_pretty_iec(self.raw_capacity)) | |
721 | return s | |
722 | ||
723 | def fetch_servers(self): | |
724 | """Return a server summary, and service summary""" | |
725 | servers = self.mgr.list_servers() | |
726 | server_map = dict() # host -> services | |
727 | service_map = dict() # service -> host | |
728 | for server_info in servers: | |
729 | services = dict() | |
730 | for svc in server_info['services']: | |
731 | if svc.get('type') in services.keys(): | |
732 | services[svc.get('type')].append(svc.get('id')) | |
733 | else: | |
734 | services[svc.get('type')] = list([svc.get('id')]) | |
735 | # maintain the service xref map service -> host and version | |
736 | service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '') | |
737 | server_map[server_info.get('hostname')] = services | |
738 | ||
739 | return server_map, service_map | |
740 | ||
741 | def fetch_pools(self): | |
742 | interesting = ["type", "size", "min_size"] | |
743 | # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool', | |
744 | # 'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn', | |
745 | # 'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100, | |
746 | # 'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0', | |
747 | # 'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0, | |
748 | # 'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1, | |
749 | # 'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0, | |
750 | # 'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000, | |
751 | # 'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0, | |
752 | # 'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0, | |
753 | # 'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0, | |
754 | # 'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0, | |
755 | # 'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}}, | |
756 | # 'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0', | |
757 | # 'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}] | |
758 | pools = self.mgr.get('osd_map')['pools'] | |
759 | pool_map = dict() | |
760 | for pool in pools: | |
761 | pool_map[pool.get('pool_name')] = {k:pool.get(k) for k in interesting} | |
762 | return pool_map | |
763 | ||
764 | ||
765 | def fetch_osd_map(self, service_map): | |
766 | """Create an osd map""" | |
767 | stats = self.mgr.get('osd_stats') | |
768 | ||
769 | osd_map = dict() | |
770 | ||
771 | devices = self.mgr.get('osd_map_crush')['devices'] | |
772 | for dev in devices: | |
773 | osd_id = str(dev['id']) | |
774 | osd_map[osd_id] = dict( | |
775 | deviceclass=dev.get('class'), | |
776 | capacity=0, | |
777 | hostname=service_map['osd', osd_id] | |
778 | ) | |
779 | ||
780 | for osd_stat in stats['osd_stats']: | |
781 | osd_id = str(osd_stat.get('osd')) | |
782 | osd_map[osd_id]['capacity'] = osd_stat['statfs']['total'] | |
783 | ||
784 | return osd_map | |
785 | ||
786 | def push_events(self, changes): | |
787 | """Add config change to the global queue to generate an event in kubernetes""" | |
788 | log.debug("{} events will be generated") | |
789 | for change in changes: | |
790 | event_queue.put(change) | |
791 | ||
792 | def _generate_config_logentry(self, msg): | |
793 | return LogEntry( | |
794 | source="config", | |
795 | msg_type="config", | |
796 | msg=msg, | |
797 | level='INF', | |
798 | tstamp=None | |
799 | ) | |
800 | ||
801 | def _check_hosts(self, server_map): | |
802 | log.debug("K8sevents checking host membership") | |
803 | changes = list() | |
804 | servers = ListDiff(self.server_map.keys(), server_map.keys()) | |
805 | if servers.is_equal: | |
806 | # no hosts have been added or removed | |
807 | pass | |
808 | else: | |
809 | # host changes detected, find out what | |
810 | host_msg = "Host '{}' has been {} the cluster" | |
811 | for new_server in servers.added: | |
812 | changes.append(self._generate_config_logentry( | |
813 | msg=host_msg.format(new_server, 'added to')) | |
814 | ) | |
815 | ||
816 | for removed_server in servers.removed: | |
817 | changes.append(self._generate_config_logentry( | |
818 | msg=host_msg.format(removed_server, 'removed from')) | |
819 | ) | |
820 | ||
821 | return changes | |
822 | ||
823 | def _check_osds(self,server_map, osd_map): | |
824 | log.debug("K8sevents checking OSD configuration") | |
825 | changes = list() | |
826 | before_osds = list() | |
827 | for svr in self.server_map: | |
828 | before_osds.extend(self.server_map[svr].get('osd',[])) | |
829 | ||
830 | after_osds = list() | |
831 | for svr in server_map: | |
832 | after_osds.extend(server_map[svr].get('osd',[])) | |
833 | ||
834 | if set(before_osds) == set(after_osds): | |
835 | # no change in osd id's | |
836 | pass | |
837 | else: | |
838 | # osd changes detected | |
839 | osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}" | |
840 | ||
841 | osds = ListDiff(before_osds, after_osds) | |
842 | for new_osd in osds.added: | |
843 | changes.append(self._generate_config_logentry( | |
844 | msg=osd_msg.format( | |
845 | new_osd, | |
846 | osd_map[new_osd]['deviceclass'], | |
847 | MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']), | |
848 | 'added to', | |
849 | osd_map[new_osd]['hostname'])) | |
850 | ) | |
851 | ||
852 | for removed_osd in osds.removed: | |
853 | changes.append(self._generate_config_logentry( | |
854 | msg=osd_msg.format( | |
855 | removed_osd, | |
856 | osd_map[removed_osd]['deviceclass'], | |
857 | MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']), | |
858 | 'removed from', | |
859 | osd_map[removed_osd]['hostname'])) | |
860 | ) | |
861 | ||
862 | return changes | |
863 | ||
    def _check_pools(self, pool_map):
        """Compare current pool settings against the cached copy.

        Returns a list of LogEntry objects covering pools added/removed and
        size / min_size changes within pools that existed before.
        """
        changes = list()
        log.debug("K8sevents checking pool configurations")
        # NOTE(review): on python2 keys() returns lists, making this an
        # order-sensitive comparison - the ListDiff below is set based though
        if self.pool_map.keys() == pool_map.keys():
            # no pools added/removed
            pass
        else:
            # Pool changes
            pools = ListDiff(self.pool_map.keys(), pool_map.keys())
            pool_msg = "Pool '{}' has been {} the cluster"
            for new_pool in pools.added:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(new_pool, 'added to'))
                )

            for removed_pool in pools.removed:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(removed_pool, 'removed from'))
                )

        # check pool configuration changes
        for pool_name in pool_map:
            if not self.pool_map.get(pool_name, dict()):
                # pool didn't exist before so just skip the checks
                continue

            if pool_map[pool_name] == self.pool_map[pool_name]:
                # no changes - dicts match in key and value
                continue
            else:
                # determine the change and add it to the change list
                size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size']
                if size_diff != 0:
                    # fewer copies is a warning, more copies is informational
                    if size_diff < 0:
                        msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
                                                                                              pool_map[pool_name]['size'])
                        level = 'WRN'
                    else:
                        msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
                                                                                                pool_map[pool_name]['size'])
                        level = 'INF'

                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg=msg,
                                            level=level,
                                            tstamp=None)
                    )

                if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']:
                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name),
                                            level='WRN',
                                            tstamp=None)
                    )

        return changes
922 | ||
923 | def get_changes(self, server_map, osd_map, pool_map): | |
924 | """Detect changes in maps between current observation and the last""" | |
925 | ||
926 | changes = list() | |
927 | ||
928 | changes.extend(self._check_hosts(server_map)) | |
929 | changes.extend(self._check_osds(server_map, osd_map)) | |
930 | changes.extend(self._check_pools(pool_map)) | |
931 | ||
932 | # FUTURE | |
933 | # Could generate an event if a ceph daemon has moved hosts | |
934 | # (assumes the ceph metadata host information is valid though!) | |
935 | ||
936 | return changes | |
937 | ||
    def run(self):
        """Main thread loop: poll cluster maps and emit events on changes.

        Takes an initial snapshot of the server/pool/osd maps, then loops
        forever: fetch fresh maps, diff against the previous snapshot via
        get_changes(), push any resulting events, and commit the new maps as
        the baseline for the next pass. If one polling pass takes longer than
        the configured interval, the interval is doubled. Any exception ends
        the thread after recording the failure in self.health.
        """
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        # establish the baseline snapshot that subsequent passes diff against
        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                # commit the new observation as the baseline only after the
                # events for this pass have been pushed
                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(int(checks_duration),
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                # record the failure (shown by 'k8sevents status') and stop
                # the watcher thread; the exception itself goes to the log
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")
989 | ||
990 | ||
class Module(MgrModule):
    """ceph-mgr module entry point for the k8sevents plugin.

    Exposes the 'k8sevents ...' CLI commands, consumes cluster log (clog)
    notifications, and runs three tracker threads (event processor, config
    watcher, namespace watcher). Kubernetes access is either in-cluster
    (detected via the POD_NAME environment variable) or configured through
    credentials/settings held in the mgr KV store.
    """
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kuberenetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config paramters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]

    def __init__(self, *args, **kwargs):
        # True while the module is considered running (cleared by shutdown)
        self.run = True
        # POD_NAME is set when running inside a kubernetes pod (e.g. rook-ceph)
        self.kubernetes_control = 'POD_NAME' in os.environ
        # tracker thread handles, populated in serve()
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        self.trackers = list()
        # set when startup cannot proceed; reported by handle_command
        self.error_msg = None
        # kubernetes ApiClient config (None means use in-cluster defaults)
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        # settings needed for *external* kubernetes access, loaded from the
        # KV store when not running inside the cluster
        self.k8s_config = dict(
            cacrt = None,
            token = None,
            server = None,
            namespace = None
        )

        super(Module, self).__init__(*args, **kwargs)

    def k8s_ready(self):
        """Validate the k8s_config dict

        Returns:
            - bool .... indicating whether the config is ready to use
            - string .. variables that need to be defined before the module will function

        """
        missing = list()
        ready = True
        for k in self.k8s_config:
            if not self.k8s_config[k]:
                missing.append(k)
                ready = False
        return ready, missing

    def config_notify(self):
        """Apply runtime module options, and defaults from the modules KV store"""
        self.log.debug("applying runtime module option settings")
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))

        if not self.kubernetes_control:
            # Populate the config
            self.log.debug("loading config from KV store")
            for k in self.k8s_config:
                self.k8s_config[k] = self.get_store(k, default=None)

    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module"""
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError

    def process_clog(self, log_message):
        """Add log message to the event queue

        :param log_message: dict from the cluster log (audit/cluster channels)
        """
        required_fields = ['channel', 'message', 'priority', 'stamp']
        _message_attrs = log_message.keys()
        if all(_field in _message_attrs for _field in required_fields):
            self.log.debug("clog entry received - adding to the queue")
            # periodic health summaries are tagged separately so the event
            # processor can treat them as heartbeats rather than log noise
            if log_message.get('message').startswith('overall HEALTH'):
                m_type = 'heartbeat'
            else:
                m_type = log_message.get('channel')

            event_queue.put(
                LogEntry(
                    source='log',
                    msg_type=m_type,
                    msg=log_message.get('message'),
                    # priority arrives wrapped in brackets, e.g. "[INF]";
                    # strip the first and last characters to get the level
                    level=log_message.get('priority')[1:-1],
                    tstamp=log_message.get('stamp')
                )
            )

        else:
            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.

        :param notify_type: string indicating what kind of notification,
                            such as osd_map, mon_map, fs_map, mon_status,
                            health, pg_summary, command, service_map
        :param notify_id:  string (may be empty) that optionally specifies
                            which entity is being notified about.  With
                            "command" notifications this is set to the tag
                            ``from send_command``.
        """

        # only interested in cluster log (clog) messages for now
        if notify_type == 'clog':
            self.log.debug("received a clog entry from mgr.notify")
            if isinstance(notify_id, dict):
                # create a log object to process
                self.process_clog(notify_id)
            else:
                # NOTE(review): this reports type(notify_type), but the value
                # that failed the isinstance check is notify_id - the message
                # likely should use type(notify_id). Verify before changing.
                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type)))

    def _show_events(self, events):
        """Render a dict of tracked events as a fixed-width text table.

        :param events: dict of event name -> event object (must expose
                       message, type, count and last_timestamp attributes)
        :return: formatted multi-line string, most recent events first
        """

        # size the message column to the longest message we're holding
        max_msg_length = max([len(events[k].message) for k in events])
        fmt = "{:<20} {:<8} {:>5}  {:<" + str(max_msg_length) + "}  {}\n"
        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")

        for event_name in sorted(events,
                                 key = lambda name: events[name].last_timestamp,
                                 reverse=True):

            event = events[event_name]

            s += fmt.format(
                datetime.strftime(event.last_timestamp,"%Y/%m/%d %H:%M:%S"),
                event.type,
                event.count,
                event.message,
                event_name
            )
        s += "Total : {:>3}\n".format(len(events))
        return s

    def show_events(self, events):
        """Show events we're holding from the ceph namespace - most recent 1st"""

        if len(events):
            return 0, "", self._show_events(events)
        else:
            return 0, "", "No events emitted yet, local cache is empty"

    def show_status(self):
        """Return a summary of the kubernetes target and tracker thread health."""
        s = "Kubernetes\n"
        s += "- Hostname : {}\n".format(self.k8s_config['server'])
        s += "- Namespace : {}\n".format(self._namespace)
        s += "Tracker Health\n"
        for t in self.trackers:
            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
        s += "Tracked Events\n"
        s += "- namespace   : {:>3}\n".format(len(self.ns_watcher.events))
        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
        return 0, "", s

    def _valid_server(self, server):
        """Check a server URL is https, resolvable and carries a port number.

        :param server: candidate server URL string
        :return: (bool valid, str reason) - reason is empty when valid
        """
        # must be a valid server url format
        server = server.strip()

        res = urlparse(server)
        port = res.netloc.split(":")[-1]

        if res.scheme != 'https':
            return False, "Server URL must use https"

        elif not res.hostname:
            return False, "Invalid server URL format"

        elif res.hostname:
            try:
                socket.gethostbyname(res.hostname)
            except socket.gaierror:
                return False, "Unresolvable server URL"

        if not port.isdigit():
            return False, "Server URL must end in a port number"

        return True, ""

    def _valid_cacrt(self, cacrt_data):
        """use mgr_util.verify_cacrt to validate the CA file"""

        # verify_cacrt needs a file path, so the cert content is written to a
        # temporary file first (the file is not removed here)
        cacrt_fname = create_temp_file("ca_file", cacrt_data)

        try:
            verify_cacrt(cacrt_fname)
        except ServerConfigException as e:
            return False, "Invalid CA certificate: {}".format(str(e))
        else:
            return True, ""

    def _valid_token(self, token_data):
        """basic checks on the token"""
        if not token_data:
            return False, "Token file is empty"

        # bearer tokens are limited to alphanumerics plus '-', '.' and '_'
        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
        if not pattern.match(token_data):
            return False, "Token contains invalid characters"

        return True, ""

    def _valid_namespace(self, namespace):
        # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols

        if len(namespace) > 253:
            return False, "Name too long"
        if namespace.isdigit():
            return False, "Invalid name - must be alphanumeric"

        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
        if not pattern.match(namespace):
            return False, "Invalid characters in the name"

        return True, ""

    def _config_set(self, key, val):
        """Attempt to validate the content, then save to KV store"""

        val = val.rstrip()  # remove any trailing whitespace/newline

        # dispatch to a _valid_<key> checker if one exists; unknown keys are
        # accepted with a warning so new settings don't hard-fail
        try:
            checker = getattr(self, "_valid_" + key)
        except AttributeError:
            # no checker available, just let it pass
            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
            valid = True
        else:
            valid, reason = checker(val)

        if valid:
            self.set_store(key, val)
            self.log.info("Updated config KV Store item: " + key)
            return 0, "", "Config updated for parameter '{}'".format(key)
        else:
            return -22, "", "Invalid value for '{}' :{}".format(key, reason)

    def clear_config_settings(self):
        """Remove all k8sevents keys from the KV store."""
        for k in self.k8s_config:
            self.set_store(k, None)
        return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys()))

    def handle_command(self, inbuf, cmd):
        """Dispatch the 'k8sevents ...' CLI commands.

        :param inbuf: file content supplied with -i <filename> (set-access)
        :param cmd: dict describing the parsed command (prefix + parameters)
        :return: (retcode, stdout, stderr/status) tuple per mgr convention
        """

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options))

            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept if the configuration is
        # not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])

    @staticmethod
    def can_run():
        """Determine whether the pre-reqs for the module are in place"""

        # kubernetes_imported is set at import time at the top of this module
        if not kubernetes_imported:
            return False, "kubernetes python client is not available"
        return True, ""

    def load_kubernetes_config(self):
        """Load configuration for remote kubernetes API using KV store values

        Attempt to create an API client configuration from settings stored in
        KV store.

        Returns:
            client.ApiClient: kubernetes API client object

        Raises:
            OSError: unable to create the cacrt file
        """

        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
        # temporary file containing the cert for the client to load from
        try:
            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
        except OSError as e:
            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
            raise OSError(str(e))
        else:
            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))

        configuration = client.Configuration()
        configuration.host = self.k8s_config['server']
        configuration.ssl_ca_cert = ca_crt_file
        configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] }
        api_client = client.ApiClient(configuration)
        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")

        return api_client

    def serve(self):
        """Module main loop: configure kubernetes access and run the trackers.

        Chooses in-cluster configuration (when running under rook-ceph) or
        KV-store supplied credentials, starts the event processor, config
        watcher and namespace watcher threads, then monitors their health.
        Returns early (leaving the module idle) if configuration is missing
        or the kubernetes API is unreachable.
        """
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                # None here means the client library uses the in-cluster config
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()        # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            # NOTE(review): this loop only reports dead tracker threads; it
            # does not check self.run, so shutdown() does not terminate it -
            # confirm whether that is intentional (mgr kills the thread).
            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    # report each dead tracker thread exactly once
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False

    def shutdown(self):
        """Signal the module and its event processor thread to stop."""
        self.run = False
        log.info("Shutting down k8sevents module")
        # stop the log consumer loop in the event processor thread
        self.event_processor.can_run = False

        # release the rados connection held by the MgrModule base, if any
        if self._rados:
            self._rados.shutdown()