1 import cherrypy
2 import yaml
3 from collections import defaultdict
4 from pkg_resources import packaging # type: ignore
5 import json
6 import math
7 import os
8 import re
9 import threading
10 import time
11 import enum
12 from collections import namedtuple
13
14 from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
15 from mgr_util import get_default_addr, profile_method, build_url
16 from orchestrator import OrchestratorClientMixin, raise_if_exception, OrchestratorError
17 from rbd import RBD
18
19 from typing import DefaultDict, Optional, Dict, Any, Set, cast, Tuple, Union, List, Callable
20
21 LabelValues = Tuple[str, ...]
22 Number = Union[int, float]
23 MetricValue = Dict[LabelValues, Number]
24
25 # Defaults for the Prometheus HTTP server. They can also be set via config-key;
26 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
27 # for Prometheus exporter port registry
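# For example (illustrative, per-manager override of these defaults):
#   ceph config set mgr mgr/prometheus/server_addr 0.0.0.0
#   ceph config set mgr mgr/prometheus/server_port 9283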
28
29 DEFAULT_PORT = 9283
30
31 # When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
32 # that the ports it's listening on are in fact bound. When using the any address
33 # "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
34 # ipv6 isn't yet configured / supported and CherryPy throws an uncaught
35 # exception.
36 if cherrypy is not None:
37 Version = packaging.version.Version
38 v = Version(cherrypy.__version__)
39 # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
40 # centos:7) and back to at least 3.0.0.
41 if Version("3.1.2") <= v < Version("3.2.3"):
42 # https://github.com/cherrypy/cherrypy/issues/1100
43 from cherrypy.process import servers
44 servers.wait_for_occupied_port = lambda host, port: None
45
46
47 # cherrypy likes to sys.exit on error. don't let it take us down too!
48 def os_exit_noop(status: int) -> None:
49 pass
50
51
52 os._exit = os_exit_noop # type: ignore
53
54 # to access things in class Module from subclass Root. The reference is
55 # rebound to the active Module instance in Module.__init__ via a 'global' statement.
56
57 _global_instance = None # type: Optional[Module]
58 cherrypy.config.update({
59 'response.headers.server': 'Ceph-Prometheus'
60 })
61
62
63 def health_status_to_number(status: str) -> int:
64 if status == 'HEALTH_OK':
65 return 0
66 elif status == 'HEALTH_WARN':
67 return 1
68 elif status == 'HEALTH_ERR':
69 return 2
70 raise ValueError(f'unknown status "{status}"')
71
72
73 DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']
74
75 OSD_BLOCKLIST = ['osd_blocklist_count']
76
77 DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
78 'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
79 'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']
80
81 OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
82 'recovering_keys_per_sec', 'num_objects_recovered',
83                   'num_bytes_recovered', 'num_keys_recovered')
84
85 OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
86 'norecover', 'noscrub', 'nodeep-scrub')
87
88 FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')
89
90 MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
91 'ceph_version')
92
93 MON_METADATA = ('ceph_daemon', 'hostname',
94 'public_addr', 'rank', 'ceph_version')
95
96 MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
97
98 MGR_STATUS = ('ceph_daemon',)
99
100 MGR_MODULE_STATUS = ('name',)
101
102 MGR_MODULE_CAN_RUN = ('name',)
103
104 OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
105 'front_iface', 'hostname', 'objectstore', 'public_addr',
106 'ceph_version')
107
108 OSD_STATUS = ['weight', 'up', 'in']
109
110 OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']
111
112 POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')
113
114 RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')
115
116 RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
117 'ceph_version')
118
119 DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
120 'wal_device', 'instance', 'devices', 'device_ids')
121
122 NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
123
124 alert_metric = namedtuple('alert_metric', 'name description')
125 HEALTH_CHECKS = [
126 alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
127 ]
128
129 HEALTHCHECK_DETAIL = ('name', 'severity')
130
131
132 class Severity(enum.Enum):
133 ok = "HEALTH_OK"
134 warn = "HEALTH_WARN"
135 error = "HEALTH_ERR"
136
137
138 class Format(enum.Enum):
139 plain = 'plain'
140 json = 'json'
141 json_pretty = 'json-pretty'
142 yaml = 'yaml'
143
144
145 class HealthCheckEvent:
146
147 def __init__(self, name: str, severity: Severity, first_seen: float, last_seen: float, count: int, active: bool = True):
148 self.name = name
149 self.severity = severity
150 self.first_seen = first_seen
151 self.last_seen = last_seen
152 self.count = count
153 self.active = active
154
155 def as_dict(self) -> Dict[str, Any]:
156 """Return the instance as a dictionary."""
157 return self.__dict__
158
159
160 class HealthHistory:
161 kv_name = 'health_history'
162 titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
163 date_format = "%Y/%m/%d %H:%M:%S"
164
165 def __init__(self, mgr: MgrModule):
166 self.mgr = mgr
167 self.lock = threading.Lock()
168 self.healthcheck: Dict[str, HealthCheckEvent] = {}
169 self._load()
170
171 def _load(self) -> None:
172 """Load the current state from the mons KV store."""
173 data = self.mgr.get_store(self.kv_name)
174 if data:
175 try:
176 healthcheck_data = json.loads(data)
177 except json.JSONDecodeError:
178                 self.mgr.log.warning(
179 f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
180 self.reset()
181 return
182 else:
183 for k, v in healthcheck_data.items():
184 self.healthcheck[k] = HealthCheckEvent(
185 name=k,
186 severity=v.get('severity'),
187 first_seen=v.get('first_seen', 0),
188 last_seen=v.get('last_seen', 0),
189 count=v.get('count', 1),
190 active=v.get('active', True))
191 else:
192 self.reset()
193
194 def reset(self) -> None:
195 """Reset the healthcheck history."""
196 with self.lock:
197 self.mgr.set_store(self.kv_name, "{}")
198 self.healthcheck = {}
199
200 def save(self) -> None:
201 """Save the current in-memory healthcheck history to the KV store."""
202 with self.lock:
203 self.mgr.set_store(self.kv_name, self.as_json())
204
205 def check(self, health_checks: Dict[str, Any]) -> None:
206 """Look at the current health checks and compare existing the history.
207
208 Args:
209 health_checks (Dict[str, Any]): current health check data
210 """
211
212 current_checks = health_checks.get('checks', {})
213 changes_made = False
214
215 # first turn off any active states we're tracking
216 for seen_check in self.healthcheck:
217 check = self.healthcheck[seen_check]
218 if check.active and seen_check not in current_checks:
219 check.active = False
220 changes_made = True
221
222 # now look for any additions to track
223 now = time.time()
224 for name, info in current_checks.items():
225 if name not in self.healthcheck:
226 # this healthcheck is new, so start tracking it
227 changes_made = True
228 self.healthcheck[name] = HealthCheckEvent(
229 name=name,
230 severity=info.get('severity'),
231 first_seen=now,
232 last_seen=now,
233 count=1,
234 active=True
235 )
236 else:
237 # seen it before, so update its metadata
238 check = self.healthcheck[name]
239 if check.active:
240 # check has been registered as active already, so skip
241 continue
242 else:
243 check.last_seen = now
244 check.count += 1
245 check.active = True
246 changes_made = True
247
248 if changes_made:
249 self.save()
250
251 def __str__(self) -> str:
252 """Print the healthcheck history.
253
254 Returns:
255 str: Human readable representation of the healthcheck history
256 """
257 out = []
258
259 if len(self.healthcheck.keys()) == 0:
260 out.append("No healthchecks have been recorded")
261 else:
262 out.append(self.titles.format(
263 healthcheck_name="Healthcheck Name",
264 first_seen="First Seen (UTC)",
265 last_seen="Last seen (UTC)",
266 count="Count",
267 active="Active")
268 )
269 for k in sorted(self.healthcheck.keys()):
270 check = self.healthcheck[k]
271 out.append(self.titles.format(
272 healthcheck_name=check.name,
273 first_seen=time.strftime(self.date_format, time.localtime(check.first_seen)),
274 last_seen=time.strftime(self.date_format, time.localtime(check.last_seen)),
275 count=check.count,
276 active="Yes" if check.active else "No")
277 )
278 out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])
279
280 return "\n".join(out)
281
282 def as_dict(self) -> Dict[str, Any]:
283 """Return the history in a dictionary.
284
285 Returns:
286 Dict[str, Any]: dictionary indexed by the healthcheck name
287 """
288 return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}
289
290 def as_json(self, pretty: bool = False) -> str:
291 """Return the healthcheck history object as a dict (JSON).
292
293 Args:
294 pretty (bool, optional): whether to json pretty print the history. Defaults to False.
295
296 Returns:
297 str: str representation of the healthcheck in JSON format
298 """
299 if pretty:
300 return json.dumps(self.as_dict(), indent=2)
301 else:
302 return json.dumps(self.as_dict())
303
304 def as_yaml(self) -> str:
305 """Return the healthcheck history in yaml format.
306
307 Returns:
308 str: YAML representation of the healthcheck history
309 """
310 return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)
311
312
313 class Metric(object):
314 def __init__(self, mtype: str, name: str, desc: str, labels: Optional[LabelValues] = None) -> None:
315 self.mtype = mtype
316 self.name = name
317 self.desc = desc
318 self.labelnames = labels # tuple if present
319 self.value: Dict[LabelValues, Number] = {}
320
321 def clear(self) -> None:
322 self.value = {}
323
324 def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
325 # labelvalues must be a tuple
326 labelvalues = labelvalues or ('',)
327 self.value[labelvalues] = value
328
329 def str_expfmt(self) -> str:
330
331 # Must be kept in sync with promethize() in src/exporter/util.cc
332 def promethize(path: str) -> str:
333 ''' replace illegal metric name characters '''
334 result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')
335
336 # Hyphens usually turn into underscores, unless they are
337 # trailing
338 if result.endswith("-"):
339 result = result[0:-1] + "_minus"
340 else:
341 result = result.replace("-", "_")
342
343 return "ceph_{0}".format(result)
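            # For example (illustrative counter paths, not taken from a live cluster):
            #   promethize("bluestore.kv_flush_lat") -> "ceph_bluestore_kv_flush_lat"
            #   promethize("mds.req_create+")        -> "ceph_mds_req_create_plus"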
344
345 def floatstr(value: float) -> str:
346 ''' represent as Go-compatible float '''
347 if value == float('inf'):
348 return '+Inf'
349 if value == float('-inf'):
350 return '-Inf'
351 if math.isnan(value):
352 return 'NaN'
353 return repr(float(value))
354
355 name = promethize(self.name)
356 expfmt = '''
357 # HELP {name} {desc}
358 # TYPE {name} {mtype}'''.format(
359 name=name,
360 desc=self.desc,
361 mtype=self.mtype,
362 )
363
364 for labelvalues, value in self.value.items():
365 if self.labelnames:
366 labels_list = zip(self.labelnames, labelvalues)
367 labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
368 else:
369 labels = ''
370 if labels:
371 fmtstr = '\n{name}{{{labels}}} {value}'
372 else:
373 fmtstr = '\n{name} {value}'
374 expfmt += fmtstr.format(
375 name=name,
376 labels=labels,
377 value=floatstr(value),
378 )
379 return expfmt
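    # For reference, str_expfmt() renders each metric in the Prometheus exposition
    # format; e.g. (illustrative series):
    #   # HELP ceph_osd_up OSD status up
    #   # TYPE ceph_osd_up untyped
    #   ceph_osd_up{ceph_daemon="osd.0"} 1.0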
380
381 def group_by(
382 self,
383 keys: List[str],
384 joins: Dict[str, Callable[[List[str]], str]],
385 name: Optional[str] = None,
386 ) -> "Metric":
387 """
388 Groups data by label names.
389
390         Label names that are not passed are removed from the resulting metric, but
391         by providing a join function, the values of such a label can be joined so the label is kept.
392
393 The purpose of this method is to provide a version of a metric that can
394 be used in matching where otherwise multiple results would be returned.
395
396 As grouping is possible in Prometheus, the only additional value of this
397 method is the possibility to join labels when grouping. For that reason,
398 passing joins is required. Please use PromQL expressions in all other
399 cases.
400
401 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
402 >>> m.value = {
403 ... ('foo', 'x'): 1,
404 ... ('foo', 'y'): 1,
405 ... }
406 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
407 {('foo', 'x,y'): 1}
408
409 The functionality of group by could roughly be compared with Prometheus'
410
411 group (ceph_disk_occupation) by (device, instance)
412
413 with the exception that not all labels which aren't used as a condition
414         to group a metric are discarded, but their values are joined and the
415 label is thereby preserved.
416
417         This function uses the value of the first entry of each group as the
418         resulting value of the grouping operation.
419
420 >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
421 >>> m.value = {
422 ... ('foo', 'x'): 555,
423 ... ('foo', 'y'): 10,
424 ... }
425 >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
426 {('foo', 'x,y'): 555}
427 """
428 assert self.labelnames, "cannot match keys without label names"
429 for key in keys:
430 assert key in self.labelnames, "unknown key: {}".format(key)
431 assert joins, "joins must not be empty"
432 assert all(callable(c) for c in joins.values()), "joins must be callable"
433
434 # group
435 grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
436 for label_values, metric_value in self.value.items():
437 labels = dict(zip(self.labelnames, label_values))
438 if not all(k in labels for k in keys):
439 continue
440 group_key = tuple(labels[k] for k in keys)
441 grouped[group_key].append((labels, metric_value))
442
443         # as nothing specifies how to join labels that are not equal, and since
444         # Prometheus' `group` aggregation behaves similarly, we simply drop
445 # those labels.
446 labelnames = tuple(
447 label for label in self.labelnames if label in keys or label in joins
448 )
449 superfluous_labelnames = [
450 label for label in self.labelnames if label not in labelnames
451 ]
452
453 # iterate and convert groups with more than one member into a single
454 # entry
455 values: MetricValue = {}
456 for group in grouped.values():
457 labels, metric_value = group[0]
458
459 for label in superfluous_labelnames:
460 del labels[label]
461
462 if len(group) > 1:
463 for key, fn in joins.items():
464 labels[key] = fn(list(labels[key] for labels, _ in group))
465
466 values[tuple(labels.values())] = metric_value
467
468 new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
469 new_metric.value = values
470
471 return new_metric
472
473
474 class MetricCounter(Metric):
475 def __init__(self,
476 name: str,
477 desc: str,
478 labels: Optional[LabelValues] = None) -> None:
479 super(MetricCounter, self).__init__('counter', name, desc, labels)
480 self.value = defaultdict(lambda: 0)
481
482 def clear(self) -> None:
483 pass # Skip calls to clear as we want to keep the counters here.
484
485 def set(self,
486 value: Number,
487 labelvalues: Optional[LabelValues] = None) -> None:
488 msg = 'This method must not be used for instances of MetricCounter class'
489 raise NotImplementedError(msg)
490
491 def add(self,
492 value: Number,
493 labelvalues: Optional[LabelValues] = None) -> None:
494 # labelvalues must be a tuple
495 labelvalues = labelvalues or ('',)
496 self.value[labelvalues] += value
497
498
499 class MetricCollectionThread(threading.Thread):
500 def __init__(self, module: 'Module') -> None:
501 self.mod = module
502 self.active = True
503 self.event = threading.Event()
504 super(MetricCollectionThread, self).__init__(target=self.collect)
505
506 def collect(self) -> None:
507 self.mod.log.info('starting metric collection thread')
508 while self.active:
509 self.mod.log.debug('collecting cache in thread')
510 if self.mod.have_mon_connection():
511 start_time = time.time()
512
513 try:
514 data = self.mod.collect()
515 except Exception:
516 # Log any issues encountered during the data collection and continue
517 self.mod.log.exception("failed to collect metrics:")
518 self.event.wait(self.mod.scrape_interval)
519 continue
520
521 duration = time.time() - start_time
522 self.mod.log.debug('collecting cache in thread done')
523
524 sleep_time = self.mod.scrape_interval - duration
525 if sleep_time < 0:
526 self.mod.log.warning(
527 'Collecting data took more time than configured scrape interval. '
528 'This possibly results in stale data. Please check the '
529 '`stale_cache_strategy` configuration option. '
530 'Collecting data took {:.2f} seconds but scrape interval is configured '
531 'to be {:.0f} seconds.'.format(
532 duration,
533 self.mod.scrape_interval,
534 )
535 )
536 sleep_time = 0
537
538 with self.mod.collect_lock:
539 self.mod.collect_cache = data
540 self.mod.collect_time = duration
541
542 self.event.wait(sleep_time)
543 else:
544 self.mod.log.error('No MON connection')
545 self.event.wait(self.mod.scrape_interval)
546
547 def stop(self) -> None:
548 self.active = False
549 self.event.set()
550
551
552 class Module(MgrModule, OrchestratorClientMixin):
553 MODULE_OPTIONS = [
554 Option(
555 'server_addr',
556 default=get_default_addr(),
557 desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
558 ),
559 Option(
560 'server_port',
561 type='int',
562 default=DEFAULT_PORT,
563 desc='the port on which the module listens for HTTP requests',
564 runtime=True
565 ),
566 Option(
567 'scrape_interval',
568 type='float',
569 default=15.0
570 ),
571 Option(
572 'stale_cache_strategy',
573 default='log'
574 ),
575 Option(
576 'cache',
577 type='bool',
578 default=True,
579 ),
580 Option(
581 'rbd_stats_pools',
582 default=''
583 ),
584 Option(
585 name='rbd_stats_pools_refresh_interval',
586 type='int',
587 default=300
588 ),
589 Option(
590 name='standby_behaviour',
591 type='str',
592 default='default',
593 enum_allowed=['default', 'error'],
594 runtime=True
595 ),
596 Option(
597 name='standby_error_status_code',
598 type='int',
599 default=500,
600 min=400,
601 max=599,
602 runtime=True
603 ),
604 Option(
605 name='exclude_perf_counters',
606 type='bool',
607 default=True,
608 desc='Do not include perf-counters in the metrics output',
609 long_desc='Gathering perf-counters from a single Prometheus exporter can degrade ceph-mgr performance, especially in large clusters. Instead, Ceph-exporter daemons are now used by default for perf-counter gathering. This should only be disabled when no ceph-exporters are deployed.',
610 runtime=True
611 )
612 ]
613
614 STALE_CACHE_FAIL = 'fail'
615 STALE_CACHE_RETURN = 'return'
616
617 def __init__(self, *args: Any, **kwargs: Any) -> None:
618 super(Module, self).__init__(*args, **kwargs)
619 self.metrics = self._setup_static_metrics()
620 self.shutdown_event = threading.Event()
621 self.collect_lock = threading.Lock()
622 self.collect_time = 0.0
623 self.scrape_interval: float = 15.0
624 self.cache = True
625 self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
626 self.collect_cache: Optional[str] = None
627 self.rbd_stats = {
628 'pools': {},
629 'pools_refresh_time': 0,
630 'counters_info': {
631 'write_ops': {'type': self.PERFCOUNTER_COUNTER,
632 'desc': 'RBD image writes count'},
633 'read_ops': {'type': self.PERFCOUNTER_COUNTER,
634 'desc': 'RBD image reads count'},
635 'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
636 'desc': 'RBD image bytes written'},
637 'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
638 'desc': 'RBD image bytes read'},
639 'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
640 'desc': 'RBD image writes latency (msec)'},
641 'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
642 'desc': 'RBD image reads latency (msec)'},
643 },
644 } # type: Dict[str, Any]
645 global _global_instance
646 _global_instance = self
647 self.metrics_thread = MetricCollectionThread(_global_instance)
648 self.health_history = HealthHistory(self)
649
650 def _setup_static_metrics(self) -> Dict[str, Metric]:
651 metrics = {}
652 metrics['health_status'] = Metric(
653 'untyped',
654 'health_status',
655 'Cluster health status'
656 )
657 metrics['mon_quorum_status'] = Metric(
658 'gauge',
659 'mon_quorum_status',
660 'Monitors in quorum',
661 ('ceph_daemon',)
662 )
663 metrics['fs_metadata'] = Metric(
664 'untyped',
665 'fs_metadata',
666 'FS Metadata',
667 FS_METADATA
668 )
669 metrics['mds_metadata'] = Metric(
670 'untyped',
671 'mds_metadata',
672 'MDS Metadata',
673 MDS_METADATA
674 )
675 metrics['mon_metadata'] = Metric(
676 'untyped',
677 'mon_metadata',
678 'MON Metadata',
679 MON_METADATA
680 )
681 metrics['mgr_metadata'] = Metric(
682 'gauge',
683 'mgr_metadata',
684 'MGR metadata',
685 MGR_METADATA
686 )
687 metrics['mgr_status'] = Metric(
688 'gauge',
689 'mgr_status',
690 'MGR status (0=standby, 1=active)',
691 MGR_STATUS
692 )
693 metrics['mgr_module_status'] = Metric(
694 'gauge',
695 'mgr_module_status',
696 'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
697 MGR_MODULE_STATUS
698 )
699 metrics['mgr_module_can_run'] = Metric(
700 'gauge',
701 'mgr_module_can_run',
702 'MGR module runnable state i.e. can it run (0=no, 1=yes)',
703 MGR_MODULE_CAN_RUN
704 )
705 metrics['osd_metadata'] = Metric(
706 'untyped',
707 'osd_metadata',
708 'OSD Metadata',
709 OSD_METADATA
710 )
711
712         # The reason for having this separate from OSD_METADATA is
713         # so that we can stably use the same label names that
714 # the Prometheus node_exporter does
715 metrics['disk_occupation'] = Metric(
716 'untyped',
717 'disk_occupation',
718 'Associate Ceph daemon with disk used',
719 DISK_OCCUPATION
720 )
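        # A rough, illustrative PromQL join against node_exporter metrics (the
        # reason for the stable label names above); hypothetical query that
        # assumes the 'instance' labels of both series have been aligned:
        #   node_disk_written_bytes_total * on (instance, device)
        #     group_left(ceph_daemon) ceph_disk_occupation_human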
721
722 metrics['disk_occupation_human'] = Metric(
723 'untyped',
724 'disk_occupation_human',
725 'Associate Ceph daemon with disk used for displaying to humans,'
726 ' not for joining tables (vector matching)',
727 DISK_OCCUPATION, # label names are automatically decimated on grouping
728 )
729
730 metrics['pool_metadata'] = Metric(
731 'untyped',
732 'pool_metadata',
733 'POOL Metadata',
734 POOL_METADATA
735 )
736
737 metrics['rgw_metadata'] = Metric(
738 'untyped',
739 'rgw_metadata',
740 'RGW Metadata',
741 RGW_METADATA
742 )
743
744 metrics['rbd_mirror_metadata'] = Metric(
745 'untyped',
746 'rbd_mirror_metadata',
747 'RBD Mirror Metadata',
748 RBD_MIRROR_METADATA
749 )
750
751 metrics['pg_total'] = Metric(
752 'gauge',
753 'pg_total',
754 'PG Total Count per Pool',
755 ('pool_id',)
756 )
757
758 metrics['health_detail'] = Metric(
759 'gauge',
760 'health_detail',
761 'healthcheck status by type (0=inactive, 1=active)',
762 HEALTHCHECK_DETAIL
763 )
764
765 metrics['pool_objects_repaired'] = Metric(
766 'counter',
767 'pool_objects_repaired',
768 'Number of objects repaired in a pool',
769 ('pool_id',)
770 )
771
772 metrics['daemon_health_metrics'] = Metric(
773 'gauge',
774 'daemon_health_metrics',
775 'Health metrics for Ceph daemons',
776 ('type', 'ceph_daemon',)
777 )
778
779 for flag in OSD_FLAGS:
780 path = 'osd_flag_{}'.format(flag)
781 metrics[path] = Metric(
782 'untyped',
783 path,
784 'OSD Flag {}'.format(flag)
785 )
786 for state in OSD_STATUS:
787 path = 'osd_{}'.format(state)
788 metrics[path] = Metric(
789 'untyped',
790 path,
791 'OSD status {}'.format(state),
792 ('ceph_daemon',)
793 )
794 for stat in OSD_STATS:
795 path = 'osd_{}'.format(stat)
796 metrics[path] = Metric(
797 'gauge',
798 path,
799 'OSD stat {}'.format(stat),
800 ('ceph_daemon',)
801 )
802 for stat in OSD_POOL_STATS:
803 path = 'pool_{}'.format(stat)
804 metrics[path] = Metric(
805 'gauge',
806 path,
807 "OSD pool stats: {}".format(stat),
808 ('pool_id',)
809 )
810 for state in PG_STATES:
811 path = 'pg_{}'.format(state)
812 metrics[path] = Metric(
813 'gauge',
814 path,
815 'PG {} per pool'.format(state),
816 ('pool_id',)
817 )
818 for state in DF_CLUSTER:
819 path = 'cluster_{}'.format(state)
820 metrics[path] = Metric(
821 'gauge',
822 path,
823 'DF {}'.format(state),
824 )
825 path = 'cluster_by_class_{}'.format(state)
826 metrics[path] = Metric(
827 'gauge',
828 path,
829 'DF {}'.format(state),
830 ('device_class',)
831 )
832 for state in DF_POOL:
833 path = 'pool_{}'.format(state)
834 metrics[path] = Metric(
835 'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
836 path,
837 'DF pool {}'.format(state),
838 ('pool_id',)
839 )
840 for state in OSD_BLOCKLIST:
841 path = 'cluster_{}'.format(state)
842 metrics[path] = Metric(
843 'gauge',
844 path,
845 'OSD Blocklist Count {}'.format(state),
846 )
847 for state in NUM_OBJECTS:
848 path = 'num_objects_{}'.format(state)
849 metrics[path] = Metric(
850 'gauge',
851 path,
852 'Number of {} objects'.format(state),
853 )
854
855 for check in HEALTH_CHECKS:
856 path = 'healthcheck_{}'.format(check.name.lower())
857 metrics[path] = Metric(
858 'gauge',
859 path,
860 check.description,
861 )
862
863 return metrics
864
865 def orch_is_available(self) -> bool:
866 try:
867 return self.available()[0]
868 except (RuntimeError, OrchestratorError, ImportError):
869             # an ImportError could happen during startup if the
870             # orchestrator has not been loaded yet by the mgr
871 return False
872
873 def get_server_addr(self) -> str:
874 """
875 Return the current mgr server IP.
876 """
877 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
878 if server_addr in ['::', '0.0.0.0']:
879 return self.get_mgr_ip()
880 return server_addr
881
882 def config_notify(self) -> None:
883 """
884 This method is called whenever one of our config options is changed.
885 """
886 # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
887 # if we omit the line: cherrypy.server.httpserver = None
888 # then the cherrypy server is not restarted correctly
889 self.log.info('Restarting engine...')
890 cherrypy.engine.stop()
891 cherrypy.server.httpserver = None
892 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
893 server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
894 self.configure(server_addr, server_port)
895 cherrypy.engine.start()
896 self.log.info('Engine started.')
897
898 @profile_method()
899 def get_health(self) -> None:
900
901 def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
902 """Extract value from message (default is 1st field)"""
903 v_str = message.split(delim)[word_pos]
904 if v_str.isdigit():
905 return int(v_str), 0
906 return 0, 1
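            # e.g. (illustrative) _get_value("42 slow ops, oldest one blocked ...")
            # returns (42, 0); a message whose first token is not a number returns (0, 1).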
907
908 health = json.loads(self.get('health')['json'])
909 # set overall health
910 self.metrics['health_status'].set(
911 health_status_to_number(health['status'])
912 )
913
914         # Examine the health to see if any triggered health checks need to
915 # become a specific metric with a value from the health detail
916 active_healthchecks = health.get('checks', {})
917 active_names = active_healthchecks.keys()
918
919 for check in HEALTH_CHECKS:
920 path = 'healthcheck_{}'.format(check.name.lower())
921
922 if path in self.metrics:
923
924 if check.name in active_names:
925 check_data = active_healthchecks[check.name]
926 message = check_data['summary'].get('message', '')
927 v, err = 0, 0
928
929 if check.name == "SLOW_OPS":
930 # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have
931 # slow ops.
932 v, err = _get_value(message)
933
934 if err:
935 self.log.error(
936 "healthcheck %s message format is incompatible and has been dropped",
937 check.name)
938 # drop the metric, so it's no longer emitted
939 del self.metrics[path]
940 continue
941 else:
942 self.metrics[path].set(v)
943 else:
944 # health check is not active, so give it a default of 0
945 self.metrics[path].set(0)
946
947 self.health_history.check(health)
948 for name, info in self.health_history.healthcheck.items():
949 v = 1 if info.active else 0
950 self.metrics['health_detail'].set(
951 v, (
952 name,
953 str(info.severity))
954 )
955
956 @profile_method()
957 def get_pool_stats(self) -> None:
958 # retrieve pool stats to provide per pool recovery metrics
959 # (osd_pool_stats moved to mgr in Mimic)
960 pstats = self.get('osd_pool_stats')
961 for pool in pstats['pool_stats']:
962 for stat in OSD_POOL_STATS:
963 self.metrics['pool_{}'.format(stat)].set(
964 pool['recovery_rate'].get(stat, 0),
965 (pool['pool_id'],)
966 )
967
968 @profile_method()
969 def get_df(self) -> None:
970 # maybe get the to-be-exported metrics from a config?
971 df = self.get('df')
972 for stat in DF_CLUSTER:
973 self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
974 for device_class in df['stats_by_class']:
975 self.metrics['cluster_by_class_{}'.format(stat)].set(
976 df['stats_by_class'][device_class][stat], (device_class,))
977
978 for pool in df['pools']:
979 for stat in DF_POOL:
980 self.metrics['pool_{}'.format(stat)].set(
981 pool['stats'][stat],
982 (pool['id'],)
983 )
984
985 @profile_method()
986 def get_osd_blocklisted_entries(self) -> None:
987 r = self.mon_command({
988 'prefix': 'osd blocklist ls',
989 'format': 'json'
990 })
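        # mon_command returns (retval, outbuf, outs); for this command outs is a
        # status string expected to look like "listed 2 entries" (illustrative),
        # so the count is taken from the second whitespace-separated token below.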
991 blocklist_entries = r[2].split(' ')
992 blocklist_count = blocklist_entries[1]
993 for stat in OSD_BLOCKLIST:
994 self.metrics['cluster_{}'.format(stat)].set(int(blocklist_count))
995
996 @profile_method()
997 def get_fs(self) -> None:
998 fs_map = self.get('fs_map')
999 servers = self.get_service_list()
1000 self.log.debug('standbys: {}'.format(fs_map['standbys']))
1001 # export standby mds metadata, default standby fs_id is '-1'
1002 for standby in fs_map['standbys']:
1003 id_ = standby['name']
1004 host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
1005 addr, rank = standby['addr'], standby['rank']
1006 self.metrics['mds_metadata'].set(1, (
1007 'mds.{}'.format(id_), '-1',
1008 cast(str, host),
1009 cast(str, addr),
1010 cast(str, rank),
1011 cast(str, version)
1012 ))
1013 for fs in fs_map['filesystems']:
1014 # collect fs metadata
1015 data_pools = ",".join([str(pool)
1016 for pool in fs['mdsmap']['data_pools']])
1017 self.metrics['fs_metadata'].set(1, (
1018 data_pools,
1019 fs['id'],
1020 fs['mdsmap']['metadata_pool'],
1021 fs['mdsmap']['fs_name']
1022 ))
1023 self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
1024 for gid, daemon in fs['mdsmap']['info'].items():
1025 id_ = daemon['name']
1026 host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
1027 self.metrics['mds_metadata'].set(1, (
1028 'mds.{}'.format(id_), fs['id'],
1029 host, daemon['addr'],
1030 daemon['rank'], version
1031 ))
1032
1033 @profile_method()
1034 def get_quorum_status(self) -> None:
1035 mon_status = json.loads(self.get('mon_status')['json'])
1036 servers = self.get_service_list()
1037 for mon in mon_status['monmap']['mons']:
1038 rank = mon['rank']
1039 id_ = mon['name']
1040 mon_version = servers.get((id_, 'mon'), ('', '', ''))
1041 self.metrics['mon_metadata'].set(1, (
1042 'mon.{}'.format(id_), mon_version[0],
1043 mon['public_addr'].rsplit(':', 1)[0], rank,
1044 mon_version[1]
1045 ))
1046 in_quorum = int(rank in mon_status['quorum'])
1047 self.metrics['mon_quorum_status'].set(in_quorum, (
1048 'mon.{}'.format(id_),
1049 ))
1050
1051 @profile_method()
1052 def get_mgr_status(self) -> None:
1053 mgr_map = self.get('mgr_map')
1054 servers = self.get_service_list()
1055
1056 active = mgr_map['active_name']
1057 standbys = [s.get('name') for s in mgr_map['standbys']]
1058
1059 all_mgrs = list(standbys)
1060 all_mgrs.append(active)
1061
1062 all_modules = {module.get('name'): module.get('can_run')
1063 for module in mgr_map['available_modules']}
1064
1065 for mgr in all_mgrs:
1066 host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
1067 if mgr == active:
1068 _state = 1
1069 else:
1070 _state = 0
1071
1072 self.metrics['mgr_metadata'].set(1, (
1073 f'mgr.{mgr}', host, version
1074 ))
1075 self.metrics['mgr_status'].set(_state, (
1076 f'mgr.{mgr}',))
1077 always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
1078 active_modules = list(always_on_modules)
1079 active_modules.extend(mgr_map['modules'])
1080
1081 for mod_name in all_modules.keys():
1082
1083 if mod_name in always_on_modules:
1084 _state = 2
1085 elif mod_name in active_modules:
1086 _state = 1
1087 else:
1088 _state = 0
1089
1090 _can_run = 1 if all_modules[mod_name] else 0
1091 self.metrics['mgr_module_status'].set(_state, (mod_name,))
1092 self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
1093
1094 @profile_method()
1095 def get_pg_status(self) -> None:
1096
1097 pg_summary = self.get('pg_summary')
1098
1099 for pool in pg_summary['by_pool']:
1100 num_by_state: DefaultDict[str, int] = defaultdict(int)
1101 for state in PG_STATES:
1102 num_by_state[state] = 0
1103
1104 for state_name, count in pg_summary['by_pool'][pool].items():
1105 for state in state_name.split('+'):
1106 num_by_state[state] += count
1107 num_by_state['total'] += count
1108
1109 for state, num in num_by_state.items():
1110 try:
1111 self.metrics["pg_{}".format(state)].set(num, (pool,))
1112 except KeyError:
1113 self.log.warning("skipping pg in unknown state {}".format(state))
1114
1115 @profile_method()
1116 def get_osd_stats(self) -> None:
1117 osd_stats = self.get('osd_stats')
1118 for osd in osd_stats['osd_stats']:
1119 id_ = osd['osd']
1120 for stat in OSD_STATS:
1121 val = osd['perf_stat'][stat]
1122 self.metrics['osd_{}'.format(stat)].set(val, (
1123 'osd.{}'.format(id_),
1124 ))
1125
1126 def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
1127 ret = {}
1128 for server in self.list_servers():
1129 host = cast(str, server.get('hostname', ''))
1130 for service in cast(List[ServiceInfoT], server.get('services', [])):
1131 ret.update({(service['id'], service['type']): (host,
1132 service.get('ceph_version', 'unknown'),
1133 service.get('name', ''))})
1134 return ret
1135
1136 @profile_method()
1137 def get_metadata_and_osd_status(self) -> None:
1138 osd_map = self.get('osd_map')
1139 osd_flags = osd_map['flags'].split(',')
1140 for flag in OSD_FLAGS:
1141 self.metrics['osd_flag_{}'.format(flag)].set(
1142 int(flag in osd_flags)
1143 )
1144
1145 osd_devices = self.get('osd_map_crush')['devices']
1146 servers = self.get_service_list()
1147 for osd in osd_map['osds']:
1148 # id can be used to link osd metrics and metadata
1149 id_ = osd['osd']
1150 # collect osd metadata
1151 p_addr = osd['public_addr'].rsplit(':', 1)[0]
1152 c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
1153 if p_addr == "-" or c_addr == "-":
1154 self.log.info(
1155 "Missing address metadata for osd {0}, skipping occupation"
1156 " and metadata records for this osd".format(id_)
1157 )
1158 continue
1159
1160 dev_class = None
1161 for osd_device in osd_devices:
1162 if osd_device['id'] == id_:
1163 dev_class = osd_device.get('class', '')
1164 break
1165
1166 if dev_class is None:
1167 self.log.info("OSD {0} is missing from CRUSH map, "
1168 "skipping output".format(id_))
1169 continue
1170
1171 osd_version = servers.get((str(id_), 'osd'), ('', '', ''))
1172
1173 # collect disk occupation metadata
1174 osd_metadata = self.get_metadata("osd", str(id_))
1175 if osd_metadata is None:
1176 continue
1177
1178 obj_store = osd_metadata.get('osd_objectstore', '')
1179 f_iface = osd_metadata.get('front_iface', '')
1180 b_iface = osd_metadata.get('back_iface', '')
1181
1182 self.metrics['osd_metadata'].set(1, (
1183 b_iface,
1184 'osd.{}'.format(id_),
1185 c_addr,
1186 dev_class,
1187 f_iface,
1188 osd_version[0],
1189 obj_store,
1190 p_addr,
1191 osd_version[1]
1192 ))
1193
1194 # collect osd status
1195 for state in OSD_STATUS:
1196 status = osd[state]
1197 self.metrics['osd_{}'.format(state)].set(status, (
1198 'osd.{}'.format(id_),
1199 ))
1200
1201 osd_dev_node = None
1202 osd_wal_dev_node = ''
1203 osd_db_dev_node = ''
1204 if obj_store == "filestore":
1205 # collect filestore backend device
1206 osd_dev_node = osd_metadata.get(
1207 'backend_filestore_dev_node', None)
1208 # collect filestore journal device
1209 osd_wal_dev_node = osd_metadata.get('osd_journal', '')
1210 osd_db_dev_node = ''
1211 elif obj_store == "bluestore":
1212 # collect bluestore backend device
1213 osd_dev_node = osd_metadata.get(
1214 'bluestore_bdev_dev_node', None)
1215 # collect bluestore wal backend
1216 osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
1217 # collect bluestore db backend
1218 osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
1219 if osd_dev_node and osd_dev_node == "unknown":
1220 osd_dev_node = None
1221
1222 # fetch the devices and ids (vendor, model, serial) from the
1223 # osd_metadata
1224 osd_devs = osd_metadata.get('devices', '') or 'N/A'
1225 osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'
1226
1227 osd_hostname = osd_metadata.get('hostname', None)
1228 if osd_dev_node and osd_hostname:
1229 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
1230 id_, osd_hostname, osd_dev_node))
1231 self.metrics['disk_occupation'].set(1, (
1232 "osd.{0}".format(id_),
1233 osd_dev_node,
1234 osd_db_dev_node,
1235 osd_wal_dev_node,
1236 osd_hostname,
1237 osd_devs,
1238 osd_dev_ids,
1239 ))
1240 else:
1241 self.log.info("Missing dev node metadata for osd {0}, skipping "
1242 "occupation record for this osd".format(id_))
1243
1244 if 'disk_occupation' in self.metrics:
1245 try:
1246 self.metrics['disk_occupation_human'] = \
1247 self.metrics['disk_occupation'].group_by(
1248 ['device', 'instance'],
1249 {'ceph_daemon': lambda daemons: ', '.join(daemons)},
1250 name='disk_occupation_human',
1251 )
1252 except Exception as e:
1253 self.log.error(e)
1254
1255 ec_profiles = osd_map.get('erasure_code_profiles', {})
1256
1257 def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
1258 pool_type = 'unknown'
1259 description = 'unknown'
1260
1261 if pool['type'] == 1:
1262 pool_type = "replicated"
1263 description = f"replica:{pool['size']}"
1264 elif pool['type'] == 3:
1265 pool_type = "erasure"
1266 name = pool.get('erasure_code_profile', '')
1267 profile = ec_profiles.get(name, {})
1268 if profile:
1269 description = f"ec:{profile['k']}+{profile['m']}"
1270 else:
1271 description = "ec:unknown"
1272
1273 return pool_type, description
1274
1275 for pool in osd_map['pools']:
1276
1277 compression_mode = 'none'
1278 pool_type, pool_description = _get_pool_info(pool)
1279
1280 if 'options' in pool:
1281 compression_mode = pool['options'].get('compression_mode', 'none')
1282
1283 self.metrics['pool_metadata'].set(
1284 1, (
1285 pool['pool'],
1286 pool['pool_name'],
1287 pool_type,
1288 pool_description,
1289 compression_mode)
1290 )
1291
1292 # Populate other servers metadata
1293         # If the orchestrator is available and ceph-exporter is running, modify the rgw
1294         # instance id to match the one reported by the exporter
1295 modify_instance_id = self.orch_is_available() and self.get_module_option('exclude_perf_counters')
1296 if modify_instance_id:
1297 daemons = raise_if_exception(self.list_daemons(daemon_type='rgw'))
1298 for daemon in daemons:
1299 if daemon.daemon_id and '.' in daemon.daemon_id:
1300 instance_id = daemon.daemon_id.split(".")[2]
1301 else:
1302 instance_id = daemon.daemon_id if daemon.daemon_id else ""
1303 self.metrics['rgw_metadata'].set(1,
1304 (f"{daemon.daemon_type}.{daemon.daemon_id}",
1305 str(daemon.hostname),
1306 str(daemon.version),
1307 instance_id))
1308 for key, value in servers.items():
1309 service_id, service_type = key
1310 if service_type == 'rgw' and not modify_instance_id:
1311 hostname, version, name = value
1312 self.metrics['rgw_metadata'].set(
1313 1,
1314 ('{}.{}'.format(service_type, name),
1315 hostname, version, service_id)
1316 )
1317 elif service_type == 'rbd-mirror':
1318 mirror_metadata = self.get_metadata('rbd-mirror', service_id)
1319 if mirror_metadata is None:
1320 continue
1321 mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
1322 service_id)
1323 rbd_mirror_metadata = cast(LabelValues,
1324 (mirror_metadata.get(k, '')
1325 for k in RBD_MIRROR_METADATA))
1326 self.metrics['rbd_mirror_metadata'].set(
1327 1, rbd_mirror_metadata
1328 )
1329
1330 @profile_method()
1331 def get_num_objects(self) -> None:
1332 pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
1333 for obj in NUM_OBJECTS:
1334 stat = 'num_objects_{}'.format(obj)
1335 self.metrics[stat].set(pg_sum[stat])
1336
1337 @profile_method()
1338 def get_rbd_stats(self) -> None:
1339 # Per RBD image stats is collected by registering a dynamic osd perf
1340 # stats query that tells OSDs to group stats for requests associated
1341 # with RBD objects by pool, namespace, and image id, which are
1342 # extracted from the request object names or other attributes.
1343 # The RBD object names have the following prefixes:
1344 # - rbd_data.{image_id}. (data stored in the same pool as metadata)
1345 # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
1346 # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
1347 # The pool_id in the object name is the id of the pool with the image
1348         # metadata, and should be used in the image spec. If there is no pool_id
1349 # in the object name, the image pool is the pool where the object is
1350 # located.
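        # Illustrative example (hypothetical ids): an object named
        # "rbd_data.5.10743b5f9cd1.0000000000000000" belongs to the image with
        # id "10743b5f9cd1" whose metadata pool has id 5.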
1351
1352 # Parse rbd_stats_pools option, which is a comma or space separated
1353         # list of pool[/namespace] entries. If no namespace is specified the
1354 # stats are collected for every namespace in the pool. The wildcard
1355 # '*' can be used to indicate all pools or namespaces
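        # Illustrative values (hypothetical pool/namespace names):
        #   "rbd"               collect every namespace in pool 'rbd'
        #   "rbd/ns1 rbd/ns2"   collect only namespaces 'ns1' and 'ns2' in pool 'rbd'
        #   "*"                 collect every pool with the 'rbd' application enabled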
1356 pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
1357 pool_keys = set()
1358 osd_map = self.get('osd_map')
1359 rbd_pools = [pool['pool_name'] for pool in osd_map['pools']
1360 if 'rbd' in pool.get('application_metadata', {})]
1361 for x in re.split(r'[\s,]+', pools_string):
1362 if not x:
1363 continue
1364
1365 s = x.split('/', 2)
1366 pool_name = s[0]
1367 namespace_name = None
1368 if len(s) == 2:
1369 namespace_name = s[1]
1370
1371 if pool_name == "*":
1372 # collect for all pools
1373 for pool in rbd_pools:
1374 pool_keys.add((pool, namespace_name))
1375 else:
1376 if pool_name in rbd_pools:
1377 pool_keys.add((pool_name, namespace_name)) # avoids adding deleted pool
1378
1379 pools = {} # type: Dict[str, Set[str]]
1380 for pool_key in pool_keys:
1381 pool_name = pool_key[0]
1382 namespace_name = pool_key[1]
1383 if not namespace_name or namespace_name == "*":
1384 # empty set means collect for all namespaces
1385 pools[pool_name] = set()
1386 continue
1387
1388 if pool_name not in pools:
1389 pools[pool_name] = set()
1390 elif not pools[pool_name]:
1391 continue
1392 pools[pool_name].add(namespace_name)
1393
1394 rbd_stats_pools = {}
1395 for pool_id in self.rbd_stats['pools'].keys():
1396 name = self.rbd_stats['pools'][pool_id]['name']
1397 if name not in pools:
1398 del self.rbd_stats['pools'][pool_id]
1399 else:
1400 rbd_stats_pools[name] = \
1401 self.rbd_stats['pools'][pool_id]['ns_names']
1402
1403 pools_refreshed = False
1404 if pools:
1405 next_refresh = self.rbd_stats['pools_refresh_time'] + \
1406 self.get_localized_module_option(
1407 'rbd_stats_pools_refresh_interval', 300)
1408 if rbd_stats_pools != pools or time.time() >= next_refresh:
1409 self.refresh_rbd_stats_pools(pools)
1410 pools_refreshed = True
1411
1412 pool_ids = list(self.rbd_stats['pools'])
1413 pool_ids.sort()
1414 pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'
1415
1416 nspace_names = []
1417 for pool_id, pool in self.rbd_stats['pools'].items():
1418 if pool['ns_names']:
1419 nspace_names.extend(pool['ns_names'])
1420 else:
1421 nspace_names = []
1422 break
1423 if nspace_names:
1424 namespace_regex = '^(' + \
1425 "|".join([re.escape(x)
1426 for x in set(nspace_names)]) + ')$'
1427 else:
1428 namespace_regex = '^(.*)$'
1429
1430 if ('query' in self.rbd_stats
1431 and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
1432 or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
1433 self.remove_osd_perf_query(self.rbd_stats['query_id'])
1434 del self.rbd_stats['query_id']
1435 del self.rbd_stats['query']
1436
1437 if not self.rbd_stats['pools']:
1438 return
1439
1440 counters_info = self.rbd_stats['counters_info']
1441
1442 if 'query_id' not in self.rbd_stats:
1443 query = {
1444 'key_descriptor': [
1445 {'type': 'pool_id', 'regex': pool_id_regex},
1446 {'type': 'namespace', 'regex': namespace_regex},
1447 {'type': 'object_name',
1448 'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
1449 ],
1450 'performance_counter_descriptors': list(counters_info),
1451 }
1452 query_id = self.add_osd_perf_query(query)
1453 if query_id is None:
1454 self.log.error('failed to add query %s' % query)
1455 return
1456 self.rbd_stats['query'] = query
1457 self.rbd_stats['query_id'] = query_id
1458
1459 res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
1460 assert res
1461 for c in res['counters']:
1462 # if the pool id is not found in the object name use id of the
1463 # pool where the object is located
1464 if c['k'][2][0]:
1465 pool_id = int(c['k'][2][0])
1466 else:
1467 pool_id = int(c['k'][0][0])
1468 if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
1469 self.refresh_rbd_stats_pools(pools)
1470 pools_refreshed = True
1471 if pool_id not in self.rbd_stats['pools']:
1472 continue
1473 pool = self.rbd_stats['pools'][pool_id]
1474 nspace_name = c['k'][1][0]
1475 if nspace_name not in pool['images']:
1476 continue
1477 image_id = c['k'][2][1]
1478 if image_id not in pool['images'][nspace_name] and \
1479 not pools_refreshed:
1480 self.refresh_rbd_stats_pools(pools)
1481 pool = self.rbd_stats['pools'][pool_id]
1482 pools_refreshed = True
1483 if image_id not in pool['images'][nspace_name]:
1484 continue
1485 counters = pool['images'][nspace_name][image_id]['c']
1486 for i in range(len(c['c'])):
1487 counters[i][0] += c['c'][i][0]
1488 counters[i][1] += c['c'][i][1]
1489
1490 label_names = ("pool", "namespace", "image")
1491 for pool_id, pool in self.rbd_stats['pools'].items():
1492 pool_name = pool['name']
1493 for nspace_name, images in pool['images'].items():
1494 for image_id in images:
1495 image_name = images[image_id]['n']
1496 counters = images[image_id]['c']
1497 i = 0
1498 for key in counters_info:
1499 counter_info = counters_info[key]
1500 stattype = self._stattype_to_str(counter_info['type'])
1501 labels = (pool_name, nspace_name, image_name)
1502 if counter_info['type'] == self.PERFCOUNTER_COUNTER:
1503 path = 'rbd_' + key
1504 if path not in self.metrics:
1505 self.metrics[path] = Metric(
1506 stattype,
1507 path,
1508 counter_info['desc'],
1509 label_names,
1510 )
1511 self.metrics[path].set(counters[i][0], labels)
1512 elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
1513 path = 'rbd_' + key + '_sum'
1514 if path not in self.metrics:
1515 self.metrics[path] = Metric(
1516 stattype,
1517 path,
1518 counter_info['desc'] + ' Total',
1519 label_names,
1520 )
1521 self.metrics[path].set(counters[i][0], labels)
1522 path = 'rbd_' + key + '_count'
1523 if path not in self.metrics:
1524 self.metrics[path] = Metric(
1525 'counter',
1526 path,
1527 counter_info['desc'] + ' Count',
1528 label_names,
1529 )
1530 self.metrics[path].set(counters[i][1], labels)
1531 i += 1
1532
1533 def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
1534 self.log.debug('refreshing rbd pools %s' % (pools))
1535
1536 rbd = RBD()
1537 counters_info = self.rbd_stats['counters_info']
1538 for pool_name, cfg_ns_names in pools.items():
1539 try:
1540 pool_id = self.rados.pool_lookup(pool_name)
1541 with self.rados.open_ioctx(pool_name) as ioctx:
1542 if pool_id not in self.rbd_stats['pools']:
1543 self.rbd_stats['pools'][pool_id] = {'images': {}}
1544 pool = self.rbd_stats['pools'][pool_id]
1545 pool['name'] = pool_name
1546 pool['ns_names'] = cfg_ns_names
1547 if cfg_ns_names:
1548 nspace_names = list(cfg_ns_names)
1549 else:
1550 nspace_names = [''] + rbd.namespace_list(ioctx)
1551 for nspace_name in pool['images']:
1552 if nspace_name not in nspace_names:
1553 del pool['images'][nspace_name]
1554 for nspace_name in nspace_names:
1555 if nspace_name and\
1556 not rbd.namespace_exists(ioctx, nspace_name):
1557 self.log.debug('unknown namespace %s for pool %s' %
1558 (nspace_name, pool_name))
1559 continue
1560 ioctx.set_namespace(nspace_name)
1561 if nspace_name not in pool['images']:
1562 pool['images'][nspace_name] = {}
1563 namespace = pool['images'][nspace_name]
1564 images = {}
1565 for image_meta in RBD().list2(ioctx):
1566 image = {'n': image_meta['name']}
1567 image_id = image_meta['id']
1568 if image_id in namespace:
1569 image['c'] = namespace[image_id]['c']
1570 else:
1571 image['c'] = [[0, 0] for x in counters_info]
1572 images[image_id] = image
1573 pool['images'][nspace_name] = images
1574 except Exception as e:
1575 self.log.error('failed listing pool %s: %s' % (pool_name, e))
1576 self.rbd_stats['pools_refresh_time'] = time.time()
1577
1578 def shutdown_rbd_stats(self) -> None:
1579 if 'query_id' in self.rbd_stats:
1580 self.remove_osd_perf_query(self.rbd_stats['query_id'])
1581 del self.rbd_stats['query_id']
1582 del self.rbd_stats['query']
1583 self.rbd_stats['pools'].clear()
1584
1585 def add_fixed_name_metrics(self) -> None:
1586 """
1587 Add fixed name metrics from existing ones that have details in their names
1588 that should be in labels (not in name).
1589 For backward compatibility, a new fixed name metric is created (instead of replacing)
1590 and details are put in new labels.
1591 Intended for RGW sync perf. counters but extendable as required.
1592 See: https://tracker.ceph.com/issues/45311
1593 """
1594 new_metrics = {}
1595 for metric_path, metrics in self.metrics.items():
1596 # Address RGW sync perf. counters.
1597 match = re.search(r'^data-sync-from-(.*)\.', metric_path)
1598 if match:
1599 new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
1600 if new_path not in new_metrics:
1601 new_metrics[new_path] = Metric(
1602 metrics.mtype,
1603 new_path,
1604 metrics.desc,
1605 cast(LabelValues, metrics.labelnames) + ('source_zone',)
1606 )
1607 for label_values, value in metrics.value.items():
1608 new_metrics[new_path].set(value, label_values + (match.group(1),))
1609
1610 self.metrics.update(new_metrics)
1611
1612 def get_collect_time_metrics(self) -> None:
1613 sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
1614 count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
1615 if sum_metric is None:
1616 sum_metric = MetricCounter(
1617 'prometheus_collect_duration_seconds_sum',
1618                 'The sum of seconds taken to collect all metrics of this exporter',
1619 ('method',))
1620 self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
1621 if count_metric is None:
1622 count_metric = MetricCounter(
1623 'prometheus_collect_duration_seconds_count',
1624                 'The number of metrics gathered for this exporter',
1625 ('method',))
1626 self.metrics['prometheus_collect_duration_seconds_count'] = count_metric
1627
1628         # Collect all timing data and make it available as a metric, excluding the
1629 # `collect` method because it has not finished at this point and hence
1630 # there's no `_execution_duration` attribute to be found. The
1631 # `_execution_duration` attribute is added by the `profile_method`
1632 # decorator.
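        # After a scrape, this exposes series such as (illustrative values):
        #   ceph_prometheus_collect_duration_seconds_sum{method="get_health"} 0.01
        #   ceph_prometheus_collect_duration_seconds_count{method="get_health"} 1.0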
1633 for method_name, method in Module.__dict__.items():
1634 duration = getattr(method, '_execution_duration', None)
1635 if duration is not None:
1636 cast(MetricCounter, sum_metric).add(duration, (method_name,))
1637 cast(MetricCounter, count_metric).add(1, (method_name,))
1638
1639 def get_pool_repaired_objects(self) -> None:
1640 dump = self.get('pg_dump')
1641 for stats in dump['pool_stats']:
1642 path = 'pool_objects_repaired'
1643 self.metrics[path].set(stats['stat_sum']['num_objects_repaired'],
1644 labelvalues=(stats['poolid'],))
1645
1646 def get_all_daemon_health_metrics(self) -> None:
1647 daemon_metrics = self.get_daemon_health_metrics()
1648         self.log.debug('daemon health metrics: %s' % (daemon_metrics))
1649 for daemon_name, health_metrics in daemon_metrics.items():
1650 for health_metric in health_metrics:
1651 path = 'daemon_health_metrics'
1652 self.metrics[path].set(health_metric['value'], labelvalues=(
1653 health_metric['type'], daemon_name,))
1654
1655 def get_perf_counters(self) -> None:
1656 """
1657 Get the perf counters for all daemons
1658 """
1659 for daemon, counters in self.get_unlabeled_perf_counters().items():
1660 for path, counter_info in counters.items():
1661 # Skip histograms, they are represented by long running avgs
1662 stattype = self._stattype_to_str(counter_info['type'])
1663 if not stattype or stattype == 'histogram':
1664 self.log.debug('ignoring %s, type %s' % (path, stattype))
1665 continue
1666
1667 path, label_names, labels = self._perfpath_to_path_labels(
1668 daemon, path)
1669
1670 # Get the value of the counter
1671 value = self._perfvalue_to_value(
1672 counter_info['type'], counter_info['value'])
1673
1674 # Represent the long running avgs as sum/count pairs
1675 if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
1676 _path = path + '_sum'
1677 if _path not in self.metrics:
1678 self.metrics[_path] = Metric(
1679 stattype,
1680 _path,
1681 counter_info['description'] + ' Total',
1682 label_names,
1683 )
1684 self.metrics[_path].set(value, labels)
1685 _path = path + '_count'
1686 if _path not in self.metrics:
1687 self.metrics[_path] = Metric(
1688 'counter',
1689 _path,
1690 counter_info['description'] + ' Count',
1691 label_names,
1692 )
1693 self.metrics[_path].set(counter_info['count'], labels,)
1694 else:
1695 if path not in self.metrics:
1696 self.metrics[path] = Metric(
1697 stattype,
1698 path,
1699 counter_info['description'],
1700 label_names,
1701 )
1702 self.metrics[path].set(value, labels)
1703 self.add_fixed_name_metrics()
1704
1705 @profile_method(True)
1706 def collect(self) -> str:
1707 # Clear the metrics before scraping
1708 for k in self.metrics.keys():
1709 self.metrics[k].clear()
1710
1711 self.get_health()
1712 self.get_df()
1713 self.get_osd_blocklisted_entries()
1714 self.get_pool_stats()
1715 self.get_fs()
1716 self.get_osd_stats()
1717 self.get_quorum_status()
1718 self.get_mgr_status()
1719 self.get_metadata_and_osd_status()
1720 self.get_pg_status()
1721 self.get_pool_repaired_objects()
1722 self.get_num_objects()
1723 self.get_all_daemon_health_metrics()
1724
1725 if not self.get_module_option('exclude_perf_counters'):
1726 self.get_perf_counters()
1727 self.get_rbd_stats()
1728
1729 self.get_collect_time_metrics()
1730
1731 # Return formatted metrics and clear no longer used data
1732 _metrics = [m.str_expfmt() for m in self.metrics.values()]
1733 for k in self.metrics.keys():
1734 self.metrics[k].clear()
1735
1736 return ''.join(_metrics) + '\n'
1737
1738 @CLIReadCommand('prometheus file_sd_config')
1739 def get_file_sd_config(self) -> Tuple[int, str, str]:
1740 '''
1741 Return file_sd compatible prometheus config for mgr cluster
1742 '''
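        # Example output payload (illustrative hostname):
        #   [{"targets": ["ceph-mgr-host:9283"], "labels": {}}]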
1743 servers = self.list_servers()
1744 targets = []
1745 for server in servers:
1746 hostname = server.get('hostname', '')
1747 for service in cast(List[ServiceInfoT], server.get('services', [])):
1748 if service['type'] != 'mgr':
1749 continue
1750 id_ = service['id']
1751 port = self._get_module_option('server_port', DEFAULT_PORT, id_)
1752 targets.append(f'{hostname}:{port}')
1753 ret = [
1754 {
1755 "targets": targets,
1756 "labels": {}
1757 }
1758 ]
1759 return 0, json.dumps(ret), ""
1760
1761 def self_test(self) -> None:
1762 self.collect()
1763 self.get_file_sd_config()
1764
1765 def configure(self, server_addr: str, server_port: int) -> None:
1766 # cephadm deployments have a TLS monitoring stack setup option.
1767         # If the cephadm module is on and the setting is true (it defaults to false),
1768         # prometheus should be set up to interact with that stack
1769 cephadm_secure_monitoring_stack = self.get_module_option_ex(
1770 'cephadm', 'secure_monitoring_stack', False)
1771 if cephadm_secure_monitoring_stack:
1772 try:
1773 self.setup_cephadm_tls_config(server_addr, server_port)
1774 return
1775 except Exception as e:
1776                 self.log.exception(f'Failed to setup cephadm based secure monitoring stack: {e}\n'
1777                                    'Falling back to default configuration')
1778 self.setup_default_config(server_addr, server_port)
1779
1780 def setup_default_config(self, server_addr: str, server_port: int) -> None:
1781 cherrypy.config.update({
1782 'server.socket_host': server_addr,
1783 'server.socket_port': server_port,
1784 'engine.autoreload.on': False,
1785 'server.ssl_module': None,
1786 'server.ssl_certificate': None,
1787 'server.ssl_private_key': None,
1788 })
1789 # Publish the URI that others may use to access the service we're about to start serving
1790 self.set_uri(build_url(scheme='http', host=self.get_server_addr(),
1791 port=server_port, path='/'))
1792
1793 def setup_cephadm_tls_config(self, server_addr: str, server_port: int) -> None:
1794 from cephadm.ssl_cert_utils import SSLCerts
1795 # The SSLCerts helper writes the files produced by generate_cert_files()
1796 # to NamedTemporaryFile objects, which are removed as soon as the SSLCerts
1797 # object is cleaned up. Keep it as an attribute of the module rather than
1798 # a local so those temporary cert/key files stay around for as long as the
1799 # server is running.
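# Certificate flow: reuse a root certificate/key previously persisted in
# the mgr key/value store under 'root/cert' and 'root/key' if available;
# otherwise generate a fresh root cert and persist it. The cert/key files
# handed to CherryPy below are then produced by generate_cert_files().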
1800 self.cephadm_monitoring_tls_ssl_certs = SSLCerts()
1801 host = self.get_mgr_ip()
1802 try:
1803 old_cert = self.get_store('root/cert')
1804 old_key = self.get_store('root/key')
1805 if not old_cert or not old_key:
1806 raise Exception('No old credentials for mgr-prometheus endpoint')
1807 self.cephadm_monitoring_tls_ssl_certs.load_root_credentials(old_cert, old_key)
1808 except Exception:
1809 self.cephadm_monitoring_tls_ssl_certs.generate_root_cert(host)
1810 self.set_store('root/cert', self.cephadm_monitoring_tls_ssl_certs.get_root_cert())
1811 self.set_store('root/key', self.cephadm_monitoring_tls_ssl_certs.get_root_key())
1812
1813 cert_file_path, key_file_path = self.cephadm_monitoring_tls_ssl_certs.generate_cert_files(
1814 self.get_hostname(), host)
1815
1816 cherrypy.config.update({
1817 'server.socket_host': server_addr,
1818 'server.socket_port': server_port,
1819 'engine.autoreload.on': False,
1820 'server.ssl_module': 'builtin',
1821 'server.ssl_certificate': cert_file_path,
1822 'server.ssl_private_key': key_file_path,
1823 })
1824 # Publish the URI that others may use to access the service we're about to start serving
1825 self.set_uri(build_url(scheme='https', host=self.get_server_addr(),
1826 port=server_port, path='/'))
1827
1828 def serve(self) -> None:
1829
1830 class Root(object):
1831
1832 # collapse everything to '/'
1833 def _cp_dispatch(self, vpath: str) -> 'Root':
1834 cherrypy.request.path = ''
1835 return self
1836
1837 @cherrypy.expose
1838 def index(self) -> str:
1839 return '''<!DOCTYPE html>
1840 <html>
1841 <head><title>Ceph Exporter</title></head>
1842 <body>
1843 <h1>Ceph Exporter</h1>
1844 <p><a href='/metrics'>Metrics</a></p>
1845 </body>
1846 </html>'''
1847
1848 @cherrypy.expose
1849 def metrics(self) -> Optional[str]:
1850 # Serialize scrapes: take the collect lock so only one runs at a time
1851 assert isinstance(_global_instance, Module)
1852 with _global_instance.collect_lock:
1853 return self._metrics(_global_instance)
1854
1855 @staticmethod
1856 def _metrics(instance: 'Module') -> Optional[str]:
1857 if not instance.cache:
1858 instance.log.debug('Cache disabled, collecting and returning without cache')
1859 cherrypy.response.headers['Content-Type'] = 'text/plain'
1860 return instance.collect()
1861
1862 # Return cached data if available
1863 if not instance.collect_cache:
1864 raise cherrypy.HTTPError(503, 'No cached data available yet')
1865
1866 def respond() -> Optional[str]:
1867 assert isinstance(instance, Module)
1868 cherrypy.response.headers['Content-Type'] = 'text/plain'
1869 return instance.collect_cache
1870
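# From here there are three outcomes: the last collection took less than
# one scrape interval (the cache is considered fresh) and is returned; the
# cache is stale but the strategy is STALE_CACHE_RETURN, so it is returned
# anyway with an informational log; or the strategy is STALE_CACHE_FAIL and
# the scrape is answered with HTTP 503.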
1871 if instance.collect_time < instance.scrape_interval:
1872 # Respond if cache isn't stale
1873 return respond()
1874
1875 if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
1876 # Respond even if cache is stale
1877 instance.log.info(
1878 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1879 'returning metrics from stale cache.'.format(
1880 instance.collect_time,
1881 instance.collect_time - instance.scrape_interval
1882 )
1883 )
1884 return respond()
1885
1886 if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
1887 # Fail if cache is stale
1888 msg = (
1889 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1890 'returning "service unavailable".'.format(
1891 instance.collect_time,
1892 instance.collect_time - instance.scrape_interval,
1893 )
1894 )
1895 instance.log.error(msg)
1896 raise cherrypy.HTTPError(503, msg)
1897 return None
1898
1899 # Make the cache timeout for collecting configurable
1900 self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))
1901
1902 self.stale_cache_strategy = cast(
1903 str, self.get_localized_module_option('stale_cache_strategy'))
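# Anything other than the two known strategies falls back to STALE_CACHE_FAIL.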
1904 if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
1905 self.STALE_CACHE_RETURN]:
1906 self.stale_cache_strategy = self.STALE_CACHE_FAIL
1907
1908 server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
1909 server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
1910 self.log.info(
1911 "server_addr: %s server_port: %s" %
1912 (server_addr, server_port)
1913 )
1914
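# When caching is enabled, the metrics thread started below refreshes
# collect_cache in the background and scrapes are served from it; when
# disabled, each request to the metrics endpoint runs collect() inline.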
1915 self.cache = cast(bool, self.get_localized_module_option('cache', True))
1916 if self.cache:
1917 self.log.info('Cache enabled')
1918 self.metrics_thread.start()
1919 else:
1920 self.log.info('Cache disabled')
1921
1922 self.configure(server_addr, server_port)
1923
1924 cherrypy.tree.mount(Root(), "/")
1925 self.log.info('Starting engine...')
1926 cherrypy.engine.start()
1927 self.log.info('Engine started.')
1928
1929 # wait for the shutdown event
1930 self.shutdown_event.wait()
1931 self.shutdown_event.clear()
1932 # tell metrics collection thread to stop collecting new metrics
1933 self.metrics_thread.stop()
1934 cherrypy.engine.stop()
1935 cherrypy.server.httpserver = None
1936 self.log.info('Engine stopped.')
1937 self.shutdown_rbd_stats()
1938 # wait for the metrics collection thread to stop
1939 self.metrics_thread.join()
1940
1941 def shutdown(self) -> None:
1942 self.log.info('Stopping engine...')
1943 self.shutdown_event.set()
1944
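# Exposed on the CLI as `ceph healthcheck history ls`.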
1945 @CLIReadCommand('healthcheck history ls')
1946 def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
1947 """List all the healthchecks being tracked
1948
1949 The format option is parsed by ceph_argparse before it is evaluated here, so
1950 we can safely assume that what we have to process is valid; ceph_argparse throws
1951 a ValueError if the cast to our Format class fails.
1952
1953 Args:
1954 format (Format, optional): output format. Defaults to Format.plain.
1955
1956 Returns:
1957 HandleCommandResult: return code, stdout and stderr returned to the caller
1958 """
1959
1960 out = ""
1961 if format == Format.plain:
1962 out = str(self.health_history)
1963 elif format == Format.yaml:
1964 out = self.health_history.as_yaml()
1965 else:
1966 out = self.health_history.as_json(format == Format.json_pretty)
1967
1968 return HandleCommandResult(retval=0, stdout=out)
1969
1970 @CLIWriteCommand('healthcheck history clear')
1971 def _clear_healthchecks(self) -> HandleCommandResult:
1972 """Clear the healthcheck history"""
1973 self.health_history.reset()
1974 return HandleCommandResult(retval=0, stdout="healthcheck history cleared")
1975
1976
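# StandbyModule runs on non-active mgr daemons using the same
# server_addr/server_port options. Depending on 'standby_behaviour' the
# landing page either links to the active mgr's exporter URI or returns the
# configured error status, and /metrics always answers with an empty body
# so scraping a standby succeeds without producing samples.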
1977 class StandbyModule(MgrStandbyModule):
1978
1979 MODULE_OPTIONS = Module.MODULE_OPTIONS
1980
1981 def __init__(self, *args: Any, **kwargs: Any) -> None:
1982 super(StandbyModule, self).__init__(*args, **kwargs)
1983 self.shutdown_event = threading.Event()
1984
1985 def serve(self) -> None:
1986 server_addr = self.get_localized_module_option(
1987 'server_addr', get_default_addr())
1988 server_port = self.get_localized_module_option(
1989 'server_port', DEFAULT_PORT)
1990 self.log.info("server_addr: %s server_port: %s" %
1991 (server_addr, server_port))
1992 cherrypy.config.update({
1993 'server.socket_host': server_addr,
1994 'server.socket_port': server_port,
1995 'engine.autoreload.on': False,
1996 'request.show_tracebacks': False
1997 })
1998
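# Keep a reference to the module so the nested Root handlers below can
# reach its options and the active mgr's URI.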
1999 module = self
2000
2001 class Root(object):
2002 @cherrypy.expose
2003 def index(self) -> str:
2004 standby_behaviour = module.get_module_option('standby_behaviour')
2005 if standby_behaviour == 'default':
2006 active_uri = module.get_active_uri()
2007 return '''<!DOCTYPE html>
2008 <html>
2009 <head><title>Ceph Exporter</title></head>
2010 <body>
2011 <h1>Ceph Exporter</h1>
2012 <p><a href='{}metrics'>Metrics</a></p>
2013 </body>
2014 </html>'''.format(active_uri)
2015 else:
2016 status = module.get_module_option('standby_error_status_code')
2017 raise cherrypy.HTTPError(status, message="Keep on looking")
2018
2019 @cherrypy.expose
2020 def metrics(self) -> str:
2021 cherrypy.response.headers['Content-Type'] = 'text/plain'
2022 return ''
2023
2024 cherrypy.tree.mount(Root(), '/', {})
2025 self.log.info('Starting engine...')
2026 cherrypy.engine.start()
2027 self.log.info('Engine started.')
2028 # Wait for shutdown event
2029 self.shutdown_event.wait()
2030 self.shutdown_event.clear()
2031 cherrypy.engine.stop()
2032 cherrypy.server.httpserver = None
2033 self.log.info('Engine stopped.')
2034
2035 def shutdown(self) -> None:
2036 self.log.info("Stopping engine...")
2037 self.shutdown_event.set()
2038 self.log.info("Stopped engine")