]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/prometheus/module.py
c215181540965bb4d6f0cc141a625552e037bced
[ceph.git] / ceph / src / pybind / mgr / prometheus / module.py
1 import cherrypy
2 from distutils.version import StrictVersion
3 import json
4 import errno
5 import math
6 import os
7 import re
8 import socket
9 import threading
10 import time
11 from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
12 from mgr_util import get_default_addr
13 from rbd import RBD
14
15 # Defaults for the Prometheus HTTP server. Can also set in config-key
16 # see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
17 # for Prometheus exporter port registry
18
19 DEFAULT_PORT = 9283
20
21 # When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
22 # that the ports its listening on are in fact bound. When using the any address
23 # "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
24 # ipv6 isn't yet configured / supported and CherryPy throws an uncaught
25 # exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        # Disable the bind check entirely on affected versions; it is only a
        # startup sanity check, so skipping it is safe.
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None

# cherrypy likes to sys.exit on error.  don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    # Intentionally ignore the exit request: cherrypy calls os._exit() on
    # some fatal errors, which would kill the whole ceph-mgr process.
    pass


os._exit = os_exit_noop
41
42 # to access things in class Module from subclass Root. Because
43 # it's a dict, the writer doesn't need to declare 'global' for access
44
45 _global_instance = {'plugin': None}
46
47
48 def global_instance():
49 assert _global_instance['plugin'] is not None
50 return _global_instance['plugin']
51
52
def health_status_to_number(status):
    """Map a Ceph health status string to its numeric metric value.

    Unknown statuses yield None, matching the original fall-through.
    """
    return {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}.get(status)
60
61
# Cluster-wide capacity stats read from the 'df' report, exported as
# ceph_cluster_<name>.
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

# Per-pool stats read from the 'df' report, exported as ceph_pool_<name>
# labelled by pool_id.
DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']
66
# Per-pool recovery counters read from 'osd_pool_stats'; missing values
# default to 0 in get_pool_stats().  The original tuple listed
# 'num_bytes_recovered' twice; the second entry is 'num_keys_recovered'
# (the cumulative sibling of 'recovering_keys_per_sec').
OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')
70
# OSD map flags, each exported as an individual ceph_osd_flag_<name> gauge.
OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

# Label-name tuples for the *_metadata / status series below.  The order of
# each tuple must match the order of label values passed to Metric.set().
FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

# Per-OSD status values exported as ceph_osd_<name>, labelled by ceph_daemon.
OSD_STATUS = ['weight', 'up', 'in']

# Per-OSD perf stats exported as ceph_osd_<name>, labelled by ceph_daemon.
OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

# Deliberately separate from OSD_METADATA so the label names line up with
# what the Prometheus node_exporter emits.
DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance')

# Object health counts read from the PG summary.
NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
110
class Metric(object):
    """A single Prometheus metric family with exposition-format rendering."""

    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def clear(self):
        """Drop all recorded samples."""
        self.value = {}

    def set(self, value, labelvalues=None):
        """Record *value* under the given tuple of label values."""
        # labelvalues must be a tuple; default to a single empty label value
        key = labelvalues or ('',)
        self.value[key] = value

    def str_expfmt(self):
        """Render this metric in the Prometheus text exposition format."""

        def _prom_name(path):
            '''replace illegal metric name characters'''
            cleaned = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens become underscores unless trailing, which is
            # spelled out as "_minus"
            if cleaned.endswith("-"):
                cleaned = cleaned[:-1] + "_minus"
            else:
                cleaned = cleaned.replace("-", "_")

            return "ceph_{0}".format(cleaned)

        def _go_float(value):
            '''represent as Go-compatible float'''
            as_float = float(value)
            if math.isinf(as_float):
                return '+Inf' if as_float > 0 else '-Inf'
            if math.isnan(as_float):
                return 'NaN'
            return repr(as_float)

        name = _prom_name(self.name)
        parts = [
            '\n# HELP {0} {1}'.format(name, self.desc),
            '\n# TYPE {0} {1}'.format(name, self.mtype),
        ]

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labelstr = ','.join(
                    '%s="%s"' % pair
                    for pair in zip(self.labelnames, labelvalues))
            else:
                labelstr = ''
            if labelstr:
                parts.append('\n{0}{{{1}}} {2}'.format(
                    name, labelstr, _go_float(value)))
            else:
                parts.append('\n{0} {1}'.format(name, _go_float(value)))
        return ''.join(parts)
177
178
179 class Module(MgrModule):
180 COMMANDS = [
181 {
182 "cmd": "prometheus file_sd_config",
183 "desc": "Return file_sd compatible prometheus config for mgr cluster",
184 "perm": "r"
185 },
186 ]
187
188 MODULE_OPTIONS = [
189 {'name': 'server_addr'},
190 {'name': 'server_port'},
191 {'name': 'scrape_interval'},
192 {'name': 'rbd_stats_pools'},
193 {'name': 'rbd_stats_pools_refresh_interval'},
194 ]
195
    def __init__(self, *args, **kwargs):
        """Set up metric storage, scrape caching and rbd-stats state."""
        super(Module, self).__init__(*args, **kwargs)
        # name -> Metric for all statically-known series; perf-counter and
        # rbd series are added to this dict lazily during collect().
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        # serializes collect() across concurrent scrape requests
        self.collect_lock = threading.RLock()
        self.collect_time = 0
        self.collect_timeout = 5.0
        self.collect_cache = None
        # state for the dynamic per-RBD-image OSD perf query (see
        # get_rbd_stats / refresh_rbd_stats_pools)
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }
        # register self so the nested cherrypy handlers can find us
        _global_instance['plugin'] = self
223
    def _setup_static_metrics(self):
        """Build the dict of statically-known Metric objects.

        Returns a dict mapping internal metric path (e.g. 'pool_rd_bytes')
        to a Metric; collect() adds perf-counter and rbd metrics on top.
        """
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        # NOTE: dict key and exported name differ ('seconds' vs 'secs')
        metrics['scrape_duration_seconds'] = Metric(
            'gauge',
            'scrape_duration_secs',
            'Time taken to gather metrics from Ceph (secs)'
        )

        # One untyped series per OSD map flag.
        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD POOL STATS: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state),
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        return metrics
391
392 def get_health(self):
393 health = json.loads(self.get('health')['json'])
394 self.metrics['health_status'].set(
395 health_status_to_number(health['status'])
396 )
397
398 def get_pool_stats(self):
399 # retrieve pool stats to provide per pool recovery metrics
400 # (osd_pool_stats moved to mgr in Mimic)
401 pstats = self.get('osd_pool_stats')
402 for pool in pstats['pool_stats']:
403 for stat in OSD_POOL_STATS:
404 self.metrics['pool_{}'.format(stat)].set(
405 pool['recovery_rate'].get(stat, 0),
406 (pool['pool_id'],)
407 )
408
409 def get_df(self):
410 # maybe get the to-be-exported metrics from a config?
411 df = self.get('df')
412 for stat in DF_CLUSTER:
413 self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
414
415 for pool in df['pools']:
416 for stat in DF_POOL:
417 self.metrics['pool_{}'.format(stat)].set(
418 pool['stats'][stat],
419 (pool['id'],)
420 )
421
422 def get_fs(self):
423 fs_map = self.get('fs_map')
424 servers = self.get_service_list()
425 active_daemons = []
426 for fs in fs_map['filesystems']:
427 # collect fs metadata
428 data_pools = ",".join([str(pool)
429 for pool in fs['mdsmap']['data_pools']])
430 self.metrics['fs_metadata'].set(1, (
431 data_pools,
432 fs['id'],
433 fs['mdsmap']['metadata_pool'],
434 fs['mdsmap']['fs_name']
435 ))
436 self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
437 for gid, daemon in fs['mdsmap']['info'].items():
438 id_ = daemon['name']
439 host_version = servers.get((id_, 'mds'), ('', ''))
440 self.metrics['mds_metadata'].set(1, (
441 'mds.{}'.format(id_), fs['id'],
442 host_version[0], daemon['addr'],
443 daemon['rank'], host_version[1]
444 ))
445
446 def get_quorum_status(self):
447 mon_status = json.loads(self.get('mon_status')['json'])
448 servers = self.get_service_list()
449 for mon in mon_status['monmap']['mons']:
450 rank = mon['rank']
451 id_ = mon['name']
452 host_version = servers.get((id_, 'mon'), ('', ''))
453 self.metrics['mon_metadata'].set(1, (
454 'mon.{}'.format(id_), host_version[0],
455 mon['public_addr'].split(':')[0], rank,
456 host_version[1]
457 ))
458 in_quorum = int(rank in mon_status['quorum'])
459 self.metrics['mon_quorum_status'].set(in_quorum, (
460 'mon.{}'.format(id_),
461 ))
462
463 def get_mgr_status(self):
464 mgr_map = self.get('mgr_map')
465 servers = self.get_service_list()
466
467 active = mgr_map['active_name']
468 standbys = [s.get('name') for s in mgr_map['standbys']]
469
470 all_mgrs = list(standbys)
471 all_mgrs.append(active)
472
473 all_modules = {module.get('name'):module.get('can_run') for module in mgr_map['available_modules']}
474
475 for mgr in all_mgrs:
476 host_version = servers.get((mgr, 'mgr'), ('', ''))
477 if mgr == active:
478 _state = 1
479 ceph_release = host_version[1].split()[-2] # e.g. nautilus
480 else:
481 _state = 0
482
483 self.metrics['mgr_metadata'].set(1, (
484 'mgr.{}'.format(mgr), host_version[0],
485 host_version[1]
486 ))
487 self.metrics['mgr_status'].set(_state, (
488 'mgr.{}'.format(mgr),
489 ))
490 always_on_modules = mgr_map['always_on_modules'][ceph_release]
491 active_modules = list(always_on_modules)
492 active_modules.extend(mgr_map['modules'])
493
494 for mod_name in all_modules.keys():
495
496 if mod_name in always_on_modules:
497 _state = 2
498 elif mod_name in active_modules:
499 _state = 1
500 else:
501 _state = 0
502
503 _can_run = 1 if all_modules[mod_name] else 0
504 self.metrics['mgr_module_status'].set(_state, (mod_name,))
505 self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
506
507 def get_pg_status(self):
508 # TODO add per pool status?
509 pg_status = self.get('pg_status')
510
511 # Set total count of PGs, first
512 self.metrics['pg_total'].set(pg_status['num_pgs'])
513
514 reported_states = {}
515 for pg in pg_status['pgs_by_state']:
516 for state in pg['state_name'].split('+'):
517 reported_states[state] = reported_states.get(
518 state, 0) + pg['count']
519
520 for state in reported_states:
521 path = 'pg_{}'.format(state)
522 try:
523 self.metrics[path].set(reported_states[state])
524 except KeyError:
525 self.log.warn("skipping pg in unknown state {}".format(state))
526
527 for state in PG_STATES:
528 if state not in reported_states:
529 try:
530 self.metrics['pg_{}'.format(state)].set(0)
531 except KeyError:
532 self.log.warn(
533 "skipping pg in unknown state {}".format(state))
534
535 def get_osd_stats(self):
536 osd_stats = self.get('osd_stats')
537 for osd in osd_stats['osd_stats']:
538 id_ = osd['osd']
539 for stat in OSD_STATS:
540 val = osd['perf_stat'][stat]
541 self.metrics['osd_{}'.format(stat)].set(val, (
542 'osd.{}'.format(id_),
543 ))
544
545 def get_service_list(self):
546 ret = {}
547 for server in self.list_servers():
548 version = server.get('ceph_version', '')
549 host = server.get('hostname', '')
550 for service in server.get('services', []):
551 ret.update({(service['id'], service['type']): (host, version)})
552 return ret
553
554 def get_metadata_and_osd_status(self):
555 osd_map = self.get('osd_map')
556 osd_flags = osd_map['flags'].split(',')
557 for flag in OSD_FLAGS:
558 self.metrics['osd_flag_{}'.format(flag)].set(
559 int(flag in osd_flags)
560 )
561
562 osd_devices = self.get('osd_map_crush')['devices']
563 servers = self.get_service_list()
564 for osd in osd_map['osds']:
565 # id can be used to link osd metrics and metadata
566 id_ = osd['osd']
567 # collect osd metadata
568 p_addr = osd['public_addr'].split(':')[0]
569 c_addr = osd['cluster_addr'].split(':')[0]
570 if p_addr == "-" or c_addr == "-":
571 self.log.info(
572 "Missing address metadata for osd {0}, skipping occupation"
573 " and metadata records for this osd".format(id_)
574 )
575 continue
576
577 dev_class = None
578 for osd_device in osd_devices:
579 if osd_device['id'] == id_:
580 dev_class = osd_device.get('class', '')
581 break
582
583 if dev_class is None:
584 self.log.info(
585 "OSD {0} is missing from CRUSH map, skipping output".format(
586 id_))
587 continue
588
589 host_version = servers.get((str(id_), 'osd'), ('', ''))
590
591 # collect disk occupation metadata
592 osd_metadata = self.get_metadata("osd", str(id_))
593 if osd_metadata is None:
594 continue
595
596 obj_store = osd_metadata.get('osd_objectstore', '')
597 f_iface = osd_metadata.get('front_iface', '')
598 b_iface = osd_metadata.get('back_iface', '')
599
600 self.metrics['osd_metadata'].set(1, (
601 b_iface,
602 'osd.{}'.format(id_),
603 c_addr,
604 dev_class,
605 f_iface,
606 host_version[0],
607 obj_store,
608 p_addr,
609 host_version[1]
610 ))
611
612 # collect osd status
613 for state in OSD_STATUS:
614 status = osd[state]
615 self.metrics['osd_{}'.format(state)].set(status, (
616 'osd.{}'.format(id_),
617 ))
618
619 if obj_store == "filestore":
620 # collect filestore backend device
621 osd_dev_node = osd_metadata.get(
622 'backend_filestore_dev_node', None)
623 # collect filestore journal device
624 osd_wal_dev_node = osd_metadata.get('osd_journal', '')
625 osd_db_dev_node = ''
626 elif obj_store == "bluestore":
627 # collect bluestore backend device
628 osd_dev_node = osd_metadata.get(
629 'bluestore_bdev_dev_node', None)
630 # collect bluestore wal backend
631 osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
632 # collect bluestore db backend
633 osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
634 if osd_dev_node and osd_dev_node == "unknown":
635 osd_dev_node = None
636
637 osd_hostname = osd_metadata.get('hostname', None)
638 if osd_dev_node and osd_hostname:
639 self.log.debug("Got dev for osd {0}: {1}/{2}".format(
640 id_, osd_hostname, osd_dev_node))
641 self.metrics['disk_occupation'].set(1, (
642 "osd.{0}".format(id_),
643 osd_dev_node,
644 osd_db_dev_node,
645 osd_wal_dev_node,
646 osd_hostname
647 ))
648 else:
649 self.log.info("Missing dev node metadata for osd {0}, skipping "
650 "occupation record for this osd".format(id_))
651
652 pool_meta = []
653 for pool in osd_map['pools']:
654 self.metrics['pool_metadata'].set(
655 1, (pool['pool'], pool['pool_name']))
656
657 # Populate other servers metadata
658 for key, value in servers.items():
659 service_id, service_type = key
660 if service_type == 'rgw':
661 hostname, version = value
662 self.metrics['rgw_metadata'].set(
663 1,
664 ('{}.{}'.format(service_type, service_id), hostname, version)
665 )
666 elif service_type == 'rbd-mirror':
667 mirror_metadata = self.get_metadata('rbd-mirror', service_id)
668 if mirror_metadata is None:
669 continue
670 mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
671 service_id)
672 self.metrics['rbd_mirror_metadata'].set(
673 1, (mirror_metadata.get(k, '')
674 for k in RBD_MIRROR_METADATA)
675 )
676
677 def get_num_objects(self):
678 pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
679 for obj in NUM_OBJECTS:
680 stat = 'num_objects_{}'.format(obj)
681 self.metrics[stat].set(pg_sum[stat])
682
683 def get_rbd_stats(self):
684 # Per RBD image stats is collected by registering a dynamic osd perf
685 # stats query that tells OSDs to group stats for requests associated
686 # with RBD objects by pool, namespace, and image id, which are
687 # extracted from the request object names or other attributes.
688 # The RBD object names have the following prefixes:
689 # - rbd_data.{image_id}. (data stored in the same pool as metadata)
690 # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
691 # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
692 # The pool_id in the object name is the id of the pool with the image
693 # metdata, and should be used in the image spec. If there is no pool_id
694 # in the object name, the image pool is the pool where the object is
695 # located.
696
697 # Parse rbd_stats_pools option, which is a comma or space separated
698 # list of pool[/namespace] entries. If no namespace is specifed the
699 # stats are collected for every namespace in the pool.
700 pools_string = self.get_localized_module_option('rbd_stats_pools', '')
701 pools = {}
702 for p in [x for x in re.split('[\s,]+', pools_string) if x]:
703 s = p.split('/', 2)
704 pool_name = s[0]
705 if len(s) == 1:
706 # empty set means collect for all namespaces
707 pools[pool_name] = set()
708 continue
709 if pool_name not in pools:
710 pools[pool_name] = set()
711 elif not pools[pool_name]:
712 continue
713 pools[pool_name].add(s[1])
714
715 rbd_stats_pools = {}
716 for pool_id in list(self.rbd_stats['pools']):
717 name = self.rbd_stats['pools'][pool_id]['name']
718 if name not in pools:
719 del self.rbd_stats['pools'][pool_id]
720 else:
721 rbd_stats_pools[name] = \
722 self.rbd_stats['pools'][pool_id]['ns_names']
723
724 pools_refreshed = False
725 if pools:
726 next_refresh = self.rbd_stats['pools_refresh_time'] + \
727 self.get_localized_module_option(
728 'rbd_stats_pools_refresh_interval', 300)
729 if rbd_stats_pools != pools or time.time() >= next_refresh:
730 self.refresh_rbd_stats_pools(pools)
731 pools_refreshed = True
732
733 pool_ids = list(self.rbd_stats['pools'])
734 pool_ids.sort()
735 pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'
736
737 nspace_names = []
738 for pool_id, pool in self.rbd_stats['pools'].items():
739 if pool['ns_names']:
740 nspace_names.extend(pool['ns_names'])
741 else:
742 nspace_names = []
743 break
744 if nspace_names:
745 namespace_regex = '^(' + \
746 "|".join([re.escape(x)
747 for x in set(nspace_names)]) + ')$'
748 else:
749 namespace_regex = '^(.*)$'
750
751 if 'query' in self.rbd_stats and \
752 (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
753 namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex']):
754 self.remove_osd_perf_query(self.rbd_stats['query_id'])
755 del self.rbd_stats['query_id']
756 del self.rbd_stats['query']
757
758 if not self.rbd_stats['pools']:
759 return
760
761 counters_info = self.rbd_stats['counters_info']
762
763 if 'query_id' not in self.rbd_stats:
764 query = {
765 'key_descriptor': [
766 {'type': 'pool_id', 'regex': pool_id_regex},
767 {'type': 'namespace', 'regex': namespace_regex},
768 {'type': 'object_name',
769 'regex': '^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
770 ],
771 'performance_counter_descriptors': list(counters_info),
772 }
773 query_id = self.add_osd_perf_query(query)
774 if query_id is None:
775 self.log.error('failed to add query %s' % query)
776 return
777 self.rbd_stats['query'] = query
778 self.rbd_stats['query_id'] = query_id
779
780 res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
781 for c in res['counters']:
782 # if the pool id is not found in the object name use id of the
783 # pool where the object is located
784 if c['k'][2][0]:
785 pool_id = int(c['k'][2][0])
786 else:
787 pool_id = int(c['k'][0][0])
788 if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
789 self.refresh_rbd_stats_pools(pools)
790 pools_refreshed = True
791 if pool_id not in self.rbd_stats['pools']:
792 continue
793 pool = self.rbd_stats['pools'][pool_id]
794 nspace_name = c['k'][1][0]
795 if nspace_name not in pool['images']:
796 continue
797 image_id = c['k'][2][1]
798 if image_id not in pool['images'][nspace_name] and \
799 not pools_refreshed:
800 self.refresh_rbd_stats_pools(pools)
801 pool = self.rbd_stats['pools'][pool_id]
802 pools_refreshed = True
803 if image_id not in pool['images'][nspace_name]:
804 continue
805 counters = pool['images'][nspace_name][image_id]['c']
806 for i in range(len(c['c'])):
807 counters[i][0] += c['c'][i][0]
808 counters[i][1] += c['c'][i][1]
809
810 label_names = ("pool", "namespace", "image")
811 for pool_id, pool in self.rbd_stats['pools'].items():
812 pool_name = pool['name']
813 for nspace_name, images in pool['images'].items():
814 for image_id in images:
815 image_name = images[image_id]['n']
816 counters = images[image_id]['c']
817 i = 0
818 for key in counters_info:
819 counter_info = counters_info[key]
820 stattype = self._stattype_to_str(counter_info['type'])
821 labels = (pool_name, nspace_name, image_name)
822 if counter_info['type'] == self.PERFCOUNTER_COUNTER:
823 path = 'rbd_' + key
824 if path not in self.metrics:
825 self.metrics[path] = Metric(
826 stattype,
827 path,
828 counter_info['desc'],
829 label_names,
830 )
831 self.metrics[path].set(counters[i][0], labels)
832 elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
833 path = 'rbd_' + key + '_sum'
834 if path not in self.metrics:
835 self.metrics[path] = Metric(
836 stattype,
837 path,
838 counter_info['desc'] + ' Total',
839 label_names,
840 )
841 self.metrics[path].set(counters[i][0], labels)
842 path = 'rbd_' + key + '_count'
843 if path not in self.metrics:
844 self.metrics[path] = Metric(
845 'counter',
846 path,
847 counter_info['desc'] + ' Count',
848 label_names,
849 )
850 self.metrics[path].set(counters[i][1], labels)
851 i += 1
852
853 def refresh_rbd_stats_pools(self, pools):
854 self.log.debug('refreshing rbd pools %s' % (pools))
855
856 rbd = RBD()
857 counters_info = self.rbd_stats['counters_info']
858 for pool_name, cfg_ns_names in pools.items():
859 try:
860 pool_id = self.rados.pool_lookup(pool_name)
861 with self.rados.open_ioctx(pool_name) as ioctx:
862 if pool_id not in self.rbd_stats['pools']:
863 self.rbd_stats['pools'][pool_id] = {'images': {}}
864 pool = self.rbd_stats['pools'][pool_id]
865 pool['name'] = pool_name
866 pool['ns_names'] = cfg_ns_names
867 if cfg_ns_names:
868 nspace_names = list(cfg_ns_names)
869 else:
870 nspace_names = [''] + rbd.namespace_list(ioctx)
871 for nspace_name in pool['images']:
872 if nspace_name not in nspace_names:
873 del pool['images'][nspace_name]
874 for nspace_name in nspace_names:
875 if (nspace_name and
876 not rbd.namespace_exists(ioctx, nspace_name)):
877 self.log.debug('unknown namespace %s for pool %s' %
878 (nspace_name, pool_name))
879 continue
880 ioctx.set_namespace(nspace_name)
881 if nspace_name not in pool['images']:
882 pool['images'][nspace_name] = {}
883 namespace = pool['images'][nspace_name]
884 images = {}
885 for image_meta in RBD().list2(ioctx):
886 image = {'n': image_meta['name']}
887 image_id = image_meta['id']
888 if image_id in namespace:
889 image['c'] = namespace[image_id]['c']
890 else:
891 image['c'] = [[0, 0] for x in counters_info]
892 images[image_id] = image
893 pool['images'][nspace_name] = images
894 except Exception as e:
895 self.log.error('failed listing pool %s: %s' % (pool_name, e))
896 self.rbd_stats['pools_refresh_time'] = time.time()
897
898 def shutdown_rbd_stats(self):
899 if 'query_id' in self.rbd_stats:
900 self.remove_osd_perf_query(self.rbd_stats['query_id'])
901 del self.rbd_stats['query_id']
902 del self.rbd_stats['query']
903 self.rbd_stats['pools'].clear()
904
    def collect(self):
        """Gather all metrics and return them in Prometheus text format.

        Called (under collect_lock) from the cherrypy /metrics handler.
        """
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        _start_time = time.time()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        # Export every daemon perf counter, creating Metric entries lazily.
        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels,)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.get_rbd_stats()

        _end_time = time.time()
        self.metrics['scrape_duration_seconds'].set(_end_time - _start_time)

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'
980
    def get_file_sd_config(self):
        """Return a prometheus file_sd_config listing every mgr's endpoint.

        Returns the usual (retval, stdout, stderr) command tuple; stdout is
        the JSON file_sd payload.
        """
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in server.get('services', []):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                # get port for prometheus module at mgr with id_
                # TODO use get_config_prefix or get_config here once
                # https://github.com/ceph/ceph/pull/20458 is merged
                result = CommandResult("")
                global_instance().send_command(
                    result, "mon", '',
                    json.dumps({
                        "prefix": "config-key get",
                        'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
                    }),
                    "")
                r, outb, outs = result.wait()
                if r != 0:
                    # fall back to the default exporter port on lookup failure
                    global_instance().log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
                    targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
                else:
                    port = json.loads(outb)
                    targets.append('{}:{}'.format(hostname, port))

        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""
1016
    def self_test(self):
        """Smoke test: run one full collection and the file_sd command."""
        self.collect()
        self.get_file_sd_config()
1020
1021 def handle_command(self, inbuf, cmd):
1022 if cmd['prefix'] == 'prometheus file_sd_config':
1023 return self.get_file_sd_config()
1024 else:
1025 return (-errno.EINVAL, '',
1026 "Command not found '{0}'".format(cmd['prefix']))
1027
    def serve(self):
        """Run the cherrypy HTTP server until shutdown() is called."""

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                instance = global_instance()
                # Lock the function execution
                try:
                    instance.collect_lock.acquire()
                    return self._metrics(instance)
                finally:
                    instance.collect_lock.release()

            @staticmethod
            def _metrics(instance):
                # Return cached data if available and collected before the cache times out
                if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.have_mon_connection():
                    instance.collect_cache = None
                    instance.collect_time = time.time()
                    instance.collect_cache = instance.collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        # Make the cache timeout for collecting configurable
        self.collect_timeout = self.get_localized_module_option(
            'scrape_interval', 5.0)

        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
1109
    def shutdown(self):
        """Signal serve() to stop the cherrypy engine and return."""
        self.log.info('Stopping engine...')
        self.shutdown_event.set()
1113
1114
class StandbyModule(MgrStandbyModule):
    """Runs on standby mgrs: serves an empty /metrics and links to the
    active mgr's exporter so scrapes of a standby do not error out."""

    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        """Run the standby cherrypy server until shutdown() is called."""
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        # capture self for the nested handler class below
        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                # point the browser at the active mgr's exporter
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                # standby serves no samples, just an empty 200 response
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        """Signal serve() to stop the cherrypy engine and return."""
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")