import cherrypy
from distutils.version import StrictVersion
import json
import errno
import math
import os
import socket
import threading
import time
from mgr_module import MgrModule, MgrStandbyModule

# Defaults for the Prometheus HTTP server.  These can also be set via the
# config-key store; see
# https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry.

DEFAULT_ADDR = '::'
DEFAULT_PORT = 9283
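
# Both options are read with get_localized_config('server_addr') /
# get_localized_config('server_port') in serve() below, so they can be
# overridden per mgr instance through the config-key store (illustrative key
# name, assuming the usual "mgr/<module>/<option>" layout:
# "mgr/prometheus/server_port").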

# When the CherryPy server in 3.2.2 (and later) starts, it attempts to verify
# that the ports it's listening on are in fact bound. When using the any
# address "::" it tries both IPv4 and IPv6, and in some environments (e.g.
# Kubernetes) IPv6 isn't yet configured / supported and CherryPy throws an
# uncaught exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # The issue was fixed in 3.2.3; it's present in 3.2.2 (the current version
    # on CentOS 7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None

# CherryPy likes to sys.exit on error; don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop


# Used to reach the active Module instance from the nested CherryPy Root
# handler class.  Because it's a dict, writers don't need to declare 'global'
# to update it.

_global_instance = {'plugin': None}


def global_instance():
    assert _global_instance['plugin'] is not None
    return _global_instance['plugin']
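

# Illustrative use (this is how the Root.metrics handler defined in
# Module.serve() below reaches back into the active module):
#
#     instance = global_instance()
#     payload = instance.collect()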


def health_status_to_number(status):

    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2

PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown"]

DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']

DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname', 'public_addr', 'rank', 'ceph_version')

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device', 'wal_device', 'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']


class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}           # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt

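
# A Metric renders into the Prometheus text exposition format.  For example
# (illustrative values, not taken from a real cluster), a labelled gauge set
# with metrics['osd_apply_latency_ms'].set(1.0, ('osd.0',)) is emitted
# roughly as:
#
#     # HELP ceph_osd_apply_latency_ms OSD stat apply_latency_ms
#     # TYPE ceph_osd_apply_latency_ms gauge
#     ceph_osd_apply_latency_ms{ceph_daemon="osd.0"} 1.0
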
class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus self-test",
            "desc": "Run a self test on the prometheus module",
            "perm": "rw"
        },
    ]

    OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'scrape_interval'},
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.RLock()
        self.collect_time = 0
        self.collect_timeout = 5.0
        self.collect_cache = None
        _global_instance['plugin'] = self

    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state),
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        return metrics

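    # Note for the collectors below: Metric.set() takes label values
    # positionally, so every tuple passed to set() must follow the order of
    # the matching label-name tuple defined above (e.g. OSD_METADATA orders
    # back_iface, ceph_daemon, cluster_addr, ..., and
    # get_metadata_and_osd_status() passes its values in that same order).
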
    def get_health(self):
        health = json.loads(self.get('health')['json'])
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        active_daemons = []
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host_version[0], daemon['addr'],
                    daemon['rank'], host_version[1]
                ))

    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), host_version[0],
                mon['public_addr'].split(':')[0], rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    def get_pg_status(self):
        # TODO add per pool status?
        pg_status = self.get('pg_status')

        # Set total count of PGs, first
        self.metrics['pg_total'].set(pg_status['num_pgs'])

        reported_states = {}
        for pg in pg_status['pgs_by_state']:
            for state in pg['state_name'].split('+'):
                reported_states[state] = reported_states.get(state, 0) + pg['count']

        for state in reported_states:
            path = 'pg_{}'.format(state)
            try:
                self.metrics[path].set(reported_states[state])
            except KeyError:
                self.log.warn("skipping pg in unknown state {}".format(state))

        for state in PG_STATES:
            if state not in reported_states:
                try:
                    self.metrics['pg_{}'.format(state)].set(0)
                except KeyError:
                    self.log.warn("skipping pg in unknown state {}".format(state))

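    # A PG reports a combined state string such as "active+clean";
    # get_pg_status() splits it on '+', so a PG in that example state
    # contributes to both ceph_pg_active and ceph_pg_clean, while any state
    # outside PG_STATES is logged and skipped.
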
    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self):
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

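    # get_service_list() returns a dict keyed by (service_id, service_type)
    # and mapping to (hostname, ceph_version), e.g. (illustrative entry only):
    #     {('0', 'osd'): ('host-a', 'ceph version 12.2.12 (...) luminous (stable)')}
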
    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, skipping output".format(
                        id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get('backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get('bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        pool_meta = []
        for pool in osd_map['pools']:
            self.metrics['pool_metadata'].set(1, (pool['pool'], pool['pool_name']))

        # Populate rgw_metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type != 'rgw':
                continue
            hostname, version = value
            self.metrics['rgw_metadata'].set(
                1,
                ('{}.{}'.format(service_type, service_id), hostname, version)
            )

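    # The disk_occupation series emitted above reuses node_exporter's "device"
    # and "instance" label names on purpose, so it can be correlated with
    # node_exporter disk metrics for the same host in PromQL (the exact join
    # depends on how device names are reported on the host).
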
    def get_num_objects(self):
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    def collect(self):
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                # Get the value of the counter
                value = self._perfvalue_to_value(counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            ("ceph_daemon",),
                        )
                    self.metrics[_path].set(value, (daemon,))

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            ("ceph_daemon",),
                        )
                    self.metrics[_path].set(counter_info['count'], (daemon,))
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            ("ceph_daemon",),
                        )
                    self.metrics[path].set(value, (daemon,))

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

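    # Long-running-average perf counters are exported as a _sum/_count pair;
    # for a counter path like "osd.op_r_latency" (illustrative name) that
    # yields ceph_osd_op_r_latency_sum and ceph_osd_op_r_latency_count, from
    # which Prometheus can derive an average, e.g.
    # rate(ceph_osd_op_r_latency_sum[1m]) / rate(ceph_osd_op_r_latency_count[1m]).
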
    def handle_command(self, cmd):
        if cmd['prefix'] == 'prometheus self-test':
            self.collect()
            return 0, '', 'Self-test OK'
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))

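    # The self-test registered in COMMANDS above is invoked from the CLI as
    # "ceph prometheus self-test"; it simply runs one collection pass and
    # reports "Self-test OK".
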
    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                instance = global_instance()
                # Lock the function execution
                try:
                    instance.collect_lock.acquire()
                    return self._metrics(instance)
                finally:
                    instance.collect_lock.release()

            def _metrics(self, instance):
                # Return cached data if available and collected before the
                # cache times out
                if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.have_mon_connection():
                    instance.collect_cache = None
                    instance.collect_time = time.time()
                    instance.collect_cache = instance.collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        # Make the cache timeout for collecting configurable
        self.collect_timeout = self.get_localized_config('scrape_interval', 5.0)

        server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info('Stopping engine...')
        self.shutdown_event.set()

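
# The standby instance of the module still binds the configured address/port,
# but only serves a landing page that links to the active mgr's URI (obtained
# via get_active_uri()) and an empty /metrics response.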
class StandbyModule(MgrStandbyModule):
    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        server_addr = self.get_localized_config('server_addr', '::')
        server_port = self.get_localized_config('server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" % (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):

            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")