# ceph/src/pybind/mgr/prometheus/module.py
# (blob f3068250fd9fd023e33fdb2b214feff98817d916)

import cherrypy
import json
import errno
import math
import os
import re
import socket
import threading
import time
from collections import defaultdict, namedtuple
from distutils.version import StrictVersion

from mgr_module import MgrModule, MgrStandbyModule, PG_STATES
from mgr_util import get_default_addr, profile_method
from rbd import RBD

from typing import DefaultDict, Optional, Dict, Any, Set

# Defaults for the Prometheus HTTP server.  Can also set in config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry

DEFAULT_PORT = 9283
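# The listen address/port can be overridden per manager via config keys,
# e.g. (illustrative):
#   ceph config set mgr mgr/prometheus/server_addr 0.0.0.0
#   ceph config set mgr mgr/prometheus/server_port 9283
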
# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
# that the ports it's listening on are in fact bound. When using the any address
# "::" it tries both ipv4 and ipv6, and in some environments (e.g. kubernetes)
# ipv6 isn't yet configured / supported and CherryPy throws an uncaught
# exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None

# cherrypy likes to sys.exit on error.  don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop

# to access things in class Module from subclass Root.  Because
# it's a dict, the writer doesn't need to declare 'global' for access

_global_instance = None  # type: Optional[Module]

def health_status_to_number(status):
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2

DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes',
           'compress_bytes_used', 'compress_under_bytes']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')
OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']

alert_metric = namedtuple('alert_metric', 'name description')
HEALTH_CHECKS = [
    alert_metric('SLOW_OPS', 'OSD or Monitor requests taking a long time to process'),
]
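# Each HEALTH_CHECKS entry is exported as a gauge named
# ceph_healthcheck_<name>, e.g. ceph_healthcheck_slow_ops for SLOW_OPS
# (see _setup_static_metrics() and get_health() below).
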
class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels_list = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels_list)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt

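# For illustration, a Metric('untyped', 'health_status', 'Cluster health status')
# with value 0 renders via str_expfmt() roughly as:
#
#   # HELP ceph_health_status Cluster health status
#   # TYPE ceph_health_status untyped
#   ceph_health_status 0.0
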
class MetricCollectionThread(threading.Thread):
    def __init__(self, module):
        # type: (Module) -> None
        self.mod = module
        self.active = True
        self.event = threading.Event()
        super(MetricCollectionThread, self).__init__(target=self.collect)

    def collect(self):
        self.mod.log.info('starting metric collection thread')
        while self.active:
            self.mod.log.debug('collecting cache in thread')
            if self.mod.have_mon_connection():
                start_time = time.time()

                try:
                    data = self.mod.collect()
                except Exception:
                    # Log any issues encountered during the data collection
                    # and continue
                    self.mod.log.exception("failed to collect metrics:")
                    self.event.wait(self.mod.scrape_interval)
                    continue

                duration = time.time() - start_time
                self.mod.log.debug('collecting cache in thread done')

                sleep_time = self.mod.scrape_interval - duration
                if sleep_time < 0:
                    self.mod.log.warning(
                        'Collecting data took more time than configured scrape interval. '
                        'This possibly results in stale data. Please check the '
                        '`stale_cache_strategy` configuration option. '
                        'Collecting data took {:.2f} seconds but scrape interval is configured '
                        'to be {:.0f} seconds.'.format(
                            duration,
                            self.mod.scrape_interval,
                        )
                    )
                    sleep_time = 0

                with self.mod.collect_lock:
                    self.mod.collect_cache = data
                    self.mod.collect_time = duration

                self.event.wait(sleep_time)
            else:
                self.mod.log.error('No MON connection')
                self.event.wait(self.mod.scrape_interval)

    def stop(self):
        self.active = False
        self.event.set()

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus file_sd_config",
            "desc": "Return file_sd compatible prometheus config for mgr cluster",
            "perm": "r"
        },
    ]

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'scrape_interval'},
        {'name': 'stale_cache_strategy'},
        {'name': 'rbd_stats_pools'},
        {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
    ]

    STALE_CACHE_FAIL = 'fail'
    STALE_CACHE_RETURN = 'return'

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.Lock()
        self.collect_time = 0.0
        self.scrape_interval = 15.0
        self.stale_cache_strategy = self.STALE_CACHE_FAIL
        self.collect_cache = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }  # type: Dict[str, Any]
        global _global_instance
        _global_instance = self
        self.metrics_thread = MetricCollectionThread(_global_instance)
    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())
            metrics[path] = Metric(
                'gauge',
                path,
                check.description,
            )

        return metrics
    @profile_method()
    def get_health(self):

        def _get_value(message, delim=' ', word_pos=0):
            """Extract value from message (default is 1st field)"""
            v_str = message.split(delim)[word_pos]
            if v_str.isdigit():
                return int(v_str), 0
            return 0, 1

        health = json.loads(self.get('health')['json'])
        # set overall health
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

        # Examine the health to see if any health checks triggered need to
        # become a specific metric with a value from the health detail
        active_healthchecks = health.get('checks', {})
        active_names = active_healthchecks.keys()

        for check in HEALTH_CHECKS:
            path = 'healthcheck_{}'.format(check.name.lower())

            if path in self.metrics:

                if check.name in active_names:
                    check_data = active_healthchecks[check.name]
                    message = check_data['summary'].get('message', '')
                    v, err = 0, 0

                    if check.name == "SLOW_OPS":
                        # 42 slow ops, oldest one blocked for 12 sec, daemons [osd.0, osd.3] have slow ops.
                        v, err = _get_value(message)

                    if err:
                        self.log.error("healthcheck {} message format is incompatible and has been dropped".format(check.name))
                        # drop the metric, so it's no longer emitted
                        del self.metrics[path]
                        continue

                    self.metrics[path].set(v)
                else:
                    # health check is not active, so give it a default of 0
                    self.metrics[path].set(0)
    @profile_method()
    def get_pool_stats(self):
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],))
    @profile_method()
    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],))
    @profile_method()
    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host_version = servers.get((id_, 'mds'), ('', ''))
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                host_version[0], standby['addr'],
                standby['rank'], host_version[1]
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host_version[0], daemon['addr'],
                    daemon['rank'], host_version[1]
                ))
    @profile_method()
    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), host_version[0],
                mon['public_addr'].rsplit(':', 1)[0], rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))
    @profile_method()
    def get_mgr_status(self):
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        for mgr in all_mgrs:
            host_version = servers.get((mgr, 'mgr'), ('', ''))
            if mgr == active:
                _state = 1
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                'mgr.{}'.format(mgr), host_version[0],
                host_version[1]
            ))
            self.metrics['mgr_status'].set(_state, (
                'mgr.{}'.format(mgr),
            ))
        always_on_modules = mgr_map['always_on_modules'].get(self.release_name, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():
            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))
    @profile_method()
    def get_pg_status(self):

        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = defaultdict(int)  # type: DefaultDict[str, int]

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))
    @profile_method()
    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))
    def get_service_list(self):
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret
    @profile_method()
    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].rsplit(':', 1)[0]
            c_addr = osd['cluster_addr'].rsplit(':', 1)[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            osd_wal_dev_node = ''
            osd_db_dev_node = ''
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))
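
        # ceph_disk_occupation is designed to be joined against node_exporter
        # per-device series, e.g. (illustrative PromQL):
        #   rate(node_disk_written_bytes_total[30s]) and on (device, instance)
        #     label_replace(ceph_disk_occupation, "device", "$1", "device", "/dev/(.*)")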
        for pool in osd_map['pools']:
            self.metrics['pool_metadata'].set(
                1, (pool['pool'], pool['pool_name']))

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, service_id),
                     hostname, version)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                self.metrics['rbd_mirror_metadata'].set(
                    1, tuple(mirror_metadata.get(k, '')
                             for k in RBD_MIRROR_METADATA)
                )
    @profile_method()
    def get_num_objects(self):
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])
    @profile_method()
    def get_rbd_stats(self):
        # Per-image RBD stats are collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse the rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool. The wildcard
        # '*' can be used to indicate all pools or namespaces.
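        # For example (illustrative): rbd_stats_pools = "rbd vms/ns1,vms/ns2 images/*"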
        pools_string = self.get_localized_module_option('rbd_stats_pools', '')
        pool_keys = []
        for x in re.split(r'[\s,]+', pools_string):
            if not x:
                continue

            s = x.split('/', 2)
            pool_name = s[0]
            namespace_name = None
            if len(s) == 2:
                namespace_name = s[1]

            if pool_name == "*":
                # collect for all pools
                osd_map = self.get('osd_map')
                for pool in osd_map['pools']:
                    if 'rbd' not in pool.get('application_metadata', {}):
                        continue
                    pool_keys.append((pool['pool_name'], namespace_name))
            else:
                pool_keys.append((pool_name, namespace_name))

        pools = {}  # type: Dict[str, Set[str]]
        for pool_key in pool_keys:
            pool_name = pool_key[0]
            namespace_name = pool_key[1]
            if not namespace_name or namespace_name == "*":
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue

            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(namespace_name)

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools'].keys()):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                              "|".join([re.escape(x)
                                        for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if ('query' in self.rbd_stats and
                (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
                 namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex'])):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
                    not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1
    def refresh_rbd_stats_pools(self, pools):
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()
    def shutdown_rbd_stats(self):
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()
    def add_fixed_name_metrics(self):
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path in self.metrics.keys():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        self.metrics[metric_path].mtype,
                        new_path,
                        self.metrics[metric_path].desc,
                        self.metrics[metric_path].labelnames + ('source_zone',)
                    )
                for label_values, value in self.metrics[metric_path].value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)
    @profile_method(True)
    def collect(self):
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
        self.get_rbd_stats()

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'
    def get_file_sd_config(self):
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in server.get('services', []):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                # get the prometheus module port of the mgr with id_
                port = self._get_module_option('server_port', DEFAULT_PORT, id_)
                targets.append(f'{hostname}:{port}')
        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""
    def self_test(self):
        self.collect()
        self.get_file_sd_config()

    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'prometheus file_sd_config':
            return self.get_file_sd_config()
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))
    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='/metrics'>Metrics</a></p>
</body>
</html>'''
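
            # The endpoint can be scraped directly once the module is up,
            # e.g.: curl http://<mgr-host>:9283/metrics
            # (9283 is the default port; see DEFAULT_PORT above).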
            @cherrypy.expose
            def metrics(self):
                # Lock the function execution
                assert isinstance(_global_instance, Module)
                with _global_instance.collect_lock:
                    return self._metrics(_global_instance)

            @staticmethod
            def _metrics(instance):
                # type: (Module) -> Any
                # Return cached data if available
                if not instance.collect_cache:
                    raise cherrypy.HTTPError(503, 'No cached data available yet')

                def respond():
                    assert isinstance(instance, Module)
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.collect_time < instance.scrape_interval:
                    # Respond if cache isn't stale
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_RETURN:
                    # Respond even if cache is stale
                    instance.log.info(
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning metrics from stale cache.'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval
                        )
                    )
                    return respond()

                if instance.stale_cache_strategy == instance.STALE_CACHE_FAIL:
                    # Fail if cache is stale
                    msg = (
                        'Gathering data took {:.2f} seconds, metrics are stale for {:.2f} seconds, '
                        'returning "service unavailable".'.format(
                            instance.collect_time,
                            instance.collect_time - instance.scrape_interval,
                        )
                    )
                    instance.log.error(msg)
                    raise cherrypy.HTTPError(503, msg)
        # Make the cache timeout for collecting configurable
        self.scrape_interval = float(self.get_localized_module_option('scrape_interval', 15.0))

        self.stale_cache_strategy = self.get_localized_module_option('stale_cache_strategy', 'log')
        if self.stale_cache_strategy not in [self.STALE_CACHE_FAIL,
                                             self.STALE_CACHE_RETURN]:
            self.stale_cache_strategy = self.STALE_CACHE_FAIL
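
        # Both options are runtime-configurable, e.g. (illustrative):
        #   ceph config set mgr mgr/prometheus/scrape_interval 20
        #   ceph config set mgr mgr/prometheus/stale_cache_strategy return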
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        self.metrics_thread.start()

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr in ['::', '0.0.0.0'] else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        # tell metrics collection thread to stop collecting new metrics
        self.metrics_thread.stop()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()
        # wait for the metrics collection thread to stop
        self.metrics_thread.join()

    def shutdown(self):
        self.log.info('Stopping engine...')
        self.shutdown_event.set()

class StandbyModule(MgrStandbyModule):
    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='{}metrics'>Metrics</a></p>
</body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")