# Integrate with the kubernetes events API.
# This module sends events to Kubernetes, and also captures/tracks all events
# in the rook-ceph namespace so kubernetes activity like pod restarts,
# imagepulls etc can be seen from within the ceph cluster itself.
#
# To interact with the events API, the mgr service account needs to be
# granted additional permissions
# e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules
#
# These are the changes needed:
# - apiGroups:
#   - ""
#   resources:
#   - events
#   verbs:
#   - create
#   - patch
#   - list
#   - get
#   - watch
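#
# A quick way to confirm the grant took effect is kubectl's auth check, e.g.
# (the service account name below is an assumption; adjust for your deployment)
#   kubectl -n rook-ceph auth can-i create events \
#     --as system:serviceaccount:rook-ceph:rook-ceph-mgr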


import os
import re
import sys
import time
import json
import yaml
import errno
import socket
import base64
import logging
import tempfile
import threading

try:
    # python 3
    from urllib.parse import urlparse
except ImportError:
    # python 2 fallback
    from urlparse import urlparse

from datetime import tzinfo, datetime, timedelta

from urllib3.exceptions import MaxRetryError, ProtocolError
from collections import OrderedDict

import rados
from mgr_module import MgrModule
from mgr_util import verify_cacrt, ServerConfigException

try:
    import queue
except ImportError:
    # python 2.7.5
    import Queue as queue
finally:
    # python 2.7.15 or python3
    event_queue = queue.Queue()

try:
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
    watch = None
else:
    kubernetes_imported = True

    # The watch.Watch.stream method can provide event objects that have involved_object = None
    # which causes an exception in the generator. A workaround is discussed for a similar issue
    # in https://github.com/kubernetes-client/python/issues/376 which has been used here
    # pylint: disable=no-member
    from kubernetes.client.models.v1_event import V1Event
    def local_involved_object(self, involved_object):
        if involved_object is None:
            involved_object = client.V1ObjectReference(api_version="1")
        self._involved_object = involved_object
    V1Event.involved_object = V1Event.involved_object.setter(local_involved_object)

log = logging.getLogger(__name__)

# use a simple local class to represent UTC
# datetime pkg modules vary between python2 and 3 and pytz is not available on older
# ceph container images, so taking a pragmatic approach!
class UTC(tzinfo):
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)

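# e.g. datetime.now(UTC()) gives a timezone-aware timestamp, so comparisons
# against the tz-aware timestamps returned by the kubernetes client are safe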

def text_suffix(num):
    """Define a text suffix based on a value i.e. turn host into hosts"""
    return '' if num == 1 else 's'


def create_temp_file(fname, content, suffix=".tmp"):
    """Create a temp file

    Attempt to create a temporary file containing the given content

    Returns:
        str .. full path to the temporary file

    Raises:
        OSError: problems creating the file

    """

    if content is None:
        raise OSError("Unable to create temporary file: no content provided")

    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)

    try:
        with open(file_name, "w") as f:
            f.write(content)
    except OSError as e:
        raise OSError("Unable to create temporary file : {}".format(str(e)))

    return file_name


class HealthCheck(object):
    """Transform a healthcheck msg into its component parts"""

    def __init__(self, msg, msg_level):

        # msg looks like
        #
        # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY)
        # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set)
        # Health check failed: nodown flag(s) set (OSDMAP_FLAGS)
        #
        self.msg = None
        self.name = None
        self.text = None
        self.valid = False

        if msg.lower().startswith('health check'):

            self.valid = True
            self.msg = msg
            msg_tokens = self.msg.split()

            if msg_level == 'INF':
                self.text = ' '.join(msg_tokens[3:])
                self.name = msg_tokens[3]  # health check name e.g. OSDMAP_FLAGS
            else:  # WRN or ERR
                self.text = ' '.join(msg_tokens[3:-1])
                self.name = msg_tokens[-1][1:-1]

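# A minimal sketch of how the parser behaves, using one of the sample
# messages from the comment above (illustrative only):
#
#   hc = HealthCheck("Health check failed: nodown flag(s) set (OSDMAP_FLAGS)", "WRN")
#   hc.valid   # -> True
#   hc.name    # -> "OSDMAP_FLAGS"
#   hc.text    # -> "nodown flag(s) set"
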
class LogEntry(object):
    """Generic 'log' object"""

    reason_map = {
        "audit": "Audit",
        "cluster": "HealthCheck",
        "config": "ClusterChange",
        "heartbeat": "Heartbeat",
        "startup": "Started"
    }

    def __init__(self, source, msg, msg_type, level, tstamp=None):

        self.source = source
        self.msg = msg
        self.msg_type = msg_type
        self.level = level
        self.tstamp = tstamp
        self.healthcheck = None

        if 'health check ' in self.msg.lower():
            self.healthcheck = HealthCheck(self.msg, self.level)

    def __str__(self):
        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
                                                                            self.msg_type,
                                                                            self.msg,
                                                                            self.level,
                                                                            self.tstamp)

    @property
    def cmd(self):
        """Look at the msg string and extract the command content"""

        # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished'
        if self.msg_type != 'audit':
            return None
        else:
            _m = self.msg[:-10].replace("\'", "").split("cmd=")
            _s = '"cmd":{}'.format(_m[1])
            cmds_list = json.loads('{' + _s + '}')['cmd']

            # TODO. Assuming only one command was issued for now
            _c = cmds_list[0]
            return "{} {}".format(_c['prefix'], _c.get('key', ''))

    @property
    def event_type(self):
        return 'Normal' if self.level == 'INF' else 'Warning'

    @property
    def event_reason(self):
        return self.reason_map[self.msg_type]

    @property
    def event_name(self):
        if self.msg_type == 'heartbeat':
            return 'mgr.Heartbeat'
        elif self.healthcheck:
            return 'mgr.health.{}'.format(self.healthcheck.name)
        elif self.msg_type == 'audit':
            return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_')
        elif self.msg_type == 'config':
            return 'mgr.ConfigurationChange'
        elif self.msg_type == 'startup':
            return "mgr.k8sevents-module"
        else:
            return None

    @property
    def event_entity(self):
        if self.msg_type == 'audit':
            return self.msg.replace("\'", "").split('entity=')[1].split(' ')[0]
        else:
            return None

    @property
    def event_msg(self):
        if self.msg_type == 'audit':
            return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd)

        elif self.healthcheck:
            return self.healthcheck.text
        else:
            return self.msg

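# Sketch of how an audit clog record maps onto LogEntry properties, using the
# sample message shown in the cmd property above (values illustrative):
#
#   entry = LogEntry(source='log', msg_type='audit', level='INF',
#                    msg="from='client.205306 ' entity='client.admin' "
#                        "cmd='[{\"prefix\": \"osd set\", \"key\": \"nodown\"}]': finished")
#   entry.cmd         # -> "osd set nodown"
#   entry.event_name  # -> "mgr.audit.osd_set_nodown"
#   entry.event_msg   # -> "Client 'client.admin' issued: ceph osd set nodown"
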
class BaseThread(threading.Thread):
    health = 'OK'
    reported = False
    daemon = True


def clean_event(event):
    """ clean an event record """
    if not event.first_timestamp:
        log.error("first_timestamp is empty")
        if event.metadata.creation_timestamp:
            log.error("setting first_timestamp to the creation timestamp")
            event.first_timestamp = event.metadata.creation_timestamp
        else:
            log.error("defaulting event first timestamp to current datetime")
            event.first_timestamp = datetime.now(UTC())

    if not event.last_timestamp:
        log.error("setting event last timestamp to {}".format(event.first_timestamp))
        event.last_timestamp = event.first_timestamp

    if not event.count:
        event.count = 1

    return event

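# e.g. an event object arriving from the watch stream with first_timestamp,
# last_timestamp or count unset comes back with all three populated, so the
# sorting and pruning done later against last_timestamp is safe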

class NamespaceWatcher(BaseThread):
    """Watch events in a given namespace

    Using the watch package we can listen to event traffic in the namespace to
    get an idea of what kubernetes related events surround the ceph cluster. The
    thing to bear in mind is that events have a TTL enforced by the kube-apiserver
    so this stream will only really show activity inside this retention window.
    """

    def __init__(self, api_client_config, namespace=None):
        super(NamespaceWatcher, self).__init__()

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.events = OrderedDict()
        self.lock = threading.Lock()
        self.active = None
        self.resource_version = None

    def fetch(self):
        # clear the cache on every call to fetch
        self.events.clear()
        try:
            resp = self.api.list_namespaced_event(self.namespace)
        # TODO - Perhaps test for auth problem to be more specific in the except clause?
        except Exception:
            self.active = False
            self.health = "Unable to access events API (list_namespaced_event call failed)"
            log.warning(self.health)
        else:
            self.active = True
            self.resource_version = resp.metadata.resource_version

            for item in resp.items:
                self.events[item.metadata.name] = clean_event(item)
            log.info('Added {} events'.format(len(resp.items)))

    def run(self):
        self.fetch()
        func = getattr(self.api, "list_namespaced_event")

        if self.active:
            log.info("Namespace event watcher started")

            while True:

                try:
                    w = watch.Watch()
                    # execute generator to continually watch resource for changes
                    for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True):
                        obj = item['object']

                        with self.lock:

                            if item['type'] in ['ADDED', 'MODIFIED']:
                                self.events[obj.metadata.name] = clean_event(obj)

                            elif item['type'] == 'DELETED':
                                del self.events[obj.metadata.name]

                # TODO test the exception for auth problem (403?)

                # Attribute error is generated when urllib3 on the system is old and doesn't have a
                # read_chunked method
                except AttributeError as e:
                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
                                   "urllib3 too old? ({})".format(self.namespace, e))
                    self.active = False
                    log.warning(self.health)
                    break

                except ApiException as e:
                    # refresh the resource_version & watcher
                    log.warning("API exception caught in watcher ({})".format(e))
                    log.warning("Restarting namespace watcher")
                    self.fetch()

                except ProtocolError as e:
                    log.warning("Namespace watcher hit ProtocolError ({}) - restarting".format(e))
                    self.fetch()

                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    self.active = False
                    break

        log.warning("Namespace event watcher stopped")

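# Standalone usage sketch (assumes a kubeconfig or in-cluster config has
# already been loaded; the namespace is an example):
#
#   nsw = NamespaceWatcher(api_client_config=None, namespace='rook-ceph')
#   nsw.start()            # daemon thread, populates nsw.events
#   with nsw.lock:
#       recent = list(nsw.events.keys())
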
class KubernetesEvent(object):

    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):

        if api_client_config:
            self.api = client.CoreV1Api(api_client_config)
        else:
            self.api = client.CoreV1Api()

        self.namespace = namespace

        self.event_name = log_entry.event_name
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.event_reason = log_entry.event_reason
        self.unique_name = unique_name

        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))

        self.api_status = 200
        self.count = 1
        self.first_timestamp = None
        self.last_timestamp = None

    @property
    def type(self):
        """provide a type property matching a V1Event object"""
        return self.event_type

    @property
    def event_body(self):
        if self.unique_name:
            obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name))
        else:
            obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name))

        # field_path is needed to prevent problems in the namespacewatcher when
        # deleted events are received
        obj_ref = client.V1ObjectReference(kind="CephCluster",
                                           field_path='spec.containers{mgr}',
                                           name=self.event_name,
                                           namespace=self.namespace)

        event_source = client.V1EventSource(component="ceph-mgr",
                                            host=self.host)
        return client.V1Event(
            involved_object=obj_ref,
            metadata=obj_meta,
            message=self.message,
            count=self.count,
            type=self.event_type,
            reason=self.event_reason,
            source=event_source,
            first_timestamp=self.first_timestamp,
            last_timestamp=self.last_timestamp
        )

    def write(self):

        now = datetime.now(UTC())

        self.first_timestamp = now
        self.last_timestamp = now

        try:
            self.api.create_namespaced_event(self.namespace, self.event_body)
        except (OSError, ProtocolError):
            # unable to reach the API server
            log.error("Unable to reach API server")
            self.api_status = 400
        except MaxRetryError:
            # k8s config has not been defined properly
            log.error("multiple attempts to connect to the API have failed")
            self.api_status = 403  # Forbidden
        except ApiException as e:
            log.debug("event.write status:{}".format(e.status))
            self.api_status = e.status
            if e.status == 409:
                log.debug("attempting event update for an existing event")
                # 409 means the event is there already, so read it back (v1Event object returned)
                # this could happen if the event has been created, and then the k8sevent module
                # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
                response = self.api.read_namespaced_event(self.event_name, self.namespace)
                #
                # response looks like
                #
                # {'action': None,
                #  'api_version': 'v1',
                #  'count': 1,
                #  'event_time': None,
                #  'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'involved_object': {'api_version': None,
                #                      'field_path': None,
                #                      'kind': 'CephCluster',
                #                      'name': 'ceph-mgr.k8sevent-module',
                #                      'namespace': 'rook-ceph',
                #                      'resource_version': None,
                #                      'uid': None},
                #  'kind': 'Event',
                #  'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #  'message': 'Ceph log -> event tracking started',
                #  'metadata': {'annotations': None,
                #               'cluster_name': None,
                #               'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()),
                #               'deletion_grace_period_seconds': None,
                #               'deletion_timestamp': None,
                #               'finalizers': None,
                #               'generate_name': 'ceph-mgr.k8sevent-module',
                #               'generation': None,
                #               'initializers': None,
                #               'labels': None,
                #               'name': 'ceph-mgr.k8sevent-module5z7kq',
                #               'namespace': 'rook-ceph',
                #               'owner_references': None,
                #               'resource_version': '1195832',
                #               'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq',
                #               'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'},
                #  'reason': 'Started',
                #  'related': None,
                #  'reporting_component': '',
                #  'reporting_instance': '',
                #  'series': None,
                #  'source': {'component': 'ceph-mgr', 'host': 'minikube'},
                #  'type': 'Normal'}

                # conflict event already exists
                # read it
                # update : count and last_timestamp and msg

                self.count = response.count + 1
                self.first_timestamp = response.first_timestamp
                try:
                    self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                except ApiException as e:
                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} patched".format(self.event_name))
                    self.api_status = 200

        else:
            log.debug("event {} created successfully".format(self.event_name))
            self.api_status = 200

    @property
    def api_success(self):
        return self.api_status == 200

    def update(self, log_entry):
        self.message = log_entry.event_msg
        self.event_type = log_entry.event_type
        self.last_timestamp = datetime.now(UTC())
        self.count += 1
        log.debug("performing event update for {}".format(self.event_name))

        try:
            self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
        except ApiException as e:
            log.error("event patch call failed: {}".format(e.status))
            if e.status == 404:
                # tried to patch, but hit a 404. The event's TTL must have been reached, and
                # pruned by the kube-apiserver
                log.debug("event not found, so attempting to create it")
                try:
                    self.api.create_namespaced_event(self.namespace, self.event_body)
                except ApiException as e:
                    log.error("unable to create the event: {}".format(e.status))
                    self.api_status = e.status
                else:
                    log.debug("event {} created successfully".format(self.event_name))
                    self.api_status = 200
        else:
            log.debug("event {} updated".format(self.event_name))
            self.api_status = 200


class EventProcessor(BaseThread):
    """Handle a global queue used to track events we want to send/update to kubernetes"""

    can_run = True

    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
        super(EventProcessor, self).__init__()

        self.events = dict()
        self.config_watcher = config_watcher
        self.event_retention_days = event_retention_days
        self.api_client_config = api_client_config
        self.namespace = namespace

    def startup(self):
        """Log an event to show we're active"""

        event = KubernetesEvent(
            LogEntry(
                source='self',
                msg='Ceph log -> event tracking started',
                msg_type='startup',
                level='INF',
                tstamp=None
            ),
            unique_name=False,
            api_client_config=self.api_client_config,
            namespace=self.namespace
        )

        event.write()
        return event.api_success

    @property
    def ok(self):
        return self.startup()

    def prune_events(self):
        log.debug("prune_events - looking for old events to remove from cache")
        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
        local_events = dict(self.events)

        for event_name in sorted(local_events,
                                 key=lambda name: local_events[name].last_timestamp):
            event = local_events[event_name]
            if event.last_timestamp >= oldest:
                break
            else:
                # drop this event
                log.debug("prune_events - removing old event : {}".format(event_name))
                del self.events[event_name]

    def process(self, log_object):

        log.debug("log entry being processed : {}".format(str(log_object)))

        event_out = False
        unique_name = True

        if log_object.msg_type == 'audit':
            # audit traffic : operator commands
            if log_object.msg.endswith('finished'):
                log.debug("K8sevents received command finished msg")
                event_out = True
            else:
                # NO OP - ignoring 'dispatch' log records
                return

        elif log_object.msg_type == 'cluster':
            # cluster messages : health checks
            if log_object.event_name:
                event_out = True

        elif log_object.msg_type == 'config':
            # configuration checker messages
            event_out = True
            unique_name = False

        elif log_object.msg_type == 'heartbeat':
            # hourly health message summary from Ceph
            event_out = True
            unique_name = False
            log_object.msg = str(self.config_watcher)

        else:
            log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))

        if event_out:
            log.debug("k8sevents sending event to kubernetes")
            # we don't cache non-unique events like heartbeats or config changes
            if not unique_name or log_object.event_name not in self.events.keys():
                event = KubernetesEvent(log_entry=log_object,
                                        unique_name=unique_name,
                                        api_client_config=self.api_client_config,
                                        namespace=self.namespace)
                event.write()
                log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                if event.api_success and unique_name:
                    self.events[log_object.event_name] = event
            else:
                event = self.events[log_object.event_name]
                event.update(log_object)
                log.debug("event update ended : {}".format(event.api_status))

            self.prune_events()

        else:
            log.debug("K8sevents ignored message : {}".format(log_object.msg))

    def run(self):
        log.info("Ceph event processing thread started, "
                 "event retention set to {} days".format(self.event_retention_days))

        while True:

            try:
                log_object = event_queue.get(block=False)
            except queue.Empty:
                pass
            else:
                try:
                    self.process(log_object)
                except Exception:
                    self.health = "{} Exception at {}".format(
                        sys.exc_info()[0].__name__,
                        datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                    )
                    log.exception(self.health)
                    break

            if not self.can_run:
                break

            time.sleep(0.5)

        log.warning("Ceph event processing thread stopped")


class ListDiff(object):
    def __init__(self, before, after):
        self.before = set(before)
        self.after = set(after)

    @property
    def removed(self):
        return list(self.before - self.after)

    @property
    def added(self):
        return list(self.after - self.before)

    @property
    def is_equal(self):
        return self.before == self.after

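# e.g. d = ListDiff(['host1', 'host2'], ['host2', 'host3'])
#      d.added -> ['host3']; d.removed -> ['host1']; d.is_equal -> False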

class CephConfigWatcher(BaseThread):
    """Detect configuration changes within the cluster and generate human readable events"""

    def __init__(self, mgr):
        super(CephConfigWatcher, self).__init__()
        self.mgr = mgr
        self.server_map = dict()
        self.osd_map = dict()
        self.pool_map = dict()
        self.service_map = dict()

        self.config_check_secs = mgr.config_check_secs

    @property
    def raw_capacity(self):
        # Note. if the OSDs are not online the capacity field will be 0
        return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map])

    @property
    def num_servers(self):
        return len(self.server_map.keys())

    @property
    def num_osds(self):
        return len(self.osd_map.keys())

    @property
    def num_pools(self):
        return len(self.pool_map.keys())

    def __str__(self):
        s = ''

        s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format(
            json.loads(self.mgr.get('health')['json'])['status'],
            self.num_servers,
            text_suffix(self.num_servers),
            self.num_pools,
            text_suffix(self.num_pools),
            self.num_osds,
            MgrModule.to_pretty_iec(self.raw_capacity))
        return s

    def fetch_servers(self):
        """Return a server summary, and service summary"""
        servers = self.mgr.list_servers()
        server_map = dict()   # host -> services
        service_map = dict()  # service -> host
        for server_info in servers:
            services = dict()
            for svc in server_info['services']:
                if svc.get('type') in services.keys():
                    services[svc.get('type')].append(svc.get('id'))
                else:
                    services[svc.get('type')] = list([svc.get('id')])
                # maintain the service xref map service -> host and version
                service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '')
            server_map[server_info.get('hostname')] = services

        return server_map, service_map

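    # Illustrative shapes of the two maps returned above (host and service
    # ids are assumptions):
    #   server_map  = {'node1': {'mgr': ['a'], 'osd': ['0', '1']}}
    #   service_map = {('mgr', 'a'): 'node1', ('osd', '0'): 'node1', ...}
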
    def fetch_pools(self):
        interesting = ["type", "size", "min_size"]
        # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool',
        #           'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn',
        #           'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100,
        #           'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0',
        #           'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0,
        #           'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1,
        #           'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0,
        #           'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000,
        #           'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0,
        #           'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0,
        #           'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0,
        #           'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0,
        #           'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}},
        #           'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0',
        #           'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}]
        pools = self.mgr.get('osd_map')['pools']
        pool_map = dict()
        for pool in pools:
            pool_map[pool.get('pool_name')] = {k: pool.get(k) for k in interesting}
        return pool_map

    def fetch_osd_map(self, service_map):
        """Create an osd map"""
        stats = self.mgr.get('osd_stats')

        osd_map = dict()

        devices = self.mgr.get('osd_map_crush')['devices']
        for dev in devices:
            osd_id = str(dev['id'])
            osd_map[osd_id] = dict(
                deviceclass=dev.get('class'),
                capacity=0,
                hostname=service_map['osd', osd_id]
            )

        for osd_stat in stats['osd_stats']:
            osd_id = str(osd_stat.get('osd'))
            osd_map[osd_id]['capacity'] = osd_stat['statfs']['total']

        return osd_map

    def push_events(self, changes):
        """Add config change to the global queue to generate an event in kubernetes"""
        log.debug("{} events will be generated".format(len(changes)))
        for change in changes:
            event_queue.put(change)

    def _generate_config_logentry(self, msg):
        return LogEntry(
            source="config",
            msg_type="config",
            msg=msg,
            level='INF',
            tstamp=None
        )

    def _check_hosts(self, server_map):
        log.debug("K8sevents checking host membership")
        changes = list()
        servers = ListDiff(self.server_map.keys(), server_map.keys())
        if servers.is_equal:
            # no hosts have been added or removed
            pass
        else:
            # host changes detected, find out what
            host_msg = "Host '{}' has been {} the cluster"
            for new_server in servers.added:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(new_server, 'added to'))
                )

            for removed_server in servers.removed:
                changes.append(self._generate_config_logentry(
                    msg=host_msg.format(removed_server, 'removed from'))
                )

        return changes

    def _check_osds(self, server_map, osd_map):
        log.debug("K8sevents checking OSD configuration")
        changes = list()
        before_osds = list()
        for svr in self.server_map:
            before_osds.extend(self.server_map[svr].get('osd', []))

        after_osds = list()
        for svr in server_map:
            after_osds.extend(server_map[svr].get('osd', []))

        if set(before_osds) == set(after_osds):
            # no change in osd id's
            pass
        else:
            # osd changes detected
            osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}"

            osds = ListDiff(before_osds, after_osds)
            for new_osd in osds.added:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        new_osd,
                        osd_map[new_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']),
                        'added to',
                        osd_map[new_osd]['hostname']))
                )

            for removed_osd in osds.removed:
                changes.append(self._generate_config_logentry(
                    msg=osd_msg.format(
                        removed_osd,
                        osd_map[removed_osd]['deviceclass'],
                        MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']),
                        'removed from',
                        osd_map[removed_osd]['hostname']))
                )

        return changes

    def _check_pools(self, pool_map):
        changes = list()
        log.debug("K8sevents checking pool configurations")
        if self.pool_map.keys() == pool_map.keys():
            # no pools added/removed
            pass
        else:
            # Pool changes
            pools = ListDiff(self.pool_map.keys(), pool_map.keys())
            pool_msg = "Pool '{}' has been {} the cluster"
            for new_pool in pools.added:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(new_pool, 'added to'))
                )

            for removed_pool in pools.removed:
                changes.append(self._generate_config_logentry(
                    msg=pool_msg.format(removed_pool, 'removed from'))
                )

        # check pool configuration changes
        for pool_name in pool_map:
            if not self.pool_map.get(pool_name, dict()):
                # pool didn't exist before so just skip the checks
                continue

            if pool_map[pool_name] == self.pool_map[pool_name]:
                # no changes - dicts match in key and value
                continue
            else:
                # determine the change and add it to the change list
                size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size']
                if size_diff != 0:
                    if size_diff < 0:
                        msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
                                                                                               pool_map[pool_name]['size'])
                        level = 'WRN'
                    else:
                        msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
                                                                                                 pool_map[pool_name]['size'])
                        level = 'INF'

                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg=msg,
                                            level=level,
                                            tstamp=None)
                    )

                if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']:
                    changes.append(LogEntry(source="config",
                                            msg_type="config",
                                            msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name),
                                            level='WRN',
                                            tstamp=None)
                    )

        return changes

    def get_changes(self, server_map, osd_map, pool_map):
        """Detect changes in maps between current observation and the last"""

        changes = list()

        changes.extend(self._check_hosts(server_map))
        changes.extend(self._check_osds(server_map, osd_map))
        changes.extend(self._check_pools(pool_map))

        # FUTURE
        # Could generate an event if a ceph daemon has moved hosts
        # (assumes the ceph metadata host information is valid though!)

        return changes

    def run(self):
        log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs))

        self.server_map, self.service_map = self.fetch_servers()
        self.pool_map = self.fetch_pools()

        self.osd_map = self.fetch_osd_map(self.service_map)

        while True:

            try:
                start_time = time.time()
                server_map, service_map = self.fetch_servers()
                pool_map = self.fetch_pools()
                osd_map = self.fetch_osd_map(service_map)

                changes = self.get_changes(server_map, osd_map, pool_map)
                if changes:
                    self.push_events(changes)

                self.osd_map = osd_map
                self.pool_map = pool_map
                self.server_map = server_map
                self.service_map = service_map

                checks_duration = int(time.time() - start_time)

                # check that the time it took to run the checks fits within the
                # interval, and if not extend the interval and emit a log message
                # to show that the runtime for the checks exceeded the desired
                # interval
                if checks_duration > self.config_check_secs:
                    new_interval = self.config_check_secs * 2
                    log.warning("K8sevents check interval warning. "
                                "Current checks took {}s, interval was {}s. "
                                "Increasing interval to {}s".format(checks_duration,
                                                                    self.config_check_secs,
                                                                    new_interval))
                    self.config_check_secs = new_interval

                time.sleep(self.config_check_secs)

            except Exception:
                self.health = "{} Exception at {}".format(
                    sys.exc_info()[0].__name__,
                    datetime.strftime(datetime.now(), "%Y/%m/%d %H:%M:%S")
                )
                log.exception(self.health)
                break

        log.warning("Ceph configuration watcher stopped")


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "k8sevents status",
            "desc": "Show the status of the data gathering threads",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ls",
            "desc": "List all current Kubernetes events from the Ceph namespace",
            "perm": "r"
        },
        {
            "cmd": "k8sevents ceph",
            "desc": "List Ceph events tracked & sent to the kubernetes cluster",
            "perm": "r"
        },
        {
            "cmd": "k8sevents set-access name=key,type=CephString",
            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
            "perm": "rw"
        },
        {
            "cmd": "k8sevents clear-config",
            "desc": "Clear external kubernetes configuration settings",
            "perm": "rw"
        },
    ]
    MODULE_OPTIONS = [
        {'name': 'config_check_secs',
         'type': 'int',
         'default': 10,
         'min': 10,
         'desc': "interval (secs) to check for cluster configuration changes"},
        {'name': 'ceph_event_retention_days',
         'type': 'int',
         'default': 7,
         'desc': "Days to hold ceph event information within local cache"}
    ]

    def __init__(self, *args, **kwargs):
        self.run = True
        self.kubernetes_control = 'POD_NAME' in os.environ
        self.event_processor = None
        self.config_watcher = None
        self.ns_watcher = None
        self.trackers = list()
        self.error_msg = None
        self._api_client_config = None
        self._namespace = None

        # Declare the module options we accept
        self.config_check_secs = None
        self.ceph_event_retention_days = None

        self.k8s_config = dict(
            cacrt=None,
            token=None,
            server=None,
            namespace=None
        )

        super(Module, self).__init__(*args, **kwargs)

    def k8s_ready(self):
        """Validate the k8s_config dict

        Returns:
            - bool .... indicating whether the config is ready to use
            - string .. variables that need to be defined before the module will function

        """
        missing = list()
        ready = True
        for k in self.k8s_config:
            if not self.k8s_config[k]:
                missing.append(k)
                ready = False
        return ready, missing

    def config_notify(self):
        """Apply runtime module options, and defaults from the module's KV store"""
        self.log.debug("applying runtime module option settings")
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))

        if not self.kubernetes_control:
            # Populate the config
            self.log.debug("loading config from KV store")
            for k in self.k8s_config:
                self.k8s_config[k] = self.get_store(k, default=None)

    def fetch_events(self, limit=None):
        """Interface to expose current events to another mgr module"""
        # FUTURE: Implement this to provide k8s events to the dashboard?
        raise NotImplementedError

    def process_clog(self, log_message):
        """Add log message to the event queue

        :param log_message: dict from the cluster log (audit/cluster channels)
        """
        required_fields = ['channel', 'message', 'priority', 'stamp']
        _message_attrs = log_message.keys()
        if all(_field in _message_attrs for _field in required_fields):
            self.log.debug("clog entry received - adding to the queue")
            if log_message.get('message').startswith('overall HEALTH'):
                m_type = 'heartbeat'
            else:
                m_type = log_message.get('channel')

            event_queue.put(
                LogEntry(
                    source='log',
                    msg_type=m_type,
                    msg=log_message.get('message'),
                    level=log_message.get('priority')[1:-1],
                    tstamp=log_message.get('stamp')
                )
            )

        else:
            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))

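    # A clog record is expected to look something like this (values are
    # illustrative); note the [ ] wrapping on priority is what the [1:-1]
    # slice above strips off:
    #   {'channel': 'audit',
    #    'message': "from='client.admin' ... cmd='[...]': finished",
    #    'priority': '[INF]',
    #    'stamp': '2019-07-18 05:24:59.123456'}
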
    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.

        :param notify_type: string indicating what kind of notification,
                            such as osd_map, mon_map, fs_map, mon_status,
                            health, pg_summary, command, service_map
        :param notify_id: string (may be empty) that optionally specifies
                          which entity is being notified about. With
                          "command" notifications this is set to the tag
                          ``from send_command``.
        """

        # only interested in cluster log (clog) messages for now
        if notify_type == 'clog':
            self.log.debug("received a clog entry from mgr.notify")
            if isinstance(notify_id, dict):
                # create a log object to process
                self.process_clog(notify_id)
            else:
                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_id)))

    def _show_events(self, events):

        max_msg_length = max([len(events[k].message) for k in events])
        fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n"
        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")

        for event_name in sorted(events,
                                 key=lambda name: events[name].last_timestamp,
                                 reverse=True):

            event = events[event_name]

            s += fmt.format(
                datetime.strftime(event.last_timestamp, "%Y/%m/%d %H:%M:%S"),
                str(event.type),
                str(event.count),
                str(event.message),
                str(event_name)
            )
        s += "Total : {:>3}\n".format(len(events))
        return s

    def show_events(self, events):
        """Show events we're holding from the ceph namespace - most recent 1st"""

        if len(events):
            return 0, "", self._show_events(events)
        else:
            return 0, "", "No events emitted yet, local cache is empty"

    def show_status(self):
        s = "Kubernetes\n"
        s += "- Hostname : {}\n".format(self.k8s_config['server'])
        s += "- Namespace : {}\n".format(self._namespace)
        s += "Tracker Health\n"
        for t in self.trackers:
            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
        s += "Tracked Events\n"
        s += "- namespace   : {:>3}\n".format(len(self.ns_watcher.events))
        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
        return 0, "", s

    def _valid_server(self, server):
        # must be a valid server url format
        server = server.strip()

        res = urlparse(server)
        port = res.netloc.split(":")[-1]

        if res.scheme != 'https':
            return False, "Server URL must use https"

        elif not res.hostname:
            return False, "Invalid server URL format"

        elif res.hostname:
            try:
                socket.gethostbyname(res.hostname)
            except socket.gaierror:
                return False, "Unresolvable server URL"

        if not port.isdigit():
            return False, "Server URL must end in a port number"

        return True, ""

    def _valid_cacrt(self, cacrt_data):
        """use mgr_util.verify_cacrt to validate the CA file"""

        cacrt_fname = create_temp_file("ca_file", cacrt_data)

        try:
            verify_cacrt(cacrt_fname)
        except ServerConfigException as e:
            return False, "Invalid CA certificate: {}".format(str(e))
        else:
            return True, ""

    def _valid_token(self, token_data):
        """basic checks on the token"""
        if not token_data:
            return False, "Token file is empty"

        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
        if not pattern.match(token_data):
            return False, "Token contains invalid characters"

        return True, ""

    def _valid_namespace(self, namespace):
        # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols

        if len(namespace) > 253:
            return False, "Name too long"
        if namespace.isdigit():
            return False, "Invalid name - must be alphanumeric"

        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
        if not pattern.match(namespace):
            return False, "Invalid characters in the name"

        return True, ""

    def _config_set(self, key, val):
        """Attempt to validate the content, then save to KV store"""

        val = val.rstrip()  # remove any trailing whitespace/newline

        try:
            checker = getattr(self, "_valid_" + key)
        except AttributeError:
            # no checker available, just let it pass
            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
            valid = True
        else:
            valid, reason = checker(val)

        if valid:
            self.set_store(key, val)
            self.log.info("Updated config KV Store item: " + key)
            return 0, "", "Config updated for parameter '{}'".format(key)
        else:
            return -22, "", "Invalid value for '{}' : {}".format(key, reason)

    def clear_config_settings(self):
        for k in self.k8s_config:
            self.set_store(k, None)
        return 0, "", "{} configuration keys removed".format(len(self.k8s_config.keys()))

    def handle_command(self, inbuf, cmd):

        access_options = ['cacrt', 'token']
        config_options = ['server', 'namespace']

        if cmd['prefix'] == 'k8sevents clear-config':
            return self.clear_config_settings()

        if cmd['prefix'] == 'k8sevents set-access':
            if cmd['key'] not in access_options:
                return -errno.EINVAL, "", "Unknown access option. Must be one of: {}".format(','.join(access_options))

            if inbuf:
                return self._config_set(cmd['key'], inbuf)
            else:
                return -errno.EINVAL, "", "Command must specify -i <filename>"

        if cmd['prefix'] == 'k8sevents set-config':

            if cmd['key'] not in config_options:
                return -errno.EINVAL, "", "Unknown config option. Must be one of: {}".format(','.join(config_options))

            return self._config_set(cmd['key'], cmd['value'])

        # At this point the command is trying to interact with k8sevents, so intercept
        # if the configuration is not ready
        if self.error_msg:
            _msg = "k8sevents unavailable: " + self.error_msg
            ready, _ = self.k8s_ready()
            if not self.kubernetes_control and not ready:
                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
            return -errno.ENODATA, "", _msg

        if cmd["prefix"] == "k8sevents status":
            return self.show_status()

        elif cmd["prefix"] == "k8sevents ls":
            return self.show_events(self.ns_watcher.events)

        elif cmd["prefix"] == "k8sevents ceph":
            return self.show_events(self.event_processor.events)

        else:
            raise NotImplementedError(cmd["prefix"])

    @staticmethod
    def can_run():
        """Determine whether the pre-reqs for the module are in place"""

        if not kubernetes_imported:
            return False, "kubernetes python client is not available"
        return True, ""

    def load_kubernetes_config(self):
        """Load configuration for remote kubernetes API using KV store values

        Attempt to create an API client configuration from settings stored in
        KV store.

        Returns:
            client.ApiClient: kubernetes API client object

        Raises:
            OSError: unable to create the cacrt file
        """

        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
        # temporary file containing the cert for the client to load from
        try:
            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
        except OSError as e:
            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
            raise OSError(str(e))
        else:
            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))

        configuration = client.Configuration()
        configuration.host = self.k8s_config['server']
        configuration.ssl_ca_cert = ca_crt_file
        configuration.api_key = {"authorization": "Bearer " + self.k8s_config['token']}
        api_client = client.ApiClient(configuration)
        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")

        return api_client

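    # When running outside of rook (no POD_NAME in the environment), the remote
    # access settings above are supplied through the module commands, e.g.
    # (file names and server address are examples):
    #
    #   ceph k8sevents set-access cacrt -i /root/ca.crt
    #   ceph k8sevents set-access token -i /root/token
    #   ceph k8sevents set-config server https://<api-server>:6443
    #   ceph k8sevents set-config namespace rook-ceph
    #
    # followed by a disable/enable of the module so serve() picks them up
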
    def serve(self):
        # apply options set by CLI to this module
        self.config_notify()

        if not kubernetes_imported:
            self.error_msg = "Unable to start : python kubernetes package is missing"
        else:
            if self.kubernetes_control:
                # running under rook-ceph
                config.load_incluster_config()
                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
                self._api_client_config = None
                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
            else:
                # running outside of rook-ceph, so we need additional settings to tell us
                # how to connect to the kubernetes cluster
                ready, errors = self.k8s_ready()
                if not ready:
                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
                else:
                    try:
                        self._api_client_config = self.load_kubernetes_config()
                    except OSError as e:
                        self.error_msg = str(e)
                    else:
                        self._namespace = self.k8s_config['namespace']
                        self.log.info("k8sevents configuration loaded from KV store")

        if self.error_msg:
            self.log.error(self.error_msg)
            return

        # All checks have passed
        self.config_watcher = CephConfigWatcher(self)

        self.event_processor = EventProcessor(self.config_watcher,
                                              self.ceph_event_retention_days,
                                              self._api_client_config,
                                              self._namespace)

        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
                                           namespace=self._namespace)

        if self.event_processor.ok:
            log.info("Ceph Log processor thread starting")
            self.event_processor.start()  # start log consumer thread
            log.info("Ceph config watcher thread starting")
            self.config_watcher.start()
            log.info("Rook-ceph namespace events watcher starting")
            self.ns_watcher.start()

            self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher])

            while True:
                # stay alive
                time.sleep(1)

                trackers = self.trackers
                for t in trackers:
                    if not t.is_alive() and not t.reported:
                        log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health))
                        t.reported = True

        else:
            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
            log.warning(self.error_msg)
            log.warning("k8sevents module exiting")
            self.run = False

    def shutdown(self):
        self.run = False
        log.info("Shutting down k8sevents module")
        self.event_processor.can_run = False

        if self._rados:
            self._rados.shutdown()