import cherrypy
from distutils.version import StrictVersion
import json
import errno
import math
import os
import re
import socket
import threading
import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
from rbd import RBD

# Defaults for the Prometheus HTTP server. Can also be set in config-key;
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for the Prometheus exporter port registry

DEFAULT_ADDR = '::'
DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts it attempts to verify
# that the ports it's listening on are in fact bound. When using the any
# address "::" it tries both ipv4 and ipv6, and in some environments
# (e.g. kubernetes) ipv6 isn't yet configured / supported and CherryPy
# throws an uncaught exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None

# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop

# to access things in class Module from the nested class Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access

_global_instance = {'plugin': None}


def global_instance():
    assert _global_instance['plugin'] is not None
    return _global_instance['plugin']


def health_status_to_number(status):
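    # map Ceph health strings to a numeric gauge value; any status other
    # than the three handled below falls through and returns None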
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2


DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']


class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self):

        def promethize(path):
            ''' replace illegal metric name characters '''
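            # e.g. (illustrative): 'paxos.accept_timeout' maps to
            # 'ceph_paxos_accept_timeout'; a trailing '-' becomes '_minus'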
            result = path.replace('.', '_').replace(
                '+', '_plus').replace('::', '_')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt

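# Illustrative example of the exposition format that str_expfmt() produces:
# a Metric('gauge', 'osd_up', 'OSD status up', ('ceph_daemon',)) holding the
# value 1 for label values ('osd.0',) renders as
#
#   # HELP ceph_osd_up OSD status up
#   # TYPE ceph_osd_up gauge
#   ceph_osd_up{ceph_daemon="osd.0"} 1.0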

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus file_sd_config",
            "desc": "Return file_sd compatible prometheus config for mgr cluster",
            "perm": "r"
        },
    ]

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'scrape_interval'},
        {'name': 'rbd_stats_pools'},
        {'name': 'rbd_stats_pools_refresh_interval'},
    ]
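    # options are read with get_localized_module_option() and can be set
    # per-mgr via the standard config interface, e.g. (illustrative):
    #   ceph config set mgr mgr/prometheus/server_port 9283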

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.RLock()
        self.collect_time = 0
        self.collect_timeout = 5.0
        self.collect_cache = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }
        _global_instance['plugin'] = self

    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count'
        )

        metrics['scrape_duration_seconds'] = Metric(
            'gauge',
            'scrape_duration_secs',
            'Time taken to gather metrics from Ceph (secs)'
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD POOL STATS: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {}'.format(state),
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        return metrics

    def get_health(self):
        health = json.loads(self.get('health')['json'])
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

    def get_pool_stats(self):
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        active_daemons = []
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host_version[0], daemon['addr'],
                    daemon['rank'], host_version[1]
                ))

    def get_quorum_status(self):
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), host_version[0],
                mon['public_addr'].split(':')[0], rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    def get_pg_status(self):
        # TODO add per pool status?
        pg_status = self.get('pg_status')

        # Set total count of PGs, first
        self.metrics['pg_total'].set(pg_status['num_pgs'])

        reported_states = {}
        for pg in pg_status['pgs_by_state']:
            for state in pg['state_name'].split('+'):
                reported_states[state] = reported_states.get(
                    state, 0) + pg['count']

        for state in reported_states:
            path = 'pg_{}'.format(state)
            try:
                self.metrics[path].set(reported_states[state])
            except KeyError:
                self.log.warn("skipping pg in unknown state {}".format(state))

        for state in PG_STATES:
            if state not in reported_states:
                try:
                    self.metrics['pg_{}'.format(state)].set(0)
                except KeyError:
                    self.log.warn(
                        "skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self):
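        # build a map of (service id, service type) -> (hostname, version);
        # e.g. (illustrative): {('0', 'osd'): ('node1', 'ceph version 14.2.1 ...')}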
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

    def get_metadata_and_osd_status(self):
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info(
                    "OSD {0} is missing from CRUSH map, skipping output".format(
                        id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        for pool in osd_map['pools']:
            self.metrics['pool_metadata'].set(
                1, (pool['pool'], pool['pool_name']))

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, service_id), hostname, version)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                # Metric.set() needs a tuple of label values (it is used as
                # a dict key), so materialize the generator
                self.metrics['rbd_mirror_metadata'].set(
                    1, tuple(mirror_metadata.get(k, '')
                             for k in RBD_MIRROR_METADATA)
                )

    def get_num_objects(self):
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    def get_rbd_stats(self):
        # Per RBD image stats is collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        # - rbd_data.{image_id}. (data stored in the same pool as metadata)
        # - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        # - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool.
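        # e.g. (illustrative): rbd_stats_pools = "rbd rbd2/ns1,rbd2/ns2"
        # collects all namespaces in pool 'rbd' but only ns1/ns2 in 'rbd2'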
        pools_string = self.get_localized_module_option('rbd_stats_pools', '')
        pools = {}
        for p in [x for x in re.split(r'[\s,]+', pools_string) if x]:
            s = p.split('/', 2)
            pool_name = s[0]
            if len(s) == 1:
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue
            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(s[1])

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                "|".join([re.escape(x)
                          for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if 'query' in self.rbd_stats and \
           (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
                namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex']):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1

    def refresh_rbd_stats_pools(self, pools):
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # iterate over a copy: entries may be deleted below
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self):
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def collect(self):
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        _start_time = time.time()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            ("ceph_daemon",),
                        )
                    self.metrics[_path].set(value, (daemon,))

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            ("ceph_daemon",),
                        )
                    self.metrics[_path].set(counter_info['count'], (daemon,))
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            ("ceph_daemon",),
                        )
                    self.metrics[path].set(value, (daemon,))

        self.get_rbd_stats()

        _end_time = time.time()
        self.metrics['scrape_duration_seconds'].set(_end_time - _start_time)

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

    def get_file_sd_config(self):
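        # emit a Prometheus file_sd-compatible target list; e.g. (illustrative):
        #   [{"targets": ["mgr-host-a:9283", "mgr-host-b:9283"], "labels": {}}]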
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in server.get('services', []):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                # get port for prometheus module at mgr with id_
                # TODO use get_config_prefix or get_config here once
                # https://github.com/ceph/ceph/pull/20458 is merged
                result = CommandResult("")
                global_instance().send_command(
                    result, "mon", '',
                    json.dumps({
                        "prefix": "config-key get",
                        'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
                    }),
                    "")
                r, outb, outs = result.wait()
                if r != 0:
                    global_instance().log.error("Failed to retrieve port for mgr {}: {}".format(id_, outs))
                    targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
                else:
                    port = json.loads(outb)
                    targets.append('{}:{}'.format(hostname, port))

        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""

    def self_test(self):
        self.collect()
        self.get_file_sd_config()

    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'prometheus file_sd_config':
            return self.get_file_sd_config()
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                instance = global_instance()
                # Lock the function execution
                try:
                    instance.collect_lock.acquire()
                    return self._metrics(instance)
                finally:
                    instance.collect_lock.release()

            @staticmethod
            def _metrics(instance):
                # Return cached data if available and collected before the
                # cache times out
                if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.have_mon_connection():
                    instance.collect_cache = None
                    instance.collect_time = time.time()
                    instance.collect_cache = instance.collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        # Make the cache timeout for collecting configurable
        self.collect_timeout = self.get_localized_module_option(
            'scrape_interval', 5.0)
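        # (illustrative) scrape_interval is typically set to match the
        # Prometheus scrape interval, so the cached payload above is
        # regenerated at most once per scrape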

        server_addr = self.get_localized_module_option(
            'server_addr', DEFAULT_ADDR)
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr == '::' else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()

    def shutdown(self):
        self.log.info('Stopping engine...')
        self.shutdown_event.set()


class StandbyModule(MgrStandbyModule):
    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        server_addr = self.get_localized_module_option('server_addr', '::')
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")