ceph/src/pybind/mgr/prometheus/module.py
1 import cherrypy
2 from collections import defaultdict
3 from distutils.version import StrictVersion
4 import json
5 import errno
6 import math
7 import os
8 import re
9 import socket
10 import threading
11 import time
12 from mgr_module import MgrModule, MgrStandbyModule, PG_STATES
13 from mgr_util import get_default_addr, profile_method
14 from rbd import RBD
15 from collections import namedtuple
16 try:
17 from typing import DefaultDict, Optional, Dict, Any, Set
18 except ImportError:
19 pass
20
21 # Defaults for the Prometheus HTTP server. Can also set in config-key
22 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
23 # for Prometheus exporter port registry
24
25 DEFAULT_PORT = 9283
26
27 # When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
28 # that the ports it's listening on are in fact bound. When using the any address
29 # "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
30 # ipv6 isn't yet configured / supported and CherryPy throws an uncaught
31 # exception.
32 if cherrypy is not None:
33 v = StrictVersion(cherrypy.__version__)
34 # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
35 # centos:7) and back to at least 3.0.0.
36 if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
37 # https://github.com/cherrypy/cherrypy/issues/1100
38 from cherrypy.process import servers
39 servers.wait_for_occupied_port = lambda host, port: None
40
41
42 # cherrypy likes to sys.exit on error. don't let it take us down too!
43 def os_exit_noop(*args, **kwargs):
44 pass
45
46
47 os._exit = os_exit_noop
48
49 # Reference to the active Module instance, so that the CherryPy Root handler
50 # and the metric collection thread can reach it
51
52 _global_instance = None # type: Optional[Module]
53
54
55 def health_status_to_number(status):
56 if status == 'HEALTH_OK':
57 return 0
58 elif status == 'HEALTH_WARN':
59 return 1
60 elif status == 'HEALTH_ERR':
61 return 2
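# e.g. health_status_to_number('HEALTH_WARN') -> 1; any status string other
# than the three handled above falls through and returns None.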
62
63
64 DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']
65
66 DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
67 'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
68 'compress_bytes_used', 'compress_under_bytes']
69
70 OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
71 'recovering_keys_per_sec', 'num_objects_recovered',
72 'num_bytes_recovered', 'num_keys_recovered')
73
74 OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
75 'norecover', 'noscrub', 'nodeep-scrub')
76
77 FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')
78
79 MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
80 'ceph_version')
81
82 MON_METADATA = ('ceph_daemon', 'hostname',
83 'public_addr', 'rank', 'ceph_version')
84
85 MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
86
87 MGR_STATUS = ('ceph_daemon',)
88
89 MGR_MODULE_STATUS = ('name',)
90
91 MGR_MODULE_CAN_RUN = ('name',)
92
93 OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
94 'front_iface', 'hostname', 'objectstore', 'public_addr',
95 'ceph_version')
96
97 OSD_STATUS = ['weight', 'up', 'in']
98
99 OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']
100
101 POOL_METADATA = ('pool_id', 'name')
102
103 RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')
104
105 RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
106 'ceph_version')
107
108 DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
109 'wal_device', 'instance')
110
111 NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
112
113 alert_metric = namedtuple('alert_metric', 'name description')
114 HEALTH_CHECKS = [
115 alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
116 ]
117
118
119 class Metric(object):
120 def __init__(self, mtype, name, desc, labels=None):
121 self.mtype = mtype
122 self.name = name
123 self.desc = desc
124 self.labelnames = labels # tuple if present
125 self.value = {} # indexed by label values
126
127 def clear(self):
128 self.value = {}
129
130 def set(self, value, labelvalues=None):
131 # labelvalues must be a tuple
132 labelvalues = labelvalues or ('',)
133 self.value[labelvalues] = value
134
135 def str_expfmt(self):
136
137 def promethize(path):
138 ''' replace illegal metric name characters '''
139 result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')
140
141 # Hyphens usually turn into underscores, unless they are
142 # trailing
143 if result.endswith("-"):
144 result = result[0:-1] + "_minus"
145 else:
146 result = result.replace("-", "_")
147
148 return "ceph_{0}".format(result)
149
150 def floatstr(value):
151 ''' represent as Go-compatible float '''
152 if value == float('inf'):
153 return '+Inf'
154 if value == float('-inf'):
155 return '-Inf'
156 if math.isnan(value):
157 return 'NaN'
158 return repr(float(value))
159
160 name = promethize(self.name)
161 expfmt = '''
162 # HELP {name} {desc}
163 # TYPE {name} {mtype}'''.format(
164 name=name,
165 desc=self.desc,
166 mtype=self.mtype,
167 )
168
169 for labelvalues, value in self.value.items():
170 if self.labelnames:
171 labels_list = zip(self.labelnames, labelvalues)
172 labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
173 else:
174 labels = ''
175 if labels:
176 fmtstr = '\n{name}{{{labels}}} {value}'
177 else:
178 fmtstr = '\n{name} {value}'
179 expfmt += fmtstr.format(
180 name=name,
181 labels=labels,
182 value=floatstr(value),
183 )
184 return expfmt
185
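# Illustrative example of the exposition text a Metric renders to (the values
# are made up; the metric shown is one of the static metrics defined below):
#
#   m = Metric('untyped', 'osd_weight', 'OSD status weight', ('ceph_daemon',))
#   m.set(1.0, ('osd.0',))
#   m.str_expfmt() produces:
#     # HELP ceph_osd_weight OSD status weight
#     # TYPE ceph_osd_weight untyped
#     ceph_osd_weight{ceph_daemon="osd.0"} 1.0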
186
187 class MetricCollectionThread(threading.Thread):
188 def __init__(self, module):
189 # type: (Module) -> None
190 self.mod = module
191 self.active = True
192 self.event = threading.Event()
193 super(MetricCollectionThread, self).__init__(target=self.collect)
194
195 def collect(self):
196 self.mod.log.info('starting metric collection thread')
197 while self.active:
198 self.mod.log.debug('collecting cache in thread')
199 if self.mod.have_mon_connection():
200 start_time = time.time()
201
202 try:
203 data = self.mod.collect()
204 except Exception:
205 # Log any issues encountered during the data collection and continue
206 self.mod.log.exception("failed to collect metrics:")
207 self.event.wait(self.mod.scrape_interval)
208 continue
209
210 duration = time.time() - start_time
211 self.mod.log.debug('collecting cache in thread done')
212
213 sleep_time = self.mod.scrape_interval - duration
214 if sleep_time < 0:
215 self.mod.log.warning(
216 'Collecting data took more time than configured scrape interval. '
217 'This possibly results in stale data. Please check the '
218 '`stale_cache_strategy` configuration option. '
219 'Collecting data took {:.2f} seconds but scrape interval is configured '
220 'to be {:.0f} seconds.'.format(
221 duration,
222 self.mod.scrape_interval,
223 )
224 )
225 sleep_time = 0
226
227 with self.mod.collect_lock:
228 self.mod.collect_cache = data
229 self.mod.collect_time = duration
230
231 self.event.wait(sleep_time)
232 else:
233 self.mod.log.error('No MON connection')
234 self.event.wait(self.mod.scrape_interval)
235
236 def stop(self):
237 self.active = False
238 self.event.set()
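# The thread above refreshes Module.collect_cache in the background; the
# cherrypy '/metrics' handler in Module.serve() only reads that cache (under
# collect_lock), so scrapes never trigger a collection themselves.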
239
240 class Module(MgrModule):
241 COMMANDS = [
242 {
243 "cmd": "prometheus file_sd_config",
244 "desc": "Return file_sd compatible prometheus config for mgr cluster",
245 "perm": "r"
246 },
247 ]
248
249 MODULE_OPTIONS = [
250 {'name': 'server_addr'},
251 {'name': 'server_port'},
252 {'name': 'scrape_interval'},
253 {'name': 'stale_cache_strategy'},
254 {'name': 'rbd_stats_pools'},
255 {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
256 ]
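# The options above are regular mgr module options. A hypothetical example of
# setting them from the CLI (the values shown are assumptions, not defaults):
#   ceph config set mgr mgr/prometheus/scrape_interval 30
#   ceph config set mgr mgr/prometheus/rbd_stats_pools "rbd rbd2/ns1"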
257
258 STALE_CACHE_FAIL = 'fail'
259 STALE_CACHE_RETURN = 'return'
260
261 def __init__(self, *args, **kwargs):
262 super(Module, self).__init__(*args, **kwargs)
263 self.metrics = self._setup_static_metrics()
264 self.shutdown_event = threading.Event()
265 self.collect_lock = threading.Lock()
266 self.collect_time = 0.0
267 self.scrape_interval = 15.0
268 self.stale_cache_strategy = self.STALE_CACHE_FAIL
269 self.collect_cache = None
270 self.rbd_stats = {
271 'pools': {},
272 'pools_refresh_time': 0,
273 'counters_info': {
274 'write_ops': {'type': self.PERFCOUNTER_COUNTER,
275 'desc': 'RBD image writes count'},
276 'read_ops': {'type': self.PERFCOUNTER_COUNTER,
277 'desc': 'RBD image reads count'},
278 'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
279 'desc': 'RBD image bytes written'},
280 'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
281 'desc': 'RBD image bytes read'},
282 'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
283 'desc': 'RBD image writes latency (msec)'},
284 'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
285 'desc': 'RBD image reads latency (msec)'},
286 },
287 } # type: Dict[str, Any]
288 global _global_instance
289 _global_instance = self
290 self.metrics_thread = MetricCollectionThread(_global_instance)
291
292 def _setup_static_metrics(self):
293 metrics = {}
294 metrics['health_status'] = Metric(
295 'untyped',
296 'health_status',
297 'Cluster health status'
298 )
299 metrics['mon_quorum_status'] = Metric(
300 'gauge',
301 'mon_quorum_status',
302 'Monitors in quorum',
303 ('ceph_daemon',)
304 )
305 metrics['fs_metadata'] = Metric(
306 'untyped',
307 'fs_metadata',
308 'FS Metadata',
309 FS_METADATA
310 )
311 metrics['mds_metadata'] = Metric(
312 'untyped',
313 'mds_metadata',
314 'MDS Metadata',
315 MDS_METADATA
316 )
317 metrics['mon_metadata'] = Metric(
318 'untyped',
319 'mon_metadata',
320 'MON Metadata',
321 MON_METADATA
322 )
323 metrics['mgr_metadata'] = Metric(
324 'gauge',
325 'mgr_metadata',
326 'MGR metadata',
327 MGR_METADATA
328 )
329 metrics['mgr_status'] = Metric(
330 'gauge',
331 'mgr_status',
332 'MGR status (0=standby, 1=active)',
333 MGR_STATUS
334 )
335 metrics['mgr_module_status'] = Metric(
336 'gauge',
337 'mgr_module_status',
338 'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
339 MGR_MODULE_STATUS
340 )
341 metrics['mgr_module_can_run'] = Metric(
342 'gauge',
343 'mgr_module_can_run',
344 'MGR module runnable state i.e. can it run (0=no, 1=yes)',
345 MGR_MODULE_CAN_RUN
346 )
347 metrics['osd_metadata'] = Metric(
348 'untyped',
349 'osd_metadata',
350 'OSD Metadata',
351 OSD_METADATA
352 )
353
354 # The reason for having this separate to OSD_METADATA is
355 # so that we can stably use the same tag names that
356 # the Prometheus node_exporter does
357 metrics['disk_occupation'] = Metric(
358 'untyped',
359 'disk_occupation',
360 'Associate Ceph daemon with disk used',
361 DISK_OCCUPATION
362 )
363
364 metrics['pool_metadata'] = Metric(
365 'untyped',
366 'pool_metadata',
367 'POOL Metadata',
368 POOL_METADATA
369 )
370
371 metrics['rgw_metadata'] = Metric(
372 'untyped',
373 'rgw_metadata',
374 'RGW Metadata',
375 RGW_METADATA
376 )
377
378 metrics['rbd_mirror_metadata'] = Metric(
379 'untyped',
380 'rbd_mirror_metadata',
381 'RBD Mirror Metadata',
382 RBD_MIRROR_METADATA
383 )
384
385 metrics['pg_total'] = Metric(
386 'gauge',
387 'pg_total',
388 'PG Total Count per Pool',
389 ('pool_id',)
390 )
391
392 for flag in OSD_FLAGS:
393 path = 'osd_flag_{}'.format(flag)
394 metrics[path] = Metric(
395 'untyped',
396 path,
397 'OSD Flag {}'.format(flag)
398 )
399 for state in OSD_STATUS:
400 path = 'osd_{}'.format(state)
401 metrics[path] = Metric(
402 'untyped',
403 path,
404 'OSD status {}'.format(state),
405 ('ceph_daemon',)
406 )
407 for stat in OSD_STATS:
408 path = 'osd_{}'.format(stat)
409 metrics[path] = Metric(
410 'gauge',
411 path,
412 'OSD stat {}'.format(stat),
413 ('ceph_daemon',)
414 )
415 for stat in OSD_POOL_STATS:
416 path = 'pool_{}'.format(stat)
417 metrics[path] = Metric(
418 'gauge',
419 path,
420 "OSD pool stats: {}".format(stat),
421 ('pool_id',)
422 )
423 for state in PG_STATES:
424 path = 'pg_{}'.format(state)
425 metrics[path] = Metric(
426 'gauge',
427 path,
428 'PG {} per pool'.format(state),
429 ('pool_id',)
430 )
431 for state in DF_CLUSTER:
432 path = 'cluster_{}'.format(state)
433 metrics[path] = Metric(
434 'gauge',
435 path,
436 'DF {}'.format(state),
437 )
438 for state in DF_POOL:
439 path = 'pool_{}'.format(state)
440 metrics[path] = Metric(
441 'gauge',
442 path,
443 'DF pool {}'.format(state),
444 ('pool_id',)
445 )
446 for state in NUM_OBJECTS:
447 path = 'num_objects_{}'.format(state)
448 metrics[path] = Metric(
449 'gauge',
450 path,
451 'Number of {} objects'.format(state),
452 )
453
454 for check in HEALTH_CHECKS:
455 path = 'healthcheck_{}'.format(check.name.lower())
456 metrics[path] = Metric(
457 'gauge',
458 path,
459 check.description,
460 )
461
462 return metrics
463
464 @profile_method()
465 def get_health(self):
466
467 def _get_value(message, delim=' ', word_pos=0):
468 """Extract value from message (default is 1st field)"""
469 v_str = message.split(delim)[word_pos]
470 if v_str.isdigit():
471 return int(v_str), 0
472 return 0, 1
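# e.g. _get_value('42 slow ops, oldest one blocked for 12 sec, ...') -> (42, 0);
# a message whose first field is not a number -> (0, 1)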
473
474 health = json.loads(self.get('health')['json'])
475 # set overall health
476 self.metrics['health_status'].set(
477 health_status_to_number(health['status'])
478 )
479
480 # Examine the health to see if any health checks triggered need to
481 # become a metric.
482 active_healthchecks = health.get('checks', {})
483 active_names = active_healthchecks.keys()
484
485 for check in HEALTH_CHECKS:
486 path = 'healthcheck_{}'.format(check.name.lower())
487
488 if path in self.metrics:
489
490 if check.name in active_names:
491 check_data = active_healthchecks[check.name]
492 message = check_data['summary'].get('message', '')
493 v, err = 0, 0
494
495 if check.name == "SLOW_OPS":
496 # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have slow ops.
497 v, err = _get_value(message)
498
499 if err:
500 self.log.error("healthcheck {} message format is incompatible and has been dropped".format(check.name))
501 # drop the metric, so it's no longer emitted
502 del self.metrics[path]
503 continue
504 else:
505 self.metrics[path].set(v)
506 else:
507 # health check is not active, so give it a default of 0
508 self.metrics[path].set(0)
509
510 @profile_method()
511 def get_pool_stats(self):
512 # retrieve pool stats to provide per pool recovery metrics
513 # (osd_pool_stats moved to mgr in Mimic)
514 pstats = self.get('osd_pool_stats')
515 for pool in pstats['pool_stats']:
516 for stat in OSD_POOL_STATS:
517 self.metrics['pool_{}'.format(stat)].set(
518 pool['recovery_rate'].get(stat, 0),
519 (pool['pool_id'],)
520 )
521
522 @profile_method()
523 def get_df(self):
524 # maybe get the to-be-exported metrics from a config?
525 df = self.get('df')
526 for stat in DF_CLUSTER:
527 self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
528
529 for pool in df['pools']:
530 for stat in DF_POOL:
531 self.metrics['pool_{}'.format(stat)].set(
532 pool['stats'][stat],
533 (pool['id'],)
534 )
535
536 @profile_method()
537 def get_fs(self):
538 fs_map = self.get('fs_map')
539 servers = self.get_service_list()
540 self.log.debug('standbys: {}'.format(fs_map['standbys']))
541 # export standby mds metadata, default standby fs_id is '-1'
542 for standby in fs_map['standbys']:
543 id_ = standby['name']
544 host_version = servers.get((id_, 'mds'), ('', ''))
545 self.metrics['mds_metadata'].set(1, (
546 'mds.{}'.format(id_), '-1',
547 host_version[0], standby['addr'],
548 standby['rank'], host_version[1]
549 ))
550 for fs in fs_map['filesystems']:
551 # collect fs metadata
552 data_pools = ",".join([str(pool)
553 for pool in fs['mdsmap']['data_pools']])
554 self.metrics['fs_metadata'].set(1, (
555 data_pools,
556 fs['id'],
557 fs['mdsmap']['metadata_pool'],
558 fs['mdsmap']['fs_name']
559 ))
560 self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
561 for gid, daemon in fs['mdsmap']['info'].items():
562 id_ = daemon['name']
563 host_version = servers.get((id_, 'mds'), ('', ''))
564 self.metrics['mds_metadata'].set(1, (
565 'mds.{}'.format(id_), fs['id'],
566 host_version[0], daemon['addr'],
567 daemon['rank'], host_version[1]
568 ))
569
570 @profile_method()
571 def get_quorum_status(self):
572 mon_status = json.loads(self.get('mon_status')['json'])
573 servers = self.get_service_list()
574 for mon in mon_status['monmap']['mons']:
575 rank = mon['rank']
576 id_ = mon['name']
577 host_version = servers.get((id_, 'mon'), ('', ''))
578 self.metrics['mon_metadata'].set(1, (
579 'mon.{}'.format(id_), host_version[0],
580 mon['public_addr'].rsplit(':', 1)[0], rank,
581 host_version[1]
582 ))
583 in_quorum = int(rank in mon_status['quorum'])
584 self.metrics['mon_quorum_status'].set(in_quorum, (
585 'mon.{}'.format(id_),
586 ))
587
588 @profile_method()
589 def get_mgr_status(self):
590 mgr_map = self.get('mgr_map')
591 servers = self.get_service_list()
592
593 active = mgr_map['active_name']
594 standbys = [s.get('name') for s in mgr_map['standbys']]
595
596 all_mgrs = list(standbys)
597 all_mgrs.append(active)
598
599 all_modules = {module.get('name'): module.get('can_run') for module in mgr_map['available_modules']}
600
601 for mgr in all_mgrs:
602 host_version = servers.get((mgr, 'mgr'), ('', ''))
603 if mgr == active:
604 _state = 1
605 else:
606 _state = 0
607
608 self.metrics['mgr_metadata'].set(1, (
609 'mgr.{}'.format(mgr), host_version[0],
610 host_version[1]
611 ))
612 self.metrics['mgr_status'].set(_state, (
613 'mgr.{}'.format(mgr),
614 ))
615 always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
616 active_modules = list(always_on_modules)
617 active_modules.extend(mgr_map['modules'])
618
619 for mod_name in all_modules.keys():
620
621 if mod_name in always_on_modules:
622 _state = 2
623 elif mod_name in active_modules:
624 _state = 1
625 else:
626 _state = 0
627
628 _can_run = 1 if all_modules[mod_name] else 0
629 self.metrics['mgr_module_status'].set(_state, (mod_name,))
630 self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
631
632 @profile_method()
633 def get_pg_status(self):
634
635 pg_summary = self.get('pg_summary')
636
637 for pool in pg_summary['by_pool']:
638 num_by_state = defaultdict(int) # type: DefaultDict[str, int]
639
640 for state_name, count in pg_summary['by_pool'][pool].items():
641 for state in state_name.split('+'):
642 num_by_state[state] += count
643 num_by_state['total'] += count
644
645 for state, num in num_by_state.items():
646 try:
647 self.metrics["pg_{}".format(state)].set(num, (pool,))
648 except KeyError:
649 self.log.warning("skipping pg in unknown state {}".format(state))
650
651 @profile_method()
652 def get_osd_stats(self):
653 osd_stats = self.get('osd_stats')
654 for osd in osd_stats['osd_stats']:
655 id_ = osd['osd']
656 for stat in OSD_STATS:
657 val = osd['perf_stat'][stat]
658 self.metrics['osd_{}'.format(stat)].set(val, (
659 'osd.{}'.format(id_),
660 ))
661
662 def get_service_list(self):
663 ret = {}
664 for server in self.list_servers():
665 version = server.get('ceph_version', '')
666 host = server.get('hostname', '')
667 for service in server.get('services', []):
668 ret.update({(service['id'], service['type']): (host, version)})
669 return ret
670
671 @profile_method()
672 def get_metadata_and_osd_status(self):
673 osd_map = self.get('osd_map')
674 osd_flags = osd_map['flags'].split(',')
675 for flag in OSD_FLAGS:
676 self.metrics['osd_flag_{}'.format(flag)].set(
677 int(flag in osd_flags)
678 )
679
680 osd_devices = self.get('osd_map_crush')['devices']
681 servers = self.get_service_list()
682 for osd in osd_map['osds']:
683 # id can be used to link osd metrics and metadata
684 id_ = osd['osd']
685 # collect osd metadata
686 p_addr = osd['public_addr'].rsplit(':', 1)[0]
687 c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
688 if p_addr == "-" or c_addr == "-":
689 self.log.info(
690 "Missing address metadata for osd {0}, skipping occupation"
691 " and metadata records for this osd".format(id_)
692 )
693 continue
694
695 dev_class = None
696 for osd_device in osd_devices:
697 if osd_device['id'] == id_:
698 dev_class = osd_device.get('class', '')
699 break
700
701 if dev_class is None:
702 self.log.info("OSD {0} is missing from CRUSH map, "
703 "skipping output".format(id_))
704 continue
705
706 host_version = servers.get((str(id_), 'osd'), ('', ''))
707
708 # collect disk occupation metadata
709 osd_metadata = self.get_metadata("osd", str(id_))
710 if osd_metadata is None:
711 continue
712
713 obj_store = osd_metadata.get('osd_objectstore', '')
714 f_iface = osd_metadata.get('front_iface', '')
715 b_iface = osd_metadata.get('back_iface', '')
716
717 self.metrics['osd_metadata'].set(1, (
718 b_iface,
719 'osd.{}'.format(id_),
720 c_addr,
721 dev_class,
722 f_iface,
723 host_version[0],
724 obj_store,
725 p_addr,
726 host_version[1]
727 ))
728
729 # collect osd status
730 for state in OSD_STATUS:
731 status = osd[state]
732 self.metrics['osd_{}'.format(state)].set(status, (
733 'osd.{}'.format(id_),
734 ))
735
736 osd_dev_node = None
737 if obj_store == "filestore":
738 # collect filestore backend device
739 osd_dev_node = osd_metadata.get(
740 'backend_filestore_dev_node', None)
741 # collect filestore journal device
742 osd_wal_dev_node = osd_metadata.get('osd_journal', '')
743 osd_db_dev_node = ''
744 elif obj_store == "bluestore":
745 # collect bluestore backend device
746 osd_dev_node = osd_metadata.get(
747 'bluestore_bdev_dev_node', None)
748 # collect bluestore wal backend
749 osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
750 # collect bluestore db backend
751 osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
752 if osd_dev_node and osd_dev_node == "unknown":
753 osd_dev_node = None
754
755 osd_hostname = osd_metadata.get('hostname', None)
756 if osd_dev_node and osd_hostname:
757 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
758 id_, osd_hostname, osd_dev_node))
759 self.metrics['disk_occupation'].set(1, (
760 "osd.{0}".format(id_),
761 osd_dev_node,
762 osd_db_dev_node,
763 osd_wal_dev_node,
764 osd_hostname
765 ))
766 else:
767 self.log.info("Missing dev node metadata for osd {0}, skipping "
768 "occupation record for this osd".format(id_))
769
770 for pool in osd_map['pools']:
771 self.metrics['pool_metadata'].set(
772 1, (pool['pool'], pool['pool_name']))
773
774 # Populate other servers metadata
775 for key, value in servers.items():
776 service_id, service_type = key
777 if service_type == 'rgw':
778 hostname, version = value
779 self.metrics['rgw_metadata'].set(
780 1,
781 ('{}.{}'.format(service_type, service_id),
782 hostname, version)
783 )
784 elif service_type == 'rbd-mirror':
785 mirror_metadata = self.get_metadata('rbd-mirror', service_id)
786 if mirror_metadata is None:
787 continue
788 mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
789 service_id)
790 self.metrics['rbd_mirror_metadata'].set(
791 1, (mirror_metadata.get(k, '')
792 for k in RBD_MIRROR_METADATA)
793 )
794
795 @profile_method()
796 def get_num_objects(self):
797 pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
798 for obj in NUM_OBJECTS:
799 stat = 'num_objects_{}'.format(obj)
800 self.metrics[stat].set(pg_sum[stat])
801
802 @profile_method()
803 def get_rbd_stats(self):
804 # Per RBD image stats is collected by registering a dynamic osd perf
805 # stats query that tells OSDs to group stats for requests associated
806 # with RBD objects by pool, namespace, and image id, which are
807 # extracted from the request object names or other attributes.
808 # The RBD object names have the following prefixes:
809 # - rbd_data.{image_id}. (data stored in the same pool as metadata)
810 # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
811 # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
812 # The pool_id in the object name is the id of the pool with the image
813 # metadata, and should be used in the image spec. If there is no pool_id
814 # in the object name, the image pool is the pool where the object is
815 # located.
816
817 # Parse rbd_stats_pools option, which is a comma or space separated
818 # list of pool[/namespace] entries. If no namespace is specified the
819 # stats are collected for every namespace in the pool. The wildcard
820 # '*' can be used to indicate all pools or namespaces
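# Illustrative values for the rbd_stats_pools option (pool names are made up):
#   'rbd'          -> every namespace in pool 'rbd'
#   'rbd/ns1,rbd2' -> namespace 'ns1' of pool 'rbd' plus all of pool 'rbd2'
#   '*'            -> every pool that has the 'rbd' application enabled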
821 pools_string = self.get_localized_module_option('rbd_stats_pools', '')
822 pool_keys = []
823 for x in re.split(r'[\s,]+', pools_string):
824 if not x:
825 continue
826
827 s = x.split('/', 2)
828 pool_name = s[0]
829 namespace_name = None
830 if len(s) == 2:
831 namespace_name = s[1]
832
833 if pool_name == "*":
834 # collect for all pools
835 osd_map = self.get('osd_map')
836 for pool in osd_map['pools']:
837 if 'rbd' not in pool.get('application_metadata', {}):
838 continue
839 pool_keys.append((pool['pool_name'], namespace_name))
840 else:
841 pool_keys.append((pool_name, namespace_name))
842
843 pools = {} # type: Dict[str, Set[str]]
844 for pool_key in pool_keys:
845 pool_name = pool_key[0]
846 namespace_name = pool_key[1]
847 if not namespace_name or namespace_name == "*":
848 # empty set means collect for all namespaces
849 pools[pool_name] = set()
850 continue
851
852 if pool_name not in pools:
853 pools[pool_name] = set()
854 elif not pools[pool_name]:
855 continue
856 pools[pool_name].add(namespace_name)
857
858 rbd_stats_pools = {}
859 for pool_id in self.rbd_stats['pools'].keys():
860 name = self.rbd_stats['pools'][pool_id]['name']
861 if name not in pools:
862 del self.rbd_stats['pools'][pool_id]
863 else:
864 rbd_stats_pools[name] = \
865 self.rbd_stats['pools'][pool_id]['ns_names']
866
867 pools_refreshed = False
868 if pools:
869 next_refresh = self.rbd_stats['pools_refresh_time'] + \
870 self.get_localized_module_option(
871 'rbd_stats_pools_refresh_interval', 300)
872 if rbd_stats_pools != pools or time.time() >= next_refresh:
873 self.refresh_rbd_stats_pools(pools)
874 pools_refreshed = True
875
876 pool_ids = list(self.rbd_stats['pools'])
877 pool_ids.sort()
878 pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'
879
880 nspace_names = []
881 for pool_id, pool in self.rbd_stats['pools'].items():
882 if pool['ns_names']:
883 nspace_names.extend(pool['ns_names'])
884 else:
885 nspace_names = []
886 break
887 if nspace_names:
888 namespace_regex = '^(' + \
889 "|".join([re.escape(x)
890 for x in set(nspace_names)]) + ')$'
891 else:
892 namespace_regex = '^(.*)$'
893
894 if 'query' in self.rbd_stats and \
895 (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
896 namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex']):
897 self.remove_osd_perf_query(self.rbd_stats['query_id'])
898 del self.rbd_stats['query_id']
899 del self.rbd_stats['query']
900
901 if not self.rbd_stats['pools']:
902 return
903
904 counters_info = self.rbd_stats['counters_info']
905
906 if 'query_id' not in self.rbd_stats:
907 query = {
908 'key_descriptor': [
909 {'type': 'pool_id', 'regex': pool_id_regex},
910 {'type': 'namespace', 'regex': namespace_regex},
911 {'type': 'object_name',
912 'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
913 ],
914 'performance_counter_descriptors': list(counters_info),
915 }
916 query_id = self.add_osd_perf_query(query)
917 if query_id is None:
918 self.log.error('failed to add query %s' % query)
919 return
920 self.rbd_stats['query'] = query
921 self.rbd_stats['query_id'] = query_id
922
923 res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
924 for c in res['counters']:
925 # if the pool id is not found in the object name use id of the
926 # pool where the object is located
927 if c['k'][2][0]:
928 pool_id = int(c['k'][2][0])
929 else:
930 pool_id = int(c['k'][0][0])
931 if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
932 self.refresh_rbd_stats_pools(pools)
933 pools_refreshed = True
934 if pool_id not in self.rbd_stats['pools']:
935 continue
936 pool = self.rbd_stats['pools'][pool_id]
937 nspace_name = c['k'][1][0]
938 if nspace_name not in pool['images']:
939 continue
940 image_id = c['k'][2][1]
941 if image_id not in pool['images'][nspace_name] and \
942 not pools_refreshed:
943 self.refresh_rbd_stats_pools(pools)
944 pool = self.rbd_stats['pools'][pool_id]
945 pools_refreshed = True
946 if image_id not in pool['images'][nspace_name]:
947 continue
948 counters = pool['images'][nspace_name][image_id]['c']
949 for i in range(len(c['c'])):
950 counters[i][0] += c['c'][i][0]
951 counters[i][1] += c['c'][i][1]
952
953 label_names = ("pool", "namespace", "image")
954 for pool_id, pool in self.rbd_stats['pools'].items():
955 pool_name = pool['name']
956 for nspace_name, images in pool['images'].items():
957 for image_id in images:
958 image_name = images[image_id]['n']
959 counters = images[image_id]['c']
960 i = 0
961 for key in counters_info:
962 counter_info = counters_info[key]
963 stattype = self._stattype_to_str(counter_info['type'])
964 labels = (pool_name, nspace_name, image_name)
965 if counter_info['type'] == self.PERFCOUNTER_COUNTER:
966 path = 'rbd_' + key
967 if path not in self.metrics:
968 self.metrics[path] = Metric(
969 stattype,
970 path,
971 counter_info['desc'],
972 label_names,
973 )
974 self.metrics[path].set(counters[i][0], labels)
975 elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
976 path = 'rbd_' + key + '_sum'
977 if path not in self.metrics:
978 self.metrics[path] = Metric(
979 stattype,
980 path,
981 counter_info['desc'] + ' Total',
982 label_names,
983 )
984 self.metrics[path].set(counters[i][0], labels)
985 path = 'rbd_' + key + '_count'
986 if path not in self.metrics:
987 self.metrics[path] = Metric(
988 'counter',
989 path,
990 counter_info['desc'] + ' Count',
991 label_names,
992 )
993 self.metrics[path].set(counters[i][1], labels)
994 i += 1
995
996 def refresh_rbd_stats_pools(self, pools):
997 self.log.debug('refreshing rbd pools %s' % (pools))
998
999 rbd = RBD()
1000 counters_info = self.rbd_stats['counters_info']
1001 for pool_name, cfg_ns_names in pools.items():
1002 try:
1003 pool_id = self.rados.pool_lookup(pool_name)
1004 with self.rados.open_ioctx(pool_name) as ioctx:
1005 if pool_id not in self.rbd_stats['pools']:
1006 self.rbd_stats['pools'][pool_id] = {'images': {}}
1007 pool = self.rbd_stats['pools'][pool_id]
1008 pool['name'] = pool_name
1009 pool['ns_names'] = cfg_ns_names
1010 if cfg_ns_names:
1011 nspace_names = list(cfg_ns_names)
1012 else:
1013 nspace_names = [''] + rbd.namespace_list(ioctx)
1014 for nspace_name in list(pool['images']):
1015 if nspace_name not in nspace_names:
1016 del pool['images'][nspace_name]
1017 for nspace_name in nspace_names:
1018 if (nspace_name and
1019 not rbd.namespace_exists(ioctx, nspace_name)):
1020 self.log.debug('unknown namespace %s for pool %s' %
1021 (nspace_name, pool_name))
1022 continue
1023 ioctx.set_namespace(nspace_name)
1024 if nspace_name not in pool['images']:
1025 pool['images'][nspace_name] = {}
1026 namespace = pool['images'][nspace_name]
1027 images = {}
1028 for image_meta in rbd.list2(ioctx):
1029 image = {'n': image_meta['name']}
1030 image_id = image_meta['id']
1031 if image_id in namespace:
1032 image['c'] = namespace[image_id]['c']
1033 else:
1034 image['c'] = [[0, 0] for x in counters_info]
1035 images[image_id] = image
1036 pool['images'][nspace_name] = images
1037 except Exception as e:
1038 self.log.error('failed listing pool %s: %s' % (pool_name, e))
1039 self.rbd_stats['pools_refresh_time'] = time.time()
1040
1041 def shutdown_rbd_stats(self):
1042 if 'query_id' in self.rbd_stats:
1043 self.remove_osd_perf_query(self.rbd_stats['query_id'])
1044 del self.rbd_stats['query_id']
1045 del self.rbd_stats['query']
1046 self.rbd_stats['pools'].clear()
1047
1048 def add_fixed_name_metrics(self):
1049 """
1050 Add fixed name metrics from existing ones that have details in their names
1051 that should be in labels (not in name).
1052 For backward compatibility, a new fixed name metric is created (instead of replacing)
1053 and details are put in new labels.
1054 Intended for RGW sync perf. counters but extendable as required.
1055 See: https://tracker.ceph.com/issues/45311
1056 """
1057 new_metrics = {}
1058 for metric_path in self.metrics.keys():
1059 # Address RGW sync perf. counters.
1060 match = re.search(r'^data-sync-from-(.*)\.', metric_path)
1061 if match:
1062 new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
1063 if new_path not in new_metrics:
1064 new_metrics[new_path] = Metric(
1065 self.metrics[metric_path].mtype,
1066 new_path,
1067 self.metrics[metric_path].desc,
1068 self.metrics[metric_path].labelnames + ('source_zone',)
1069 )
1070 for label_values, value in self.metrics[metric_path].value.items():
1071 new_metrics[new_path].set(value, label_values + (match.group(1),))
1072
1073 self.metrics.update(new_metrics)
1074
1075 @profile_method(True)
1076 def collect(self):
1077 # Clear the metrics before scraping
1078 for k in self.metrics.keys():
1079 self.metrics[k].clear()
1080
1081 self.get_health()
1082 self.get_df()
1083 self.get_pool_stats()
1084 self.get_fs()
1085 self.get_osd_stats()
1086 self.get_quorum_status()
1087 self.get_mgr_status()
1088 self.get_metadata_and_osd_status()
1089 self.get_pg_status()
1090 self.get_num_objects()
1091
1092 for daemon, counters in self.get_all_perf_counters().items():
1093 for path, counter_info in counters.items():
1094 # Skip histograms, they are represented by long running avgs
1095 stattype = self._stattype_to_str(counter_info['type'])
1096 if not stattype or stattype == 'histogram':
1097 self.log.debug('ignoring %s, type %s' % (path, stattype))
1098 continue
1099
1100 path, label_names, labels = self._perfpath_to_path_labels(
1101 daemon, path)
1102
1103 # Get the value of the counter
1104 value = self._perfvalue_to_value(
1105 counter_info['type'], counter_info['value'])
1106
1107 # Represent the long running avgs as sum/count pairs
1108 if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
1109 _path = path + '_sum'
1110 if _path not in self.metrics:
1111 self.metrics[_path] = Metric(
1112 stattype,
1113 _path,
1114 counter_info['description'] + ' Total',
1115 label_names,
1116 )
1117 self.metrics[_path].set(value, labels)
1118
1119 _path = path + '_count'
1120 if _path not in self.metrics:
1121 self.metrics[_path] = Metric(
1122 'counter',
1123 _path,
1124 counter_info['description'] + ' Count',
1125 label_names,
1126 )
1127 self.metrics[_path].set(counter_info['count'], labels,)
1128 else:
1129 if path not in self.metrics:
1130 self.metrics[path] = Metric(
1131 stattype,
1132 path,
1133 counter_info['description'],
1134 label_names,
1135 )
1136 self.metrics[path].set(value, labels)
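# Note: the *_sum / *_count pairs emitted above can be combined in PromQL to
# recover an average; for example (metric name is an assumption that depends on
# which perf counters the daemons expose):
#   rate(ceph_osd_op_r_latency_sum[1m]) / rate(ceph_osd_op_r_latency_count[1m])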
1137
1138 self.add_fixed_name_metrics()
1139 self.get_rbd_stats()
1140
1141 # Return formatted metrics and clear no longer used data
1142 _metrics = [m.str_expfmt() for m in self.metrics.values()]
1143 for k in self.metrics.keys():
1144 self.metrics[k].clear()
1145
1146 return ''.join(_metrics) + '\n'
1147
1148 def get_file_sd_config(self):
1149 servers = self.list_servers()
1150 targets = []
1151 for server in servers:
1152 hostname = server.get('hostname', '')
1153 for service in server.get('services', []):
1154 if service['type'] != 'mgr':
1155 continue
1156 id_ = service['id']
1157 port = self._get_module_option('server_port', DEFAULT_PORT, id_)
1158 targets.append(f'{hostname}:{port}')
1159 ret = [
1160 {
1161 "targets": targets,
1162 "labels": {}
1163 }
1164 ]
1165 return 0, json.dumps(ret), ""
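# Illustrative output of 'ceph prometheus file_sd_config' for a cluster with
# two mgr daemons on hypothetical hosts, both using the default port:
#   [{"targets": ["ceph-a:9283", "ceph-b:9283"], "labels": {}}]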
1166
1167 def self_test(self):
1168 self.collect()
1169 self.get_file_sd_config()
1170
1171 def handle_command(self, inbuf, cmd):
1172 if cmd['prefix'] == 'prometheus file_sd_config':
1173 return self.get_file_sd_config()
1174 else:
1175 return (-errno.EINVAL, '',
1176 "Command not found '{0}'".format(cmd['prefix']))
1177
1178 def serve(self):
1179
1180 class Root(object):
1181
1182 # collapse everything to '/'
1183 def _cp_dispatch(self, vpath):
1184 cherrypy.request.path = ''
1185 return self
1186
1187 @cherrypy.expose
1188 def index(self):
1189 return '''<!DOCTYPE html>
1190 <html>
1191 <head><title>Ceph Exporter</title></head>
1192 <body>
1193 <h1>Ceph Exporter</h1>
1194 <p><a href='/metrics'>Metrics</a></p>
1195 </body>
1196 </html>'''
1197
1198 @cherrypy.expose
1199 def metrics(self):
1200 # Lock the function execution
1201 assert isinstance(_global_instance, Module)
1202 with _global_instance.collect_lock:
1203 return self._metrics(_global_instance)
1204
1205 @staticmethod
1206 def _metrics(instance):
1207 # type: (Module) -> Any
1208 # Return cached data if available
1209 if not instance.collect_cache:
1210 raise cherrypy.HTTPError(503, 'No cached data available yet')
1211
1212 def respond():
1213 assert isinstance(instance, Module)
1214 cherrypy.response.headers['Content-Type'] = 'text/plain'
1215 return instance.collect_cache
1216
1217 if instance.collect_time < instance.scrape_interval:
1218 # Respond if cache isn't stale
1219 return respond()
1220
1221 if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
1222 # Respond even if cache is stale
1223 instance.log.info(
1224 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1225 'returning metrics from stale cache.'.format(
1226 instance.collect_time,
1227 instance.collect_time - instance.scrape_interval
1228 )
1229 )
1230 return respond()
1231
1232 if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
1233 # Fail if cache is stale
1234 msg = (
1235 'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
1236 'returning "service unavailable".'.format(
1237 instance.collect_time,
1238 instance.collect_time - instance.scrape_interval,
1239 )
1240 )
1241 instance.log.error(msg)
1242 raise cherrypy.HTTPError(503, msg)
1243
1244 # Make the cache timeout for collecting configurable
1245 self.scrape_interval = float(self.get_localized_module_option('scrape_interval', 15.0))
1246
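# stale_cache_strategy accepts 'fail' (default: answer scrapes with HTTP 503
# when the last collection took longer than scrape_interval, i.e. the cache is
# considered stale) or 'return' (serve the stale cache anyway); any other value
# falls back to 'fail'.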
1247 self.stale_cache_strategy = self.get_localized_module_option('stale_cache_strategy', 'log')
1248 if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
1249 self.STALE_CACHE_RETURN]:
1250 self.stale_cache_strategy = self.STALE_CACHE_FAIL
1251
1252 server_addr = self.get_localized_module_option(
1253 'server_addr', get_default_addr())
1254 server_port = self.get_localized_module_option(
1255 'server_port', DEFAULT_PORT)
1256 self.log.info(
1257 "server_addr: %s server_port: %s" %
1258 (server_addr, server_port)
1259 )
1260
1261 self.metrics_thread.start()
1262
1263 # Publish the URI that others may use to access the service we're
1264 # about to start serving
1265 self.set_uri('http://{0}:{1}/'.format(
1266 socket.getfqdn() if server_addr in ['::', '0.0.0.0'] else server_addr,
1267 server_port
1268 ))
1269
1270 cherrypy.config.update({
1271 'server.socket_host': server_addr,
1272 'server.socket_port': int(server_port),
1273 'engine.autoreload.on': False
1274 })
1275 cherrypy.tree.mount(Root(), "/")
1276 self.log.info('Starting engine...')
1277 cherrypy.engine.start()
1278 self.log.info('Engine started.')
1279 # wait for the shutdown event
1280 self.shutdown_event.wait()
1281 self.shutdown_event.clear()
1282 # tell metrics collection thread to stop collecting new metrics
1283 self.metrics_thread.stop()
1284 cherrypy.engine.stop()
1285 self.log.info('Engine stopped.')
1286 self.shutdown_rbd_stats()
1287 # wait for the metrics collection thread to stop
1288 self.metrics_thread.join()
1289
1290 def shutdown(self):
1291 self.log.info('Stopping engine...')
1292 self.shutdown_event.set()
1293
1294
1295 class StandbyModule(MgrStandbyModule):
1296 def __init__(self, *args, **kwargs):
1297 super(StandbyModule, self).__init__(*args, **kwargs)
1298 self.shutdown_event = threading.Event()
1299
1300 def serve(self):
1301 server_addr = self.get_localized_module_option(
1302 'server_addr', get_default_addr())
1303 server_port = self.get_localized_module_option(
1304 'server_port', DEFAULT_PORT)
1305 self.log.info("server_addr: %s server_port: %s" %
1306 (server_addr, server_port))
1307 cherrypy.config.update({
1308 'server.socket_host': server_addr,
1309 'server.socket_port': int(server_port),
1310 'engine.autoreload.on': False
1311 })
1312
1313 module = self
1314
1315 class Root(object):
1316 @cherrypy.expose
1317 def index(self):
1318 active_uri = module.get_active_uri()
1319 return '''<!DOCTYPE html>
1320 <html>
1321 <head><title>Ceph Exporter</title></head>
1322 <body>
1323 <h1>Ceph Exporter</h1>
1324 <p><a href='{}metrics'>Metrics</a></p>
1325 </body>
1326 </html>'''.format(active_uri)
1327
1328 @cherrypy.expose
1329 def metrics(self):
1330 cherrypy.response.headers['Content-Type'] = 'text/plain'
1331 return ''
1332
1333 cherrypy.tree.mount(Root(), '/', {})
1334 self.log.info('Starting engine...')
1335 cherrypy.engine.start()
1336 self.log.info('Engine started.')
1337 # Wait for shutdown event
1338 self.shutdown_event.wait()
1339 self.shutdown_event.clear()
1340 cherrypy.engine.stop()
1341 self.log.info('Engine stopped.')
1342
1343 def shutdown(self):
1344 self.log.info("Stopping engine...")
1345 self.shutdown_event.set()
1346 self.log.info("Stopped engine")