1 # Integrate with the kubernetes events API.
2 # This module sends events to Kubernetes, and also captures/tracks all events
3 # in the rook-ceph namespace so kubernetes activity like pod restarts,
4 # imagepulls etc can be seen from within the ceph cluster itself.
6 # To interact with the events API, the mgr service needs to be
7 # granted additional permissions
8 # e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules
10 # These are the changes needed;
36 from urllib
.parse
import urlparse
37 from datetime
import tzinfo
, datetime
, timedelta
39 from urllib3
.exceptions
import MaxRetryError
,ProtocolError
40 from collections
import OrderedDict
43 from mgr_module
import MgrModule
44 from mgr_util
import verify_cacrt
, ServerConfigException
52 # python 2.7.15 or python3
53 event_queue
= queue
.Queue()
56 from kubernetes
import client
, config
, watch
57 from kubernetes
.client
.rest
import ApiException
59 kubernetes_imported
= False
64 kubernetes_imported
= True
66 # The watch.Watch.stream method can provide event objects that have involved_object = None
67 # which causes an exception in the generator. A workaround is discussed for a similar issue
68 # in https://github.com/kubernetes-client/python/issues/376 which has been used here
69 # pylint: disable=no-member
70 from kubernetes
.client
.models
.v1_event
import V1Event
# Replacement setter for V1Event.involved_object: the watch stream can yield
# events whose involved_object is None, which the generated kubernetes-client
# setter rejects.  Substitute a minimal V1ObjectReference so deserialization
# of such events does not raise.
def local_involved_object(self, involved_object):
    # involved_object may legitimately arrive as None from the watch stream
    if involved_object is None:
        involved_object = client.V1ObjectReference(api_version="1")
    # store on the private attribute the generated property reads from
    self._involved_object = involved_object
# Monkeypatch the tolerant setter onto the kubernetes client's V1Event model
V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)
77 log
= logging
.getLogger(__name__
)
79 # use a simple local class to represent UTC
80 # datetime pkg modules vary between python2 and 3 and pytz is not available on older
81 # ceph container images, so taking a pragmatic approach!
83 def utcoffset(self
, dt
):
94 """Define a text suffix based on a value i.e. turn host into hosts"""
95 return '' if num
== 1 else 's'
def create_temp_file(fname, content, suffix=".tmp"):
    """
    Attempt to create a temporary file containing the given content

    Args:
        fname (str): base name of the file (without directory or suffix)
        content (str): data to write to the file; may be None to skip writing
        suffix (str): filename suffix, defaults to ".tmp"

    Returns:
        str .. full path to the temporary file

    Raises:
        OSError: problems creating the file
    """
    # bind the path unconditionally so the return value is always defined,
    # even when content is None and nothing is written
    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

    if content is not None:
        try:
            with open(file_name, "w") as f:
                f.write(content)
        except OSError as e:
            raise OSError("Unable to create temporary file : {}".format(str(e)))

    return file_name
class HealthCheck(object):
    """Transform a healthcheck msg into it's component parts"""

    def __init__(self, msg, msg_level):
        # msg examples:
        # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        # Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        self.msg = None
        self.name = None
        self.text = None

        if msg.lower().startswith('health check'):
            self.msg = msg
            msg_tokens = self.msg.split()

            if msg_level == 'INF':
                # 'cleared' messages carry the check name as the 4th token
                self.text = ' '.join(msg_tokens[3:])
                self.name = msg_tokens[3]  # health check name e.g. OSDMAP_FLAGS
            else:
                # WRN/ERR messages end with '(CHECK_NAME)'; strip the parens
                self.text = ' '.join(msg_tokens[3:-1])
                self.name = msg_tokens[-1][1:-1]
153 class LogEntry(object):
154 """Generic 'log' object"""
158 "cluster": "HealthCheck",
159 "config": "ClusterChange",
160 "heartbeat":"Heartbeat",
164 def __init__(self
, source
, msg
, msg_type
, level
, tstamp
=None):
168 self
.msg_type
= msg_type
171 self
.healthcheck
= None
173 if 'health check ' in self
.msg
.lower():
174 self
.healthcheck
= HealthCheck(self
.msg
, self
.level
)
178 return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self
.source
,
186 """Look at the msg string and extract the command content"""
188 # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
189 if self
.msg_type
!= 'audit':
192 _m
=self
.msg
[:-10].replace("\'","").split("cmd=")
193 _s
='"cmd":{}'.format(_m
[1])
194 cmds_list
= json
.loads('{' + _s
+ '}')['cmd']
196 # TODO. Assuming only one command was issued for now
198 return "{} {}".format(_c
['prefix'], _c
.get('key', ''))
201 def event_type(self
):
202 return 'Normal' if self
.level
== 'INF' else 'Warning'
205 def event_reason(self
):
206 return self
.reason_map
[self
.msg_type
]
209 def event_name(self
):
210 if self
.msg_type
== 'heartbeat':
211 return 'mgr.Heartbeat'
212 elif self
.healthcheck
:
213 return 'mgr.health.{}'.format(self
.healthcheck
.name
)
214 elif self
.msg_type
== 'audit':
215 return 'mgr.audit.{}'.format(self
.cmd
).replace(' ', '_')
216 elif self
.msg_type
== 'config':
217 return 'mgr.ConfigurationChange'
218 elif self
.msg_type
== 'startup':
219 return "mgr.k8sevents-module"
224 def event_entity(self
):
225 if self
.msg_type
== 'audit':
226 return self
.msg
.replace("\'","").split('entity=')[1].split(' ')[0]
232 if self
.msg_type
== 'audit':
233 return "Client '{}' issued: ceph {}".format(self
.event_entity
, self
.cmd
)
235 elif self
.healthcheck
:
236 return self
.healthcheck
.text
241 class BaseThread(threading
.Thread
):
def clean_event(event):
    """ clean an event record """
    # The kube-apiserver can return events with empty timestamp fields;
    # backfill them so later sorting/pruning on last_timestamp works.
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            # BUGFIX: 'datetime' is the class imported from the datetime
            # module, so datetime.datetime.now() raised AttributeError.
            # Use an aware timestamp (UTC) so comparisons against
            # datetime.now(UTC()) elsewhere in this module don't raise.
            event.first_timestamp = datetime.now(UTC())

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    return event
268 class NamespaceWatcher(BaseThread
):
269 """Watch events in a given namespace
271 Using the watch package we can listen to event traffic in the namespace to
272 get an idea of what kubernetes related events surround the ceph cluster. The
273 thing to bear in mind is that events have a TTL enforced by the kube-apiserver
274 so this stream will only really show activity inside this retention window.
277 def __init__(self
, api_client_config
, namespace
=None):
278 super(NamespaceWatcher
, self
).__init
__()
280 if api_client_config
:
281 self
.api
= client
.CoreV1Api(api_client_config
)
283 self
.api
= client
.CoreV1Api()
285 self
.namespace
= namespace
287 self
.events
= OrderedDict()
288 self
.lock
= threading
.Lock()
290 self
.resource_version
= None
293 # clear the cache on every call to fetch
296 resp
= self
.api
.list_namespaced_event(self
.namespace
)
297 # TODO - Perhaps test for auth problem to be more specific in the except clause?
300 self
.health
= "Unable to access events API (list_namespaced_event call failed)"
301 log
.warning(self
.health
)
304 self
.resource_version
= resp
.metadata
.resource_version
306 for item
in resp
.items
:
307 self
.events
[item
.metadata
.name
] = clean_event(item
)
308 log
.info('Added {} events'.format(len(resp
.items
)))
312 func
= getattr(self
.api
, "list_namespaced_event")
315 log
.info("Namespace event watcher started")
322 # execute generator to continually watch resource for changes
323 for item
in w
.stream(func
, namespace
=self
.namespace
, resource_version
=self
.resource_version
, watch
=True):
328 if item
['type'] in ['ADDED', 'MODIFIED']:
329 self
.events
[obj
.metadata
.name
] = clean_event(obj
)
331 elif item
['type'] == 'DELETED':
332 del self
.events
[obj
.metadata
.name
]
334 # TODO test the exception for auth problem (403?)
336 # Attribute error is generated when urllib3 on the system is old and doesn't have a
337 # read_chunked method
338 except AttributeError as e
:
339 self
.health
= ("Error: Unable to 'watch' events API in namespace '{}' - "
340 "urllib3 too old? ({})".format(self
.namespace
, e
))
342 log
.warning(self
.health
)
345 except ApiException
as e
:
346 # refresh the resource_version & watcher
347 log
.warning("API exception caught in watcher ({})".format(e
))
348 log
.warning("Restarting namespace watcher")
351 except ProtocolError
as e
:
352 log
.warning("Namespace watcher hit protocolerror ({}) - restarting".format(e
))
356 self
.health
= "{} Exception at {}".format(
357 sys
.exc_info()[0].__name
__,
358 datetime
.strftime(datetime
.now(),"%Y/%m/%d %H:%M:%S")
360 log
.exception(self
.health
)
364 log
.warning("Namespace event watcher stopped")
367 class KubernetesEvent(object):
369 def __init__(self
, log_entry
, unique_name
=True, api_client_config
=None, namespace
=None):
371 if api_client_config
:
372 self
.api
= client
.CoreV1Api(api_client_config
)
374 self
.api
= client
.CoreV1Api()
376 self
.namespace
= namespace
378 self
.event_name
= log_entry
.event_name
379 self
.message
= log_entry
.event_msg
380 self
.event_type
= log_entry
.event_type
381 self
.event_reason
= log_entry
.event_reason
382 self
.unique_name
= unique_name
384 self
.host
= os
.environ
.get('NODE_NAME', os
.environ
.get('HOSTNAME', 'UNKNOWN'))
386 self
.api_status
= 200
388 self
.first_timestamp
= None
389 self
.last_timestamp
= None
393 """provide a type property matching a V1Event object"""
394 return self
.event_type
397 def event_body(self
):
399 obj_meta
= client
.V1ObjectMeta(name
="{}".format(self
.event_name
))
401 obj_meta
= client
.V1ObjectMeta(generate_name
="{}".format(self
.event_name
))
403 # field_path is needed to prevent problems in the namespacewatcher when
404 # deleted event are received
405 obj_ref
= client
.V1ObjectReference(kind
="CephCluster",
406 field_path
='spec.containers{mgr}',
407 name
=self
.event_name
,
408 namespace
=self
.namespace
)
410 event_source
= client
.V1EventSource(component
="ceph-mgr",
412 return client
.V1Event(
413 involved_object
=obj_ref
,
415 message
=self
.message
,
417 type=self
.event_type
,
418 reason
=self
.event_reason
,
420 first_timestamp
=self
.first_timestamp
,
421 last_timestamp
=self
.last_timestamp
426 now
=datetime
.now(UTC())
428 self
.first_timestamp
= now
429 self
.last_timestamp
= now
432 self
.api
.create_namespaced_event(self
.namespace
, self
.event_body
)
433 except (OSError, ProtocolError
):
434 # unable to reach to the API server
435 log
.error("Unable to reach API server")
436 self
.api_status
= 400
437 except MaxRetryError
:
438 # k8s config has not be defined properly
439 log
.error("multiple attempts to connect to the API have failed")
440 self
.api_status
= 403 # Forbidden
441 except ApiException
as e
:
442 log
.debug("event.write status:{}".format(e
.status
))
443 self
.api_status
= e
.status
445 log
.debug("attempting event update for an existing event")
446 # 409 means the event is there already, so read it back (v1Event object returned)
447 # this could happen if the event has been created, and then the k8sevent module
448 # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
449 response
= self
.api
.read_namespaced_event(self
.event_name
, self
.namespace
)
451 # response looks like
454 # 'api_version': 'v1',
456 # 'event_time': None,
457 # 'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
458 # 'involved_object': {'api_version': None,
459 # 'field_path': None,
460 # 'kind': 'CephCluster',
461 # 'name': 'ceph-mgr.k8sevent-module',
462 # 'namespace': 'rook-ceph',
463 # 'resource_version': None,
466 # 'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
467 # 'message': 'Ceph log -> event tracking started',
468 # 'metadata': {'annotations': None,
469 # 'cluster_name': None,
470 # 'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
471 # 'deletion_grace_period_seconds': None,
472 # 'deletion_timestamp': None,
473 # 'finalizers': None,
474 # 'generate_name': 'ceph-mgr.k8sevent-module',
475 # 'generation': None,
476 # 'initializers': None,
478 # 'name': 'ceph-mgr.k8sevent-module5z7kq',
479 # 'namespace': 'rook-ceph',
480 # 'owner_references': None,
481 # 'resource_version': '1195832',
482 # 'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
483 # 'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
484 # 'reason': 'Started',
486 # 'reporting_component': '',
487 # 'reporting_instance': '',
489 # 'source': {'component': 'ceph-mgr', 'host': 'minikube'},
492 # conflict event already exists
494 # update : count and last_timestamp and msg
496 self
.count
= response
.count
+ 1
497 self
.first_timestamp
= response
.first_timestamp
499 self
.api
.patch_namespaced_event(self
.event_name
, self
.namespace
, self
.event_body
)
500 except ApiException
as e
:
501 log
.error("event.patch failed for {} with status code:{}".format(self
.event_name
, e
.status
))
502 self
.api_status
= e
.status
504 log
.debug("event {} patched".format(self
.event_name
))
505 self
.api_status
= 200
508 log
.debug("event {} created successfully".format(self
.event_name
))
509 self
.api_status
= 200
512 def api_success(self
):
513 return self
.api_status
== 200
515 def update(self
, log_entry
):
516 self
.message
= log_entry
.event_msg
517 self
.event_type
= log_entry
.event_type
518 self
.last_timestamp
= datetime
.now(UTC())
520 log
.debug("performing event update for {}".format(self
.event_name
))
523 self
.api
.patch_namespaced_event(self
.event_name
, self
.namespace
, self
.event_body
)
524 except ApiException
as e
:
525 log
.error("event patch call failed: {}".format(e
.status
))
527 # tried to patch, but hit a 404. The event's TTL must have been reached, and
528 # pruned by the kube-apiserver
529 log
.debug("event not found, so attempting to create it")
531 self
.api
.create_namespaced_event(self
.namespace
, self
.event_body
)
532 except ApiException
as e
:
533 log
.error("unable to create the event: {}".format(e
.status
))
534 self
.api_status
= e
.status
536 log
.debug("event {} created successfully".format(self
.event_name
))
537 self
.api_status
= 200
539 log
.debug("event {} updated".format(self
.event_name
))
540 self
.api_status
= 200
543 class EventProcessor(BaseThread
):
544 """Handle a global queue used to track events we want to send/update to kubernetes"""
548 def __init__(self
, config_watcher
, event_retention_days
, api_client_config
, namespace
):
549 super(EventProcessor
, self
).__init
__()
552 self
.config_watcher
= config_watcher
553 self
.event_retention_days
= event_retention_days
554 self
.api_client_config
= api_client_config
555 self
.namespace
= namespace
558 """Log an event to show we're active"""
560 event
= KubernetesEvent(
563 msg
='Ceph log -> event tracking started',
569 api_client_config
=self
.api_client_config
,
570 namespace
=self
.namespace
574 return event
.api_success
578 return self
.startup()
def prune_events(self):
    """Drop cached ceph events older than the configured retention window."""
    log.debug("prune_events - looking for old events to remove from cache")
    oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
    # snapshot so we can delete from self.events while scanning
    local_events = dict(self.events)

    for event_name in sorted(local_events,
                             key=lambda name: local_events[name].last_timestamp):
        event = local_events[event_name]
        if event.last_timestamp >= oldest:
            # events are scanned oldest-first, so everything after this
            # one is within the retention window - stop scanning
            break
        else:
            log.debug("prune_events - removing old event : {}".format(event_name))
            del self.events[event_name]
595 def process(self
, log_object
):
597 log
.debug("log entry being processed : {}".format(str(log_object
)))
602 if log_object
.msg_type
== 'audit':
603 # audit traffic : operator commands
604 if log_object
.msg
.endswith('finished'):
605 log
.debug("K8sevents received command finished msg")
608 # NO OP - ignoring 'dispatch' log records
611 elif log_object
.msg_type
== 'cluster':
612 # cluster messages : health checks
613 if log_object
.event_name
:
616 elif log_object
.msg_type
== 'config':
617 # configuration checker messages
621 elif log_object
.msg_type
== 'heartbeat':
622 # hourly health message summary from Ceph
625 log_object
.msg
= str(self
.config_watcher
)
628 log
.warning("K8sevents received unknown msg_type - {}".format(log_object
.msg_type
))
631 log
.debug("k8sevents sending event to kubernetes")
632 # we don't cache non-unique events like heartbeats or config changes
633 if not unique_name
or log_object
.event_name
not in self
.events
.keys():
634 event
= KubernetesEvent(log_entry
=log_object
,
635 unique_name
=unique_name
,
636 api_client_config
=self
.api_client_config
,
637 namespace
=self
.namespace
)
639 log
.debug("event(unique={}) creation ended : {}".format(unique_name
, event
.api_status
))
640 if event
.api_success
and unique_name
:
641 self
.events
[log_object
.event_name
] = event
643 event
= self
.events
[log_object
.event_name
]
644 event
.update(log_object
)
645 log
.debug("event update ended : {}".format(event
.api_status
))
650 log
.debug("K8sevents ignored message : {}".format(log_object
.msg
))
653 log
.info("Ceph event processing thread started, "
654 "event retention set to {} days".format(self
.event_retention_days
))
659 log_object
= event_queue
.get(block
=False)
664 self
.process(log_object
)
666 self
.health
= "{} Exception at {}".format(
667 sys
.exc_info()[0].__name
__,
668 datetime
.strftime(datetime
.now(),"%Y/%m/%d %H:%M:%S")
670 log
.exception(self
.health
)
678 log
.warning("Ceph event processing thread stopped")
class ListDiff(object):
    """Compare two iterables as sets, exposing what was added or removed."""

    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        # members of 'before' that no longer appear in 'after'
        return list(self.before - self.after)

    @property
    def added(self):
        # members of 'after' that were not present in 'before'
        return list(self.after - self.before)

    @property
    def is_equal(self):
        # NOTE(review): the name of this comparison property is not visible
        # in the fragment ('added'/'removed' are confirmed by call sites) -
        # confirm against the full file before relying on it
        return self.before == self.after
699 class CephConfigWatcher(BaseThread
):
700 """Detect configuration changes within the cluster and generate human readable events"""
702 def __init__(self
, mgr
):
703 super(CephConfigWatcher
, self
).__init
__()
705 self
.server_map
= dict()
706 self
.osd_map
= dict()
707 self
.pool_map
= dict()
708 self
.service_map
= dict()
710 self
.config_check_secs
= mgr
.config_check_secs
713 def raw_capacity(self
):
714 # Note. if the osd's are not online the capacity field will be 0
715 return sum([self
.osd_map
[osd
]['capacity'] for osd
in self
.osd_map
])
718 def num_servers(self
):
719 return len(self
.server_map
.keys())
723 return len(self
.osd_map
.keys())
727 return len(self
.pool_map
.keys())
732 s
+= "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
733 json
.loads(self
.mgr
.get('health')['json'])['status'],
735 text_suffix(self
.num_servers
),
737 text_suffix(self
.num_pools
),
739 MgrModule
.to_pretty_iec(self
.raw_capacity
))
742 def fetch_servers(self
):
743 """Return a server summary, and service summary"""
744 servers
= self
.mgr
.list_servers()
745 server_map
= dict() # host -> services
746 service_map
= dict() # service -> host
747 for server_info
in servers
:
749 for svc
in server_info
['services']:
750 if svc
.get('type') in services
.keys():
751 services
[svc
.get('type')].append(svc
.get('id'))
753 services
[svc
.get('type')] = list([svc
.get('id')])
754 # maintain the service xref map service -> host and version
755 service_map
[(svc
.get('type'), str(svc
.get('id')))] = server_info
.get('hostname', '')
756 server_map
[server_info
.get('hostname')] = services
758 return server_map
, service_map
760 def fetch_pools(self
):
761 interesting
= ["type", "size", "min_size"]
762 # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool',
763 # 'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn',
764 # 'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100,
765 # 'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0',
766 # 'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0,
767 # 'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1,
768 # 'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0,
769 # 'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000,
770 # 'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0,
771 # 'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0,
772 # 'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0,
773 # 'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0,
774 # 'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}},
775 # 'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0',
776 # 'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}]
777 pools
= self
.mgr
.get('osd_map')['pools']
780 pool_map
[pool
.get('pool_name')] = {k
:pool
.get(k
) for k
in interesting
}
784 def fetch_osd_map(self
, service_map
):
785 """Create an osd map"""
786 stats
= self
.mgr
.get('osd_stats')
790 devices
= self
.mgr
.get('osd_map_crush')['devices']
792 osd_id
= str(dev
['id'])
793 osd_map
[osd_id
] = dict(
794 deviceclass
=dev
.get('class'),
796 hostname
=service_map
['osd', osd_id
]
799 for osd_stat
in stats
['osd_stats']:
800 osd_id
= str(osd_stat
.get('osd'))
801 osd_map
[osd_id
]['capacity'] = osd_stat
['statfs']['total']
def push_events(self, changes):
    """Add config change to the global queue to generate an event in kubernetes"""
    # BUGFIX: the original message contained a '{}' placeholder but never
    # called .format(), so the literal '{}' was logged instead of the count
    log.debug("{} events will be generated".format(len(changes)))
    for change in changes:
        event_queue.put(change)
811 def _generate_config_logentry(self
, msg
):
820 def _check_hosts(self
, server_map
):
821 log
.debug("K8sevents checking host membership")
823 servers
= ListDiff(self
.server_map
.keys(), server_map
.keys())
825 # no hosts have been added or removed
828 # host changes detected, find out what
829 host_msg
= "Host '{}' has been {} the cluster"
830 for new_server
in servers
.added
:
831 changes
.append(self
._generate
_config
_logentry
(
832 msg
=host_msg
.format(new_server
, 'added to'))
835 for removed_server
in servers
.removed
:
836 changes
.append(self
._generate
_config
_logentry
(
837 msg
=host_msg
.format(removed_server
, 'removed from'))
842 def _check_osds(self
,server_map
, osd_map
):
843 log
.debug("K8sevents checking OSD configuration")
846 for svr
in self
.server_map
:
847 before_osds
.extend(self
.server_map
[svr
].get('osd',[]))
850 for svr
in server_map
:
851 after_osds
.extend(server_map
[svr
].get('osd',[]))
853 if set(before_osds
) == set(after_osds
):
854 # no change in osd id's
857 # osd changes detected
858 osd_msg
= "Ceph OSD '{}' ({} @ {}B) has been {} host {}"
860 osds
= ListDiff(before_osds
, after_osds
)
861 for new_osd
in osds
.added
:
862 changes
.append(self
._generate
_config
_logentry
(
865 osd_map
[new_osd
]['deviceclass'],
866 MgrModule
.to_pretty_iec(osd_map
[new_osd
]['capacity']),
868 osd_map
[new_osd
]['hostname']))
871 for removed_osd
in osds
.removed
:
872 changes
.append(self
._generate
_config
_logentry
(
875 osd_map
[removed_osd
]['deviceclass'],
876 MgrModule
.to_pretty_iec(osd_map
[removed_osd
]['capacity']),
878 osd_map
[removed_osd
]['hostname']))
883 def _check_pools(self
, pool_map
):
885 log
.debug("K8sevents checking pool configurations")
886 if self
.pool_map
.keys() == pool_map
.keys():
887 # no pools added/removed
891 pools
= ListDiff(self
.pool_map
.keys(), pool_map
.keys())
892 pool_msg
= "Pool '{}' has been {} the cluster"
893 for new_pool
in pools
.added
:
894 changes
.append(self
._generate
_config
_logentry
(
895 msg
=pool_msg
.format(new_pool
, 'added to'))
898 for removed_pool
in pools
.removed
:
899 changes
.append(self
._generate
_config
_logentry
(
900 msg
=pool_msg
.format(removed_pool
, 'removed from'))
903 # check pool configuration changes
904 for pool_name
in pool_map
:
905 if not self
.pool_map
.get(pool_name
, dict()):
906 # pool didn't exist before so just skip the checks
909 if pool_map
[pool_name
] == self
.pool_map
[pool_name
]:
910 # no changes - dicts match in key and value
913 # determine the change and add it to the change list
914 size_diff
= pool_map
[pool_name
]['size'] - self
.pool_map
[pool_name
]['size']
917 msg
= "Data protection level of pool '{}' reduced to {} copies".format(pool_name
,
918 pool_map
[pool_name
]['size'])
921 msg
= "Data protection level of pool '{}' increased to {} copies".format(pool_name
,
922 pool_map
[pool_name
]['size'])
925 changes
.append(LogEntry(source
="config",
932 if pool_map
[pool_name
]['min_size'] != self
.pool_map
[pool_name
]['min_size']:
933 changes
.append(LogEntry(source
="config",
935 msg
="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name
),
942 def get_changes(self
, server_map
, osd_map
, pool_map
):
943 """Detect changes in maps between current observation and the last"""
947 changes
.extend(self
._check
_hosts
(server_map
))
948 changes
.extend(self
._check
_osds
(server_map
, osd_map
))
949 changes
.extend(self
._check
_pools
(pool_map
))
952 # Could generate an event if a ceph daemon has moved hosts
953 # (assumes the ceph metadata host information is valid though!)
958 log
.info("Ceph configuration watcher started, interval set to {}s".format(self
.config_check_secs
))
960 self
.server_map
, self
.service_map
= self
.fetch_servers()
961 self
.pool_map
= self
.fetch_pools()
963 self
.osd_map
= self
.fetch_osd_map(self
.service_map
)
968 start_time
= time
.time()
969 server_map
, service_map
= self
.fetch_servers()
970 pool_map
= self
.fetch_pools()
971 osd_map
= self
.fetch_osd_map(service_map
)
973 changes
= self
.get_changes(server_map
, osd_map
, pool_map
)
975 self
.push_events(changes
)
977 self
.osd_map
= osd_map
978 self
.pool_map
= pool_map
979 self
.server_map
= server_map
980 self
.service_map
= service_map
982 checks_duration
= int(time
.time() - start_time
)
984 # check that the time it took to run the checks fits within the
985 # interval, and if not extend the interval and emit a log message
986 # to show that the runtime for the checks exceeded the desired
988 if checks_duration
> self
.config_check_secs
:
989 new_interval
= self
.config_check_secs
* 2
990 log
.warning("K8sevents check interval warning. "
991 "Current checks took {}s, interval was {}s. "
992 "Increasing interval to {}s".format(int(checks_duration
),
993 self
.config_check_secs
,
995 self
.config_check_secs
= new_interval
997 time
.sleep(self
.config_check_secs
)
1000 self
.health
= "{} Exception at {}".format(
1001 sys
.exc_info()[0].__name
__,
1002 datetime
.strftime(datetime
.now(),"%Y/%m/%d %H:%M:%S")
1004 log
.exception(self
.health
)
1007 log
.warning("Ceph configuration watcher stopped")
1010 class Module(MgrModule
):
1013 "cmd": "k8sevents status",
1014 "desc": "Show the status of the data gathering threads",
1018 "cmd": "k8sevents ls",
1019 "desc": "List all current Kuberenetes events from the Ceph namespace",
1023 "cmd": "k8sevents ceph",
1024 "desc": "List Ceph events tracked & sent to the kubernetes cluster",
1028 "cmd": "k8sevents set-access name=key,type=CephString",
1029 "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax (e.g., ceph k8sevents set-access cacrt -i /root/ca.crt).",
1033 "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
1034 "desc": "Set kubernetes config paramters. <key> must be server or namespace (e.g., ceph k8sevents set-config server https://localhost:30433).",
1038 "cmd": "k8sevents clear-config",
1039 "desc": "Clear external kubernetes configuration settings",
1044 {'name': 'config_check_secs',
1048 'desc': "interval (secs) to check for cluster configuration changes"},
1049 {'name': 'ceph_event_retention_days',
1052 'desc': "Days to hold ceph event information within local cache"}
1055 def __init__(self
, *args
, **kwargs
):
1057 self
.kubernetes_control
= 'POD_NAME' in os
.environ
1058 self
.event_processor
= None
1059 self
.config_watcher
= None
1060 self
.ns_watcher
= None
1061 self
.trackers
= list()
1062 self
.error_msg
= None
1063 self
._api
_client
_config
= None
1064 self
._namespace
= None
1066 # Declare the module options we accept
1067 self
.config_check_secs
= None
1068 self
.ceph_event_retention_days
= None
1070 self
.k8s_config
= dict(
1077 super(Module
, self
).__init
__(*args
, **kwargs
)
1079 def k8s_ready(self
):
1080 """Validate the k8s_config dict
1083 - bool .... indicating whether the config is ready to use
1084 - string .. variables that need to be defined before the module will function
1089 for k
in self
.k8s_config
:
1090 if not self
.k8s_config
[k
]:
1093 return ready
, missing
1095 def config_notify(self
):
1096 """Apply runtime module options, and defaults from the modules KV store"""
1097 self
.log
.debug("applying runtime module option settings")
1098 for opt
in self
.MODULE_OPTIONS
:
1101 self
.get_module_option(opt
['name']))
1103 if not self
.kubernetes_control
:
1104 # Populate the config
1105 self
.log
.debug("loading config from KV store")
1106 for k
in self
.k8s_config
:
1107 self
.k8s_config
[k
] = self
.get_store(k
, default
=None)
1109 def fetch_events(self
, limit
=None):
1110 """Interface to expose current events to another mgr module"""
1111 # FUTURE: Implement this to provide k8s events to the dashboard?
1112 raise NotImplementedError
1114 def process_clog(self
, log_message
):
1115 """Add log message to the event queue
1117 :param log_message: dict from the cluster log (audit/cluster channels)
1119 required_fields
= ['channel', 'message', 'priority', 'stamp']
1120 _message_attrs
= log_message
.keys()
1121 if all(_field
in _message_attrs
for _field
in required_fields
):
1122 self
.log
.debug("clog entry received - adding to the queue")
1123 if log_message
.get('message').startswith('overall HEALTH'):
1124 m_type
= 'heartbeat'
1126 m_type
= log_message
.get('channel')
1132 msg
=log_message
.get('message'),
1133 level
=log_message
.get('priority')[1:-1],
1134 tstamp
=log_message
.get('stamp')
1139 self
.log
.warning("Unexpected clog message format received - skipped: {}".format(log_message
))
1141 def notify(self
, notify_type
, notify_id
):
1143 Called by the ceph-mgr service to notify the Python plugin
1144 that new state is available.
1146 :param notify_type: string indicating what kind of notification,
1147 such as osd_map, mon_map, fs_map, mon_status,
1148 health, pg_summary, command, service_map
1149 :param notify_id: string (may be empty) that optionally specifies
1150 which entity is being notified about. With
1151 "command" notifications this is set to the tag
1152 ``from send_command``.
1155 # only interested in cluster log (clog) messages for now
1156 if notify_type
== 'clog':
1157 self
.log
.debug("received a clog entry from mgr.notify")
1158 if isinstance(notify_id
, dict):
1159 # create a log object to process
1160 self
.process_clog(notify_id
)
1162 self
.log
.warning("Expected a 'dict' log record format, received {}".format(type(notify_type
)))
1164 def _show_events(self
, events
):
1166 max_msg_length
= max([len(events
[k
].message
) for k
in events
])
1167 fmt
= "{:<20} {:<8} {:>5} {:<" + str(max_msg_length
) + "} {}\n"
1168 s
= fmt
.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")
1170 for event_name
in sorted(events
,
1171 key
= lambda name
: events
[name
].last_timestamp
,
1174 event
= events
[event_name
]
1177 datetime
.strftime(event
.last_timestamp
,"%Y/%m/%d %H:%M:%S"),
1183 s
+= "Total : {:>3}\n".format(len(events
))
1186 def show_events(self
, events
):
1187 """Show events we're holding from the ceph namespace - most recent 1st"""
1190 return 0, "", self
._show
_events
(events
)
1192 return 0, "", "No events emitted yet, local cache is empty"
1194 def show_status(self
):
1196 s
+= "- Hostname : {}\n".format(self
.k8s_config
['server'])
1197 s
+= "- Namespace : {}\n".format(self
._namespace
)
1198 s
+= "Tracker Health\n"
1199 for t
in self
.trackers
:
1200 s
+= "- {:<20} : {}\n".format(t
.__class
__.__name
__, t
.health
)
1201 s
+= "Tracked Events\n"
1202 s
+= "- namespace : {:>3}\n".format(len(self
.ns_watcher
.events
))
1203 s
+= "- ceph events : {:>3}\n".format(len(self
.event_processor
.events
))
1206 def _valid_server(self
, server
):
1207 # must be a valid server url format
1208 server
= server
.strip()
1210 res
= urlparse(server
)
1211 port
= res
.netloc
.split(":")[-1]
1213 if res
.scheme
!= 'https':
1214 return False, "Server URL must use https"
1216 elif not res
.hostname
:
1217 return False, "Invalid server URL format"
1221 socket
.gethostbyname(res
.hostname
)
1222 except socket
.gaierror
:
1223 return False, "Unresolvable server URL"
1225 if not port
.isdigit():
1226 return False, "Server URL must end in a port number"
1230 def _valid_cacrt(self
, cacrt_data
):
1231 """use mgr_util.verify_cacrt to validate the CA file"""
1233 cacrt_fname
= create_temp_file("ca_file", cacrt_data
)
1236 verify_cacrt(cacrt_fname
)
1237 except ServerConfigException
as e
:
1238 return False, "Invalid CA certificate: {}".format(str(e
))
1242 def _valid_token(self
, token_data
):
1243 """basic checks on the token"""
1245 return False, "Token file is empty"
1247 pattern
= re
.compile(r
"[a-zA-Z0-9\-\.\_]+$")
1248 if not pattern
.match(token_data
):
1249 return False, "Token contains invalid characters"
1253 def _valid_namespace(self
, namespace
):
1254 # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols
1256 if len(namespace
) > 253:
1257 return False, "Name too long"
1258 if namespace
.isdigit():
1259 return False, "Invalid name - must be alphanumeric"
1261 pattern
= re
.compile(r
"^[a-z][a-z0-9\-\.]+$")
1262 if not pattern
.match(namespace
):
1263 return False, "Invalid characters in the name"
1267 def _config_set(self
, key
, val
):
1268 """Attempt to validate the content, then save to KV store"""
1270 val
= val
.rstrip() # remove any trailing whitespace/newline
1273 checker
= getattr(self
, "_valid_" + key
)
1274 except AttributeError:
1275 # no checker available, just let it pass
1276 self
.log
.warning("Unable to validate '{}' parameter - checker not implemented".format(key
))
1279 valid
, reason
= checker(val
)
1282 self
.set_store(key
, val
)
1283 self
.log
.info("Updated config KV Store item: " + key
)
1284 return 0, "", "Config updated for parameter '{}'".format(key
)
1286 return -22, "", "Invalid value for '{}' :{}".format(key
, reason
)
1288 def clear_config_settings(self
):
1289 for k
in self
.k8s_config
:
1290 self
.set_store(k
, None)
1291 return 0,"","{} configuration keys removed".format(len(self
.k8s_config
.keys()))
1293 def handle_command(self
, inbuf
, cmd
):
1295 access_options
= ['cacrt', 'token']
1296 config_options
= ['server', 'namespace']
1298 if cmd
['prefix'] == 'k8sevents clear-config':
1299 return self
.clear_config_settings()
1301 if cmd
['prefix'] == 'k8sevents set-access':
1302 if cmd
['key'] not in access_options
:
1303 return -errno
.EINVAL
, "", "Unknown access option. Must be one of; {}".format(','.join(access_options
))
1306 return self
._config
_set
(cmd
['key'], inbuf
)
1308 return -errno
.EINVAL
, "", "Command must specify -i <filename>"
1310 if cmd
['prefix'] == 'k8sevents set-config':
1312 if cmd
['key'] not in config_options
:
1313 return -errno
.EINVAL
, "", "Unknown config option. Must be one of; {}".format(','.join(config_options
))
1315 return self
._config
_set
(cmd
['key'], cmd
['value'])
1317 # At this point the command is trying to interact with k8sevents, so intercept if the configuration is
1320 _msg
= "k8sevents unavailable: " + self
.error_msg
1321 ready
, _
= self
.k8s_ready()
1322 if not self
.kubernetes_control
and not ready
:
1323 _msg
+= "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
1324 return -errno
.ENODATA
, "", _msg
1326 if cmd
["prefix"] == "k8sevents status":
1327 return self
.show_status()
1329 elif cmd
["prefix"] == "k8sevents ls":
1330 return self
.show_events(self
.ns_watcher
.events
)
1332 elif cmd
["prefix"] == "k8sevents ceph":
1333 return self
.show_events(self
.event_processor
.events
)
1336 raise NotImplementedError(cmd
["prefix"])
1340 """Determine whether the pre-reqs for the module are in place"""
1342 if not kubernetes_imported
:
1343 return False, "kubernetes python client is not available"
1346 def load_kubernetes_config(self
):
1347 """Load configuration for remote kubernetes API using KV store values
1349 Attempt to create an API client configuration from settings stored in
1353 client.ApiClient: kubernetes API client object
1356 OSError: unable to create the cacrt file
1359 # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
1360 # temporary file containing the cert for the client to load from
1362 ca_crt_file
= create_temp_file('cacrt', self
.k8s_config
['cacrt'])
1363 except OSError as e
:
1364 self
.log
.error("Unable to create file to hold cacrt: {}".format(str(e
)))
1365 raise OSError(str(e
))
1367 self
.log
.debug("CA certificate from KV store, written to {}".format(ca_crt_file
))
1369 configuration
= client
.Configuration()
1370 configuration
.host
= self
.k8s_config
['server']
1371 configuration
.ssl_ca_cert
= ca_crt_file
1372 configuration
.api_key
= { "authorization": "Bearer " + self
.k8s_config
['token'] }
1373 api_client
= client
.ApiClient(configuration
)
1374 self
.log
.info("API client created for remote kubernetes access using cacrt and token from KV store")
1379 # apply options set by CLI to this module
1380 self
.config_notify()
1382 if not kubernetes_imported
:
1383 self
.error_msg
= "Unable to start : python kubernetes package is missing"
1385 if self
.kubernetes_control
:
1386 # running under rook-ceph
1387 config
.load_incluster_config()
1388 self
.k8s_config
['server'] = "https://{}:{}".format(os
.environ
.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
1389 os
.environ
.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
1390 self
._api
_client
_config
= None
1391 self
._namespace
= os
.environ
.get("POD_NAMESPACE", "rook-ceph")
1393 # running outside of rook-ceph, so we need additional settings to tell us
1394 # how to connect to the kubernetes cluster
1395 ready
, errors
= self
.k8s_ready()
1397 self
.error_msg
= "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors
))
1400 self
._api
_client
_config
= self
.load_kubernetes_config()
1401 except OSError as e
:
1402 self
.error_msg
= str(e
)
1404 self
._namespace
= self
.k8s_config
['namespace']
1405 self
.log
.info("k8sevents configuration loaded from KV store")
1408 self
.log
.error(self
.error_msg
)
1411 # All checks have passed
1412 self
.config_watcher
= CephConfigWatcher(self
)
1414 self
.event_processor
= EventProcessor(self
.config_watcher
,
1415 self
.ceph_event_retention_days
,
1416 self
._api
_client
_config
,
1419 self
.ns_watcher
= NamespaceWatcher(api_client_config
=self
._api
_client
_config
,
1420 namespace
=self
._namespace
)
1422 if self
.event_processor
.ok
:
1423 log
.info("Ceph Log processor thread starting")
1424 self
.event_processor
.start() # start log consumer thread
1425 log
.info("Ceph config watcher thread starting")
1426 self
.config_watcher
.start()
1427 log
.info("Rook-ceph namespace events watcher starting")
1428 self
.ns_watcher
.start()
1430 self
.trackers
.extend([self
.event_processor
, self
.config_watcher
, self
.ns_watcher
])
1436 trackers
= self
.trackers
1438 if not t
.is_alive() and not t
.reported
:
1439 log
.error("K8sevents tracker thread '{}' stopped: {}".format(t
.__class
__.__name
__, t
.health
))
1443 self
.error_msg
= "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
1444 log
.warning(self
.error_msg
)
1445 log
.warning("k8sevents module exiting")
1450 log
.info("Shutting down k8sevents module")
1451 self
.event_processor
.can_run
= False
1454 self
._rados
.shutdown()