import cherrypy
from collections import defaultdict, namedtuple
from distutils.version import StrictVersion
import json
import math
import os
import re
import threading
import time
import enum
from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
from mgr_util import get_default_addr, profile_method, build_url
from rbd import RBD
import yaml

from typing import DefaultDict, Optional, Dict, Any, Set, cast, Tuple, Union, List, Callable

LabelValues = Tuple[str, ...]
Number = Union[int, float]
MetricValue = Dict[LabelValues, Number]

# Defaults for the Prometheus HTTP server. Can also be set in config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.

DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts, it attempts to verify
# that the ports it's listening on are in fact bound. When using the any
# address "::" it tries both ipv4 and ipv6, and in some environments
# (e.g. kubernetes) ipv6 isn't yet configured / supported and CherryPy throws
# an uncaught exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None


# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore

# Reference to the active Module instance, so that the CherryPy handler
# classes defined below can access it. Note that writers still need to
# declare 'global _global_instance' before assigning to it.
_global_instance = None  # type: Optional[Module]
cherrypy.config.update({
    'response.headers.server': 'Ceph-Prometheus'
})


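# Encode Ceph's textual health state as a number so it can be exported as a
# gauge, e.g. health_status_to_number('HEALTH_WARN') == 1.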
def health_status_to_number(status: str) -> int:
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2
    raise ValueError(f'unknown status "{status}"')


DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'avail_raw', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes', 'bytes_used', 'percent_used']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name', 'type', 'description', 'compression_mode')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version', 'instance_id')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance', 'devices', 'device_ids')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']

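# Health checks that get a dedicated value metric (ceph_healthcheck_*). All
# other checks are still exported, but only as 0/1 via ceph_health_detail.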
alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]

HEALTHCHECK_DETAIL = ('name', 'severity')


class Severity(enum.Enum):
    ok = "HEALTH_OK"
    warn = "HEALTH_WARN"
    error = "HEALTH_ERR"


class Format(enum.Enum):
    plain = 'plain'
    json = 'json'
    json_pretty = 'json-pretty'
    yaml = 'yaml'


class HealthCheckEvent:

    def __init__(self, name: str, severity: Severity, first_seen: float, last_seen: float, count: int, active: bool = True):
        self.name = name
        self.severity = severity
        self.first_seen = first_seen
        self.last_seen = last_seen
        self.count = count
        self.active = active

    def as_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary."""
        return self.__dict__


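# Tracks every health check the cluster has raised and persists that history
# in the mon KV store (mgr/prometheus/health_history), so it survives module
# restarts and mgr failover.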
class HealthHistory:
    kv_name = 'health_history'
    titles = "{healthcheck_name:<24} {first_seen:<20} {last_seen:<20} {count:>5} {active:^6}"
    date_format = "%Y/%m/%d %H:%M:%S"

    def __init__(self, mgr: MgrModule):
        self.mgr = mgr
        self.lock = threading.Lock()
        self.healthcheck: Dict[str, HealthCheckEvent] = {}
        self._load()

    def _load(self) -> None:
        """Load the current state from the mons KV store."""
        data = self.mgr.get_store(self.kv_name)
        if data:
            try:
                healthcheck_data = json.loads(data)
            except json.JSONDecodeError:
                self.mgr.log.warning(
                    f"INVALID data read from mgr/prometheus/{self.kv_name}. Resetting")
                self.reset()
                return
            else:
                for k, v in healthcheck_data.items():
                    self.healthcheck[k] = HealthCheckEvent(
                        name=k,
                        severity=v.get('severity'),
                        first_seen=v.get('first_seen', 0),
                        last_seen=v.get('last_seen', 0),
                        count=v.get('count', 1),
                        active=v.get('active', True))
        else:
            self.reset()

    def reset(self) -> None:
        """Reset the healthcheck history."""
        with self.lock:
            self.mgr.set_store(self.kv_name, "{}")
            self.healthcheck = {}

    def save(self) -> None:
        """Save the current in-memory healthcheck history to the KV store."""
        with self.lock:
            self.mgr.set_store(self.kv_name, self.as_json())

    def check(self, health_checks: Dict[str, Any]) -> None:
        """Compare the current health checks against the existing history.

        Args:
            health_checks (Dict[str, Any]): current health check data
        """

        current_checks = health_checks.get('checks', {})
        changes_made = False

        # first turn off any active states we're tracking
        for seen_check in self.healthcheck:
            check = self.healthcheck[seen_check]
            if check.active and seen_check not in current_checks:
                check.active = False
                changes_made = True

        # now look for any additions to track
        now = time.time()
        for name, info in current_checks.items():
            if name not in self.healthcheck:
                # this healthcheck is new, so start tracking it
                changes_made = True
                self.healthcheck[name] = HealthCheckEvent(
                    name=name,
                    severity=info.get('severity'),
                    first_seen=now,
                    last_seen=now,
                    count=1,
                    active=True
                )
            else:
                # seen it before, so update its metadata
                check = self.healthcheck[name]
                if check.active:
                    # check has been registered as active already, so skip
                    continue
                else:
                    check.last_seen = now
                    check.count += 1
                    check.active = True
                    changes_made = True

        if changes_made:
            self.save()

    def __str__(self) -> str:
        """Render the healthcheck history.

        Returns:
            str: Human readable representation of the healthcheck history
        """
        out = []

        if len(self.healthcheck.keys()) == 0:
            out.append("No healthchecks have been recorded")
        else:
            out.append(self.titles.format(
                healthcheck_name="Healthcheck Name",
                first_seen="First Seen (UTC)",
                last_seen="Last seen (UTC)",
                count="Count",
                active="Active")
            )
            for k in sorted(self.healthcheck.keys()):
                check = self.healthcheck[k]
                out.append(self.titles.format(
                    healthcheck_name=check.name,
                    first_seen=time.strftime(self.date_format, time.gmtime(check.first_seen)),
                    last_seen=time.strftime(self.date_format, time.gmtime(check.last_seen)),
                    count=check.count,
                    active="Yes" if check.active else "No")
                )
            out.extend([f"{len(self.healthcheck)} health check(s) listed", ""])

        return "\n".join(out)

    def as_dict(self) -> Dict[str, Any]:
        """Return the history in a dictionary.

        Returns:
            Dict[str, Any]: dictionary indexed by the healthcheck name
        """
        return {name: self.healthcheck[name].as_dict() for name in self.healthcheck}

    def as_json(self, pretty: bool = False) -> str:
        """Return the healthcheck history as a JSON string.

        Args:
            pretty (bool, optional): whether to pretty print the history. Defaults to False.

        Returns:
            str: the healthcheck history in JSON format
        """
        if pretty:
            return json.dumps(self.as_dict(), indent=2)
        else:
            return json.dumps(self.as_dict())

    def as_yaml(self) -> str:
        """Return the healthcheck history in YAML format.

        Returns:
            str: YAML representation of the healthcheck history
        """
        return yaml.safe_dump(self.as_dict(), explicit_start=True, default_flow_style=False)


class Metric(object):
    def __init__(self, mtype: str, name: str, desc: str, labels: Optional[LabelValues] = None) -> None:
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value: Dict[LabelValues, Number] = {}

    def clear(self) -> None:
        self.value = {}

    def set(self, value: Number, labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

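    # Render this metric in the Prometheus exposition format, e.g.:
    #
    #   # HELP ceph_health_status Cluster health status
    #   # TYPE ceph_health_status untyped
    #   ceph_health_status 0.0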
    def str_expfmt(self) -> str:

        def promethize(path: str) -> str:
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value: float) -> str:
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels_list = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt

    def group_by(
        self,
        keys: List[str],
        joins: Dict[str, Callable[[List[str]], str]],
        name: Optional[str] = None,
    ) -> "Metric":
        """
        Groups data by label names.

        Label names that are not passed are removed from the resulting
        metric, but by providing a join function, their values can be
        merged into the group instead.

        The purpose of this method is to provide a version of a metric that
        can be used in matching where otherwise multiple results would be
        returned.

        As grouping is already possible in Prometheus, the only additional
        value of this method is the ability to join labels when grouping.
        For that reason, passing joins is required. Please use PromQL
        expressions in all other cases.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 1,
        ...     ('foo', 'y'): 1,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 1}

        The functionality of group_by roughly compares to Prometheus'

            group (ceph_disk_occupation) by (device, instance)

        with the exception that labels which aren't used as a grouping
        condition are not necessarily discarded: their values can be joined
        and the label thereby preserved.

        This function takes the value of the first entry of a found group to
        be used for the resulting value of the grouping operation.

        >>> m = Metric('type', 'name', '', labels=('label1', 'id'))
        >>> m.value = {
        ...     ('foo', 'x'): 555,
        ...     ('foo', 'y'): 10,
        ... }
        >>> m.group_by(['label1'], {'id': lambda ids: ','.join(ids)}).value
        {('foo', 'x,y'): 555}
        """
        assert self.labelnames, "cannot match keys without label names"
        for key in keys:
            assert key in self.labelnames, "unknown key: {}".format(key)
        assert joins, "joins must not be empty"
        assert all(callable(c) for c in joins.values()), "joins must be callable"

        # group
        grouped: Dict[LabelValues, List[Tuple[Dict[str, str], Number]]] = defaultdict(list)
        for label_values, metric_value in self.value.items():
            labels = dict(zip(self.labelnames, label_values))
            if not all(k in labels for k in keys):
                continue
            group_key = tuple(labels[k] for k in keys)
            grouped[group_key].append((labels, metric_value))

        # As there is nothing specified on how to join labels that are not
        # equal, and Prometheus' `group` aggregation function behaves
        # similarly, we simply drop those labels.
        labelnames = tuple(
            label for label in self.labelnames if label in keys or label in joins
        )
        superfluous_labelnames = [
            label for label in self.labelnames if label not in labelnames
        ]

        # iterate and convert groups with more than one member into a single
        # entry
        values: MetricValue = {}
        for group in grouped.values():
            labels, metric_value = group[0]

            for label in superfluous_labelnames:
                del labels[label]

            if len(group) > 1:
                for key, fn in joins.items():
                    labels[key] = fn(list(labels[key] for labels, _ in group))

            values[tuple(labels.values())] = metric_value

        new_metric = Metric(self.mtype, name if name else self.name, self.desc, labelnames)
        new_metric.value = values

        return new_metric


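# A counter-style metric whose samples accumulate across scrapes: clear() is
# deliberately a no-op, so Prometheus rate()/increase() see a monotonically
# increasing series.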
class MetricCounter(Metric):
    def __init__(self,
                 name: str,
                 desc: str,
                 labels: Optional[LabelValues] = None) -> None:
        super(MetricCounter, self).__init__('counter', name, desc, labels)
        self.value = defaultdict(lambda: 0)

    def clear(self) -> None:
        pass  # Skip calls to clear as we want to keep the counters here.

    def set(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        msg = 'This method must not be used for instances of MetricCounter class'
        raise NotImplementedError(msg)

    def add(self,
            value: Number,
            labelvalues: Optional[LabelValues] = None) -> None:
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] += value


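# Background collector thread: gathers all metrics every scrape_interval
# seconds and stores the rendered result in Module.collect_cache, so that an
# HTTP scrape only returns cached text instead of querying the cluster inline.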
class MetricCollectionThread(threading.Thread):
    def __init__(self, module: 'Module') -> None:
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self) -> None:
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()

                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                self.mod.log.debug('collecting cache in thread done')

                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than the configured scrape interval. '
                        'This may result in stale data. Please check the '
                        '`stale_cache_strategy` configuration option. '
                        'Collecting data took {:.2f} seconds but the scrape interval is configured '
                        'to be {:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self) -> None:
        self.active = False
        self.event.set()


class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(
            'server_addr',
            default=get_default_addr(),
            desc='the IPv4 or IPv6 address on which the module listens for HTTP requests',
        ),
        Option(
            'server_port',
            type='int',
            default=DEFAULT_PORT,
            desc='the port on which the module listens for HTTP requests',
            runtime=True
        ),
        Option(
            'scrape_interval',
            type='float',
            default=15.0
        ),
        Option(
            'stale_cache_strategy',
            default='log'
        ),
        Option(
            'cache',
            type='bool',
            default=True,
        ),
        Option(
            'rbd_stats_pools',
            default=''
        ),
        Option(
            name='rbd_stats_pools_refresh_interval',
            type='int',
            default=300
        ),
        Option(
            name='standby_behaviour',
            type='str',
            default='default',
            enum_allowed=['default', 'error'],
            runtime=True
        ),
        Option(
            name='standby_error_status_code',
            type='int',
            default=500,
            min=400,
            max=599,
            runtime=True
        )
    ]
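
    # Module options can be changed at runtime with the ceph CLI, e.g.:
    #   ceph config set mgr mgr/prometheus/scrape_interval 20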

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0.0
        self.scrape_interval: float = 15.0
        self.cache = True
        self.stale_cache_strategy: str = self.STALE_CACHE_FAIL
        self.collect_cache: Optional[str] = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }  # type: Dict[str, Any]
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)
        self.health_history = HealthHistory(self)

    def _setup_static_metrics(self) -> Dict[str, Metric]:
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['disk_occupation_human'] = Metric(
            'untyped',
            'disk_occupation_human',
            'Associate Ceph daemon with disk used for displaying to humans,'
            ' not for joining tables (vector matching)',
            DISK_OCCUPATION,  # label names are automatically decimated on grouping
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )

        metrics['health_detail'] = Metric(
            'gauge',
            'health_detail',
            'healthcheck status by type (0=inactive, 1=active)',
            HEALTHCHECK_DETAIL
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
            path = 'cluster_by_class_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
                ('device_class',)
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'counter' if state in ('rd', 'rd_bytes', 'wr', 'wr_bytes') else 'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics

    def get_server_addr(self) -> str:
        """
        Return the current mgr server IP.
        """
        server_addr = cast(str, self.get_localized_module_option('server_addr', get_default_addr()))
        if server_addr in ['::', '0.0.0.0']:
            return self.get_mgr_ip()
        return server_addr

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.
        """
        # https://stackoverflow.com/questions/7254845/change-cherrypy-port-and-restart-web-server
        # if we omit the line: cherrypy.server.httpserver = None
        # then the cherrypy server is not restarted correctly
        self.log.info('Restarting engine...')
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        server_port = cast(int, self.get_localized_module_option('server_port', DEFAULT_PORT))
        self.set_uri(build_url(scheme='http', host=self.get_server_addr(), port=server_port, path='/'))
        cherrypy.config.update({'server.socket_port': server_port})
        cherrypy.engine.start()
        self.log.info('Engine started.')

    @profile_method()
    def get_health(self) -> None:

        def _get_value(message: str, delim: str = ' ', word_pos: int = 0) -> Tuple[int, int]:
            """Extract a numeric value from a message (default is the 1st field)"""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1

        health = json.loads(self.get('health')['json'])
        # set overall health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any triggered health checks need to
        # become a specific metric with a value taken from the health detail
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:

                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # e.g. "42 slow ops, oldest one blocked for 12 sec,
                        # daemons [osd.0, osd.3] have slow ops."
                        v, err = _get_value(message)

                    if err:
                        self.log.error(
                            "healthcheck %s message format is incompatible and has been dropped",
                            check.name)
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue
                    else:
                        self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)

        self.health_history.check(health)
        for name, info in self.health_history.healthcheck.items():
            v = 1 if info.active else 0
            self.metrics['health_detail'].set(
                v, (
                    name,
                    str(info.severity))
            )

    @profile_method()
    def get_pool_stats(self) -> None:
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    @profile_method()
    def get_df(self) -> None:
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
            for device_class in df['stats_by_class']:
                self.metrics['cluster_by_class_{}'.format(stat)].set(
                    df['stats_by_class'][device_class][stat], (device_class,))

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    @profile_method()
    def get_fs(self) -> None:
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
            addr, rank = standby['addr'], standby['rank']
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                cast(str, host),
                cast(str, addr),
                cast(str, rank),
                cast(str, version)
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host, version, _ = servers.get((id_, 'mds'), ('', '', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host, daemon['addr'],
                    daemon['rank'], version
                ))

    @profile_method()
    def get_quorum_status(self) -> None:
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            mon_version = servers.get((id_, 'mon'), ('', '', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), mon_version[0],
                mon['public_addr'].rsplit(':', 1)[0], rank,
                mon_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    @profile_method()
    def get_mgr_status(self) -> None:
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host, version, _ = servers.get((mgr, 'mgr'), ('', '', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                f'mgr.{mgr}', host, version
            ))
            self.metrics['mgr_status'].set(_state, (
                f'mgr.{mgr}',))
        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():

            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))

    @profile_method()
    def get_pg_status(self) -> None:

        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = defaultdict(int)  # type: DefaultDict[str, int]

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))

    @profile_method()
    def get_osd_stats(self) -> None:
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

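    # Build a map of (service_id, service_type) -> (hostname, ceph_version,
    # service_name) for every daemon known to the mgr; used to enrich the
    # various *_metadata metrics.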
    def get_service_list(self) -> Dict[Tuple[str, str], Tuple[str, str, str]]:
        ret = {}
        for server in self.list_servers():
            host = cast(str, server.get('hostname', ''))
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                ret.update({(service['id'], service['type']): (
                    host, service['ceph_version'], service.get('name', ''))})
        return ret

    @profile_method()
    def get_metadata_and_osd_status(self) -> None:
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            osd_version = servers.get((str(id_), 'osd'), ('', '', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                osd_version[0],
                obj_store,
                p_addr,
                osd_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            osd_wal_dev_node = ''
            osd_db_dev_node = ''
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            # fetch the devices and ids (vendor, model, serial) from the
            # osd_metadata
            osd_devs = osd_metadata.get('devices', '') or 'N/A'
            osd_dev_ids = osd_metadata.get('device_ids', '') or 'N/A'

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname,
                    osd_devs,
                    osd_dev_ids,
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        if 'disk_occupation' in self.metrics:
            try:
                self.metrics['disk_occupation_human'] = \
                    self.metrics['disk_occupation'].group_by(
                        ['device', 'instance'],
                        {'ceph_daemon': lambda daemons: ', '.join(daemons)},
                        name='disk_occupation_human',
                    )
            except Exception as e:
                self.log.error(e)

        ec_profiles = osd_map.get('erasure_code_profiles', {})

        def _get_pool_info(pool: Dict[str, Any]) -> Tuple[str, str]:
            pool_type = 'unknown'
            description = 'unknown'

            if pool['type'] == 1:
                pool_type = "replicated"
                description = f"replica:{pool['size']}"
            elif pool['type'] == 3:
                pool_type = "erasure"
                name = pool.get('erasure_code_profile', '')
                profile = ec_profiles.get(name, {})
                if profile:
                    description = f"ec:{profile['k']}+{profile['m']}"
                else:
                    description = "ec:unknown"

            return pool_type, description

        for pool in osd_map['pools']:

            compression_mode = 'none'
            pool_type, pool_description = _get_pool_info(pool)

            if 'options' in pool:
                compression_mode = pool['options'].get('compression_mode', 'none')

            self.metrics['pool_metadata'].set(
                1, (
                    pool['pool'],
                    pool['pool_name'],
                    pool_type,
                    pool_description,
                    compression_mode)
            )

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version, name = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, name),
                     hostname, version, service_id)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                rbd_mirror_metadata = tuple(mirror_metadata.get(k, '')
                                            for k in RBD_MIRROR_METADATA)
                self.metrics['rbd_mirror_metadata'].set(
                    1, rbd_mirror_metadata
                )

    @profile_method()
    def get_num_objects(self) -> None:
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    @profile_method()
    def get_rbd_stats(self) -> None:
        # Per-RBD-image stats are collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no
        # pool_id in the object name, the image pool is the pool where the
        # object is located.

        # Parse the rbd_stats_pools option, which is a comma or space
        # separated list of pool[/namespace] entries. If no namespace is
        # specified the stats are collected for every namespace in the pool.
        # The wildcard '*' can be used to indicate all pools or namespaces.
        pools_string = cast(str, self.get_localized_module_option('rbd_stats_pools'))
        pool_keys = []
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) == 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                osd_map = self.get('osd_map')
                for pool in osd_map['pools']:
                    if 'rbd' not in pool.get('application_metadata', {}):
                        continue
                    pool_keys.append((pool['pool_name'], namespace_name))
            else:
                pool_keys.append((pool_name, namespace_name))

        pools = {}  # type: Dict[str, Set[str]]
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue

            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        # iterate over a copy: entries may be deleted while iterating
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                "|".join([re.escape(x)
                          for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if ('query' in self.rbd_stats
                and (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex']
                     or namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        assert res
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
                    not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1

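    # Rebuild the cached image map for the configured pools. The resulting
    # structure is:
    #   rbd_stats['pools'][pool_id] = {
    #       'name': pool_name,
    #       'ns_names': configured namespaces,
    #       'images': {namespace: {image_id: {'n': name, 'c': counters}}},
    #   }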
    def refresh_rbd_stats_pools(self, pools: Dict[str, Set[str]]) -> None:
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # iterate over a copy: namespaces may be dropped below
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if nspace_name and \
                           not rbd.namespace_exists(ioctx, nspace_name):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self) -> None:
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def add_fixed_name_metrics(self) -> None:
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path, metrics in self.metrics.items():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        metrics.mtype,
                        new_path,
                        metrics.desc,
                        cast(LabelValues, metrics.labelnames) + ('source_zone',)
                    )
                for label_values, value in metrics.value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)

    def get_collect_time_metrics(self) -> None:
        sum_metric = self.metrics.get('prometheus_collect_duration_seconds_sum')
        count_metric = self.metrics.get('prometheus_collect_duration_seconds_count')
        if sum_metric is None:
            sum_metric = MetricCounter(
                'prometheus_collect_duration_seconds_sum',
                'The sum of seconds taken to collect all metrics of this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_sum'] = sum_metric
        if count_metric is None:
            count_metric = MetricCounter(
                'prometheus_collect_duration_seconds_count',
                'The number of metrics gathered for this exporter',
                ('method',))
            self.metrics['prometheus_collect_duration_seconds_count'] = count_metric

        # Collect all timing data and make it available as a metric, excluding
        # the `collect` method because it has not finished at this point and
        # hence there's no `_execution_duration` attribute to be found. The
        # `_execution_duration` attribute is added by the `profile_method`
        # decorator.
        for method_name, method in Module.__dict__.items():
            duration = getattr(method, '_execution_duration', None)
            if duration is not None:
                cast(MetricCounter, sum_metric).add(duration, (method_name,))
                cast(MetricCounter, count_metric).add(1, (method_name,))

    @profile_method(True)
    def collect(self) -> str:
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

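        # Export all daemon perf counters. Long running averages are split
        # into _sum/_count pairs so the mean can be recovered in PromQL, e.g.
        # rate(ceph_osd_op_r_latency_sum[1m]) / rate(ceph_osd_op_r_latency_count[1m])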
        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
        self.get_rbd_stats()

        self.get_collect_time_metrics()

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

    @CLIReadCommand('prometheus file_sd_config')
    def get_file_sd_config(self) -> Tuple[int, str, str]:
        '''
        Return file_sd compatible prometheus config for mgr cluster
        '''
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
                targets.append(f'{hostname}:{port}')
        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""

    def self_test(self) -> None:
        self.collect()
        self.get_file_sd_config()

    def serve(self) -> None:

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath: str) -> 'Root':
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self) -> str:
                return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='/metrics'>Metrics</a></p>
</body>
</html>'''

            @cherrypy.expose
            def metrics(self) -> Optional[str]:
                # Lock the function execution
                assert isinstance(_global_instance, Module)
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance: 'Module') -> Optional[str]:
                if not instance.cache:
                    instance.log.debug('Cache disabled, collecting and returning without cache')
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect()

                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503, 'No cached data available yet')

                def respond() -> Optional[str]:
                    assert isinstance(instance, Module)
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if cache is stale
                    msg = (
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning "service unavailable".'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval,
                        )
                    )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)
                return None

        # Make the cache timeout for collecting configurable
        self.scrape_interval = cast(float, self.get_localized_module_option('scrape_interval'))

        self.stale_cache_strategy = cast(
            str, self.get_localized_module_option('stale_cache_strategy'))
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL

        server_addr = cast(str, self.get_localized_module_option(
            'server_addr', get_default_addr()))
        server_port = cast(int, self.get_localized_module_option(
            'server_port', DEFAULT_PORT))
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.cache = cast(bool, self.get_localized_module_option('cache', True))
        if self.cache:
            self.log.info('Cache enabled')
            self.metrics_thread.start()
        else:
            self.log.info('Cache disabled')

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False
        })
        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri(build_url(scheme='http', host=self.get_server_addr(), port=server_port, path='/'))

        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self) -> None:
        self.log.info('Stopping engine...')
        self.shutdown_event.set()

    @CLIReadCommand('healthcheck history ls')
    def _list_healthchecks(self, format: Format = Format.plain) -> HandleCommandResult:
        """List all the healthchecks being tracked

        The format options are parsed in ceph_argparse, before they get evaluated here so
        we can safely assume that what we have to process is valid. ceph_argparse will throw
        a ValueError if the cast to our Format class fails.

        Args:
            format (Format, optional): output format. Defaults to Format.plain.

        Returns:
            HandleCommandResult: return code, stdout and stderr returned to the caller
        """

        out = ""
        if format == Format.plain:
            out = str(self.health_history)
        elif format == Format.yaml:
            out = self.health_history.as_yaml()
        else:
            out = self.health_history.as_json(format == Format.json_pretty)

        return HandleCommandResult(retval=0, stdout=out)

    @CLIWriteCommand('healthcheck history clear')
    def _clear_healthchecks(self) -> HandleCommandResult:
        """Clear the healthcheck history"""
        self.health_history.reset()
        return HandleCommandResult(retval=0, stdout="healthcheck history cleared")


class StandbyModule(MgrStandbyModule):

    MODULE_OPTIONS = Module.MODULE_OPTIONS

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self) -> None:
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': server_port,
            'engine.autoreload.on': False,
            'request.show_tracebacks': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self) -> str:
                standby_behaviour = module.get_module_option('standby_behaviour')
                if standby_behaviour == 'default':
                    active_uri = module.get_active_uri()
                    return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='{}metrics'>Metrics</a></p>
</body>
</html>'''.format(active_uri)
                else:
                    status = module.get_module_option('standby_error_status_code')
                    raise cherrypy.HTTPError(status, message="Keep on looking")

            @cherrypy.expose
            def metrics(self) -> str:
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        cherrypy.server.httpserver = None
        self.log.info('Engine stopped.')

    def shutdown(self) -> None:
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")