import cherrypy
from distutils.version import StrictVersion
import json
import errno
import math
import os
import re
import socket
import threading
import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
from mgr_util import get_default_addr
from rbd import RBD

# Defaults for the Prometheus HTTP server. Can also set in config-key
# see https://github.com/prometheus/prometheus/wiki/Default-port-allocations
# for Prometheus exporter port registry

DEFAULT_PORT = 9283

# When the CherryPy server in 3.2.2 (and later) starts, it attempts to verify
# that the ports it's listening on are in fact bound. When using the any
# address "::" it tries both ipv4 and ipv6, and in some environments (e.g.
# kubernetes) ipv6 isn't yet configured / supported and CherryPy throws an
# uncaught exception.
if cherrypy is not None:
    v = StrictVersion(cherrypy.__version__)
    # the issue was fixed in 3.2.3. it's present in 3.2.2 (current version on
    # centos:7) and back to at least 3.0.0.
    if StrictVersion("3.1.2") <= v < StrictVersion("3.2.3"):
        # https://github.com/cherrypy/cherrypy/issues/1100
        from cherrypy.process import servers
        servers.wait_for_occupied_port = lambda host, port: None


# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(*args, **kwargs):
    pass


os._exit = os_exit_noop

# to access things in class Module from subclass Root. Because
# it's a dict, the writer doesn't need to declare 'global' for access

_global_instance = {'plugin': None}


def global_instance():
    assert _global_instance['plugin'] is not None
    return _global_instance['plugin']


def health_status_to_number(status):
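    """Map a Ceph health status string to a number: HEALTH_OK=0,
    HEALTH_WARN=1, HEALTH_ERR=2 (unknown statuses map to None)."""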
    if status == 'HEALTH_OK':
        return 0
    elif status == 'HEALTH_WARN':
        return 1
    elif status == 'HEALTH_ERR':
        return 2


DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_used_raw_bytes']

DF_POOL = ['max_avail', 'stored', 'stored_raw', 'objects', 'dirty',
           'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']

OSD_POOL_STATS = ('recovering_objects_per_sec', 'recovering_bytes_per_sec',
                  'recovering_keys_per_sec', 'num_objects_recovered',
                  'num_bytes_recovered', 'num_keys_recovered')

OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
             'norecover', 'noscrub', 'nodeep-scrub')

FS_METADATA = ('data_pools', 'fs_id', 'metadata_pool', 'name')

MDS_METADATA = ('ceph_daemon', 'fs_id', 'hostname', 'public_addr', 'rank',
                'ceph_version')

MON_METADATA = ('ceph_daemon', 'hostname',
                'public_addr', 'rank', 'ceph_version')

MGR_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

MGR_STATUS = ('ceph_daemon',)

MGR_MODULE_STATUS = ('name',)

MGR_MODULE_CAN_RUN = ('name',)

OSD_METADATA = ('back_iface', 'ceph_daemon', 'cluster_addr', 'device_class',
                'front_iface', 'hostname', 'objectstore', 'public_addr',
                'ceph_version')

OSD_STATUS = ['weight', 'up', 'in']

OSD_STATS = ['apply_latency_ms', 'commit_latency_ms']

POOL_METADATA = ('pool_id', 'name')

RGW_METADATA = ('ceph_daemon', 'hostname', 'ceph_version')

RBD_MIRROR_METADATA = ('ceph_daemon', 'id', 'instance_id', 'hostname',
                       'ceph_version')

DISK_OCCUPATION = ('ceph_daemon', 'device', 'db_device',
                   'wal_device', 'instance')

NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']


class Metric(object):
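    """A single Prometheus metric: type, name, help text, optional label
    names, and a dict of samples keyed by label-value tuples."""
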
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}  # indexed by label values

    def clear(self):
        self.value = {}

    def set(self, value, labelvalues=None):
        # labelvalues must be a tuple
        labelvalues = labelvalues or ('',)
        self.value[labelvalues] = value

    def str_expfmt(self):
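        """Render this metric in the Prometheus text exposition format:
        a # HELP line, a # TYPE line, then one sample per label set."""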

        def promethize(path):
            ''' replace illegal metric name characters '''
            result = re.sub(r'[./\s]|::', '_', path).replace('+', '_plus')

            # Hyphens usually turn into underscores, unless they are
            # trailing
            if result.endswith("-"):
                result = result[0:-1] + "_minus"
            else:
                result = result.replace("-", "_")

            return "ceph_{0}".format(result)

        def floatstr(value):
            ''' represent as Go-compatible float '''
            if value == float('inf'):
                return '+Inf'
            if value == float('-inf'):
                return '-Inf'
            if math.isnan(value):
                return 'NaN'
            return repr(float(value))

        name = promethize(self.name)
        expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
            name=name,
            desc=self.desc,
            mtype=self.mtype,
        )

        for labelvalues, value in self.value.items():
            if self.labelnames:
                labels = zip(self.labelnames, labelvalues)
                labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
            else:
                labels = ''
            if labels:
                fmtstr = '\n{name}{{{labels}}} {value}'
            else:
                fmtstr = '\n{name} {value}'
            expfmt += fmtstr.format(
                name=name,
                labels=labels,
                value=floatstr(value),
            )
        return expfmt


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "prometheus file_sd_config",
            "desc": "Return file_sd compatible prometheus config for mgr cluster",
            "perm": "r"
        },
    ]

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'scrape_interval'},
        {'name': 'rbd_stats_pools'},
        {'name': 'rbd_stats_pools_refresh_interval', 'type': 'int', 'default': 300},
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.metrics = self._setup_static_metrics()
        self.shutdown_event = threading.Event()
        self.collect_lock = threading.RLock()
        self.collect_time = 0
        self.collect_timeout = 5.0
        self.collect_cache = None
        self.rbd_stats = {
            'pools': {},
            'pools_refresh_time': 0,
            'counters_info': {
                'write_ops': {'type': self.PERFCOUNTER_COUNTER,
                              'desc': 'RBD image writes count'},
                'read_ops': {'type': self.PERFCOUNTER_COUNTER,
                             'desc': 'RBD image reads count'},
                'write_bytes': {'type': self.PERFCOUNTER_COUNTER,
                                'desc': 'RBD image bytes written'},
                'read_bytes': {'type': self.PERFCOUNTER_COUNTER,
                               'desc': 'RBD image bytes read'},
                'write_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                  'desc': 'RBD image writes latency (msec)'},
                'read_latency': {'type': self.PERFCOUNTER_LONGRUNAVG,
                                 'desc': 'RBD image reads latency (msec)'},
            },
        }
        _global_instance['plugin'] = self

    def _setup_static_metrics(self):
        metrics = {}
        metrics['health_status'] = Metric(
            'untyped',
            'health_status',
            'Cluster health status'
        )
        metrics['mon_quorum_status'] = Metric(
            'gauge',
            'mon_quorum_status',
            'Monitors in quorum',
            ('ceph_daemon',)
        )
        metrics['fs_metadata'] = Metric(
            'untyped',
            'fs_metadata',
            'FS Metadata',
            FS_METADATA
        )
        metrics['mds_metadata'] = Metric(
            'untyped',
            'mds_metadata',
            'MDS Metadata',
            MDS_METADATA
        )
        metrics['mon_metadata'] = Metric(
            'untyped',
            'mon_metadata',
            'MON Metadata',
            MON_METADATA
        )
        metrics['mgr_metadata'] = Metric(
            'gauge',
            'mgr_metadata',
            'MGR metadata',
            MGR_METADATA
        )
        metrics['mgr_status'] = Metric(
            'gauge',
            'mgr_status',
            'MGR status (0=standby, 1=active)',
            MGR_STATUS
        )
        metrics['mgr_module_status'] = Metric(
            'gauge',
            'mgr_module_status',
            'MGR module status (0=disabled, 1=enabled, 2=auto-enabled)',
            MGR_MODULE_STATUS
        )
        metrics['mgr_module_can_run'] = Metric(
            'gauge',
            'mgr_module_can_run',
            'MGR module runnable state i.e. can it run (0=no, 1=yes)',
            MGR_MODULE_CAN_RUN
        )
        metrics['osd_metadata'] = Metric(
            'untyped',
            'osd_metadata',
            'OSD Metadata',
            OSD_METADATA
        )

        # The reason for having this separate to OSD_METADATA is
        # so that we can stably use the same tag names that
        # the Prometheus node_exporter does
        metrics['disk_occupation'] = Metric(
            'untyped',
            'disk_occupation',
            'Associate Ceph daemon with disk used',
            DISK_OCCUPATION
        )

        metrics['pool_metadata'] = Metric(
            'untyped',
            'pool_metadata',
            'POOL Metadata',
            POOL_METADATA
        )

        metrics['rgw_metadata'] = Metric(
            'untyped',
            'rgw_metadata',
            'RGW Metadata',
            RGW_METADATA
        )

        metrics['rbd_mirror_metadata'] = Metric(
            'untyped',
            'rbd_mirror_metadata',
            'RBD Mirror Metadata',
            RBD_MIRROR_METADATA
        )

        metrics['pg_total'] = Metric(
            'gauge',
            'pg_total',
            'PG Total Count per Pool',
            ('pool_id',)
        )

        for flag in OSD_FLAGS:
            path = 'osd_flag_{}'.format(flag)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD Flag {}'.format(flag)
            )
        for state in OSD_STATUS:
            path = 'osd_{}'.format(state)
            metrics[path] = Metric(
                'untyped',
                path,
                'OSD status {}'.format(state),
                ('ceph_daemon',)
            )
        for stat in OSD_STATS:
            path = 'osd_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                'OSD stat {}'.format(stat),
                ('ceph_daemon',)
            )
        for stat in OSD_POOL_STATS:
            path = 'pool_{}'.format(stat)
            metrics[path] = Metric(
                'gauge',
                path,
                "OSD pool stats: {}".format(stat),
                ('pool_id',)
            )
        for state in PG_STATES:
            path = 'pg_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'PG {} per pool'.format(state),
                ('pool_id',)
            )
        for state in DF_CLUSTER:
            path = 'cluster_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF {}'.format(state),
            )
        for state in DF_POOL:
            path = 'pool_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'DF pool {}'.format(state),
                ('pool_id',)
            )
        for state in NUM_OBJECTS:
            path = 'num_objects_{}'.format(state)
            metrics[path] = Metric(
                'gauge',
                path,
                'Number of {} objects'.format(state),
            )

        return metrics

    def get_health(self):
        health = json.loads(self.get('health')['json'])
        self.metrics['health_status'].set(
            health_status_to_number(health['status'])
        )

    def get_pool_stats(self):
        # retrieve pool stats to provide per pool recovery metrics
        # (osd_pool_stats moved to mgr in Mimic)
        pstats = self.get('osd_pool_stats')
        for pool in pstats['pool_stats']:
            for stat in OSD_POOL_STATS:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['recovery_rate'].get(stat, 0),
                    (pool['pool_id'],)
                )

    def get_df(self):
        # maybe get the to-be-exported metrics from a config?
        df = self.get('df')
        for stat in DF_CLUSTER:
            self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])

        for pool in df['pools']:
            for stat in DF_POOL:
                self.metrics['pool_{}'.format(stat)].set(
                    pool['stats'][stat],
                    (pool['id'],)
                )

    def get_fs(self):
        fs_map = self.get('fs_map')
        servers = self.get_service_list()
        self.log.debug('standbys: {}'.format(fs_map['standbys']))
        # export standby mds metadata, default standby fs_id is '-1'
        for standby in fs_map['standbys']:
            id_ = standby['name']
            host_version = servers.get((id_, 'mds'), ('', ''))
            self.metrics['mds_metadata'].set(1, (
                'mds.{}'.format(id_), '-1',
                host_version[0], standby['addr'],
                standby['rank'], host_version[1]
            ))
        for fs in fs_map['filesystems']:
            # collect fs metadata
            data_pools = ",".join([str(pool)
                                   for pool in fs['mdsmap']['data_pools']])
            self.metrics['fs_metadata'].set(1, (
                data_pools,
                fs['id'],
                fs['mdsmap']['metadata_pool'],
                fs['mdsmap']['fs_name']
            ))
            self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
            for gid, daemon in fs['mdsmap']['info'].items():
                id_ = daemon['name']
                host_version = servers.get((id_, 'mds'), ('', ''))
                self.metrics['mds_metadata'].set(1, (
                    'mds.{}'.format(id_), fs['id'],
                    host_version[0], daemon['addr'],
                    daemon['rank'], host_version[1]
                ))

    def get_quorum_status(self):
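        """Export mon metadata and a per-mon in-quorum (1) / out (0) gauge."""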
        mon_status = json.loads(self.get('mon_status')['json'])
        servers = self.get_service_list()
        for mon in mon_status['monmap']['mons']:
            rank = mon['rank']
            id_ = mon['name']
            host_version = servers.get((id_, 'mon'), ('', ''))
            self.metrics['mon_metadata'].set(1, (
                'mon.{}'.format(id_), host_version[0],
                mon['public_addr'].split(':')[0], rank,
                host_version[1]
            ))
            in_quorum = int(rank in mon_status['quorum'])
            self.metrics['mon_quorum_status'].set(in_quorum, (
                'mon.{}'.format(id_),
            ))

    def get_mgr_status(self):
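        """Export mgr metadata, active/standby state, and per-module status
        (2=always-on, 1=enabled, 0=disabled) plus runnability."""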
        mgr_map = self.get('mgr_map')
        servers = self.get_service_list()

        active = mgr_map['active_name']
        standbys = [s.get('name') for s in mgr_map['standbys']]

        all_mgrs = list(standbys)
        all_mgrs.append(active)

        all_modules = {module.get('name'): module.get('can_run')
                       for module in mgr_map['available_modules']}

        ceph_release = None
        for mgr in all_mgrs:
            host_version = servers.get((mgr, 'mgr'), ('', ''))
            if mgr == active:
                _state = 1
                ceph_release = host_version[1].split()[-2]  # e.g. nautilus
            else:
                _state = 0

            self.metrics['mgr_metadata'].set(1, (
                'mgr.{}'.format(mgr), host_version[0],
                host_version[1]
            ))
            self.metrics['mgr_status'].set(_state, (
                'mgr.{}'.format(mgr),
            ))
        always_on_modules = mgr_map['always_on_modules'].get(ceph_release, [])
        active_modules = list(always_on_modules)
        active_modules.extend(mgr_map['modules'])

        for mod_name in all_modules.keys():
            if mod_name in always_on_modules:
                _state = 2
            elif mod_name in active_modules:
                _state = 1
            else:
                _state = 0

            _can_run = 1 if all_modules[mod_name] else 0
            self.metrics['mgr_module_status'].set(_state, (mod_name,))
            self.metrics['mgr_module_can_run'].set(_can_run, (mod_name,))

    def get_pg_status(self):
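        """Tally PG states per pool from pg_summary and set the pg_* gauges."""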

        pg_summary = self.get('pg_summary')

        for pool in pg_summary['by_pool']:
            num_by_state = dict((state, 0) for state in PG_STATES)
            num_by_state['total'] = 0

            for state_name, count in pg_summary['by_pool'][pool].items():
                for state in state_name.split('+'):
                    num_by_state[state] += count
                num_by_state['total'] += count

            for state, num in num_by_state.items():
                try:
                    self.metrics["pg_{}".format(state)].set(num, (pool,))
                except KeyError:
                    self.log.warning("skipping pg in unknown state {}".format(state))

    def get_osd_stats(self):
        osd_stats = self.get('osd_stats')
        for osd in osd_stats['osd_stats']:
            id_ = osd['osd']
            for stat in OSD_STATS:
                val = osd['perf_stat'][stat]
                self.metrics['osd_{}'.format(stat)].set(val, (
                    'osd.{}'.format(id_),
                ))

    def get_service_list(self):
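        """Map (service_id, service_type) to (hostname, ceph_version) for
        every daemon registered with the mgr."""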
        ret = {}
        for server in self.list_servers():
            version = server.get('ceph_version', '')
            host = server.get('hostname', '')
            for service in server.get('services', []):
                ret.update({(service['id'], service['type']): (host, version)})
        return ret

    def get_metadata_and_osd_status(self):
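        """Export OSD flag, metadata, status and disk occupation metrics,
        plus pool, RGW and rbd-mirror metadata."""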
        osd_map = self.get('osd_map')
        osd_flags = osd_map['flags'].split(',')
        for flag in OSD_FLAGS:
            self.metrics['osd_flag_{}'.format(flag)].set(
                int(flag in osd_flags)
            )

        osd_devices = self.get('osd_map_crush')['devices']
        servers = self.get_service_list()
        for osd in osd_map['osds']:
            # id can be used to link osd metrics and metadata
            id_ = osd['osd']
            # collect osd metadata
            p_addr = osd['public_addr'].split(':')[0]
            c_addr = osd['cluster_addr'].split(':')[0]
            if p_addr == "-" or c_addr == "-":
                self.log.info(
                    "Missing address metadata for osd {0}, skipping occupation"
                    " and metadata records for this osd".format(id_)
                )
                continue

            dev_class = None
            for osd_device in osd_devices:
                if osd_device['id'] == id_:
                    dev_class = osd_device.get('class', '')
                    break

            if dev_class is None:
                self.log.info("OSD {0} is missing from CRUSH map, "
                              "skipping output".format(id_))
                continue

            host_version = servers.get((str(id_), 'osd'), ('', ''))

            # collect disk occupation metadata
            osd_metadata = self.get_metadata("osd", str(id_))
            if osd_metadata is None:
                continue

            obj_store = osd_metadata.get('osd_objectstore', '')
            f_iface = osd_metadata.get('front_iface', '')
            b_iface = osd_metadata.get('back_iface', '')

            self.metrics['osd_metadata'].set(1, (
                b_iface,
                'osd.{}'.format(id_),
                c_addr,
                dev_class,
                f_iface,
                host_version[0],
                obj_store,
                p_addr,
                host_version[1]
            ))

            # collect osd status
            for state in OSD_STATUS:
                status = osd[state]
                self.metrics['osd_{}'.format(state)].set(status, (
                    'osd.{}'.format(id_),
                ))

            osd_dev_node = None
            if obj_store == "filestore":
                # collect filestore backend device
                osd_dev_node = osd_metadata.get(
                    'backend_filestore_dev_node', None)
                # collect filestore journal device
                osd_wal_dev_node = osd_metadata.get('osd_journal', '')
                osd_db_dev_node = ''
            elif obj_store == "bluestore":
                # collect bluestore backend device
                osd_dev_node = osd_metadata.get(
                    'bluestore_bdev_dev_node', None)
                # collect bluestore wal backend
                osd_wal_dev_node = osd_metadata.get('bluefs_wal_dev_node', '')
                # collect bluestore db backend
                osd_db_dev_node = osd_metadata.get('bluefs_db_dev_node', '')
            if osd_dev_node and osd_dev_node == "unknown":
                osd_dev_node = None

            osd_hostname = osd_metadata.get('hostname', None)
            if osd_dev_node and osd_hostname:
                self.log.debug("Got dev for osd {0}: {1}/{2}".format(
                    id_, osd_hostname, osd_dev_node))
                self.metrics['disk_occupation'].set(1, (
                    "osd.{0}".format(id_),
                    osd_dev_node,
                    osd_db_dev_node,
                    osd_wal_dev_node,
                    osd_hostname
                ))
            else:
                self.log.info("Missing dev node metadata for osd {0}, skipping "
                              "occupation record for this osd".format(id_))

        for pool in osd_map['pools']:
            self.metrics['pool_metadata'].set(
                1, (pool['pool'], pool['pool_name']))

        # Populate other servers metadata
        for key, value in servers.items():
            service_id, service_type = key
            if service_type == 'rgw':
                hostname, version = value
                self.metrics['rgw_metadata'].set(
                    1,
                    ('{}.{}'.format(service_type, service_id),
                     hostname, version)
                )
            elif service_type == 'rbd-mirror':
                mirror_metadata = self.get_metadata('rbd-mirror', service_id)
                if mirror_metadata is None:
                    continue
                mirror_metadata['ceph_daemon'] = '{}.{}'.format(service_type,
                                                                service_id)
                # materialize label values as a tuple (Metric.set expects one)
                self.metrics['rbd_mirror_metadata'].set(
                    1, tuple(mirror_metadata.get(k, '')
                             for k in RBD_MIRROR_METADATA)
                )

    def get_num_objects(self):
        pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
        for obj in NUM_OBJECTS:
            stat = 'num_objects_{}'.format(obj)
            self.metrics[stat].set(pg_sum[stat])

    def get_rbd_stats(self):
        # Per-image RBD stats are collected by registering a dynamic osd perf
        # stats query that tells OSDs to group stats for requests associated
        # with RBD objects by pool, namespace, and image id, which are
        # extracted from the request object names or other attributes.
        # The RBD object names have the following prefixes:
        #   - rbd_data.{image_id}. (data stored in the same pool as metadata)
        #   - rbd_data.{pool_id}.{image_id}. (data stored in a dedicated data pool)
        #   - journal_data.{pool_id}.{image_id}. (journal if journaling is enabled)
        # The pool_id in the object name is the id of the pool with the image
        # metadata, and should be used in the image spec. If there is no pool_id
        # in the object name, the image pool is the pool where the object is
        # located.

        # Parse rbd_stats_pools option, which is a comma or space separated
        # list of pool[/namespace] entries. If no namespace is specified the
        # stats are collected for every namespace in the pool.
        pools_string = self.get_localized_module_option('rbd_stats_pools', '')
        pools = {}
        for p in [x for x in re.split(r'[\s,]+', pools_string) if x]:
            s = p.split('/', 2)
            pool_name = s[0]
            if len(s) == 1:
                # empty set means collect for all namespaces
                pools[pool_name] = set()
                continue
            if pool_name not in pools:
                pools[pool_name] = set()
            elif not pools[pool_name]:
                continue
            pools[pool_name].add(s[1])

        rbd_stats_pools = {}
        for pool_id in list(self.rbd_stats['pools']):
            name = self.rbd_stats['pools'][pool_id]['name']
            if name not in pools:
                del self.rbd_stats['pools'][pool_id]
            else:
                rbd_stats_pools[name] = \
                    self.rbd_stats['pools'][pool_id]['ns_names']

        pools_refreshed = False
        if pools:
            next_refresh = self.rbd_stats['pools_refresh_time'] + \
                self.get_localized_module_option(
                    'rbd_stats_pools_refresh_interval', 300)
            if rbd_stats_pools != pools or time.time() >= next_refresh:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True

        pool_ids = list(self.rbd_stats['pools'])
        pool_ids.sort()
        pool_id_regex = '^(' + '|'.join([str(x) for x in pool_ids]) + ')$'

        nspace_names = []
        for pool_id, pool in self.rbd_stats['pools'].items():
            if pool['ns_names']:
                nspace_names.extend(pool['ns_names'])
            else:
                nspace_names = []
                break
        if nspace_names:
            namespace_regex = '^(' + \
                "|".join([re.escape(x)
                          for x in set(nspace_names)]) + ')$'
        else:
            namespace_regex = '^(.*)$'

        if 'query' in self.rbd_stats and \
           (pool_id_regex != self.rbd_stats['query']['key_descriptor'][0]['regex'] or
                namespace_regex != self.rbd_stats['query']['key_descriptor'][1]['regex']):
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']

        if not self.rbd_stats['pools']:
            return

        counters_info = self.rbd_stats['counters_info']

        if 'query_id' not in self.rbd_stats:
            query = {
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': pool_id_regex},
                    {'type': 'namespace', 'regex': namespace_regex},
                    {'type': 'object_name',
                     'regex': r'^(?:rbd|journal)_data\.(?:([0-9]+)\.)?([^.]+)\.'},
                ],
                'performance_counter_descriptors': list(counters_info),
            }
            query_id = self.add_osd_perf_query(query)
            if query_id is None:
                self.log.error('failed to add query %s' % query)
                return
            self.rbd_stats['query'] = query
            self.rbd_stats['query_id'] = query_id

        res = self.get_osd_perf_counters(self.rbd_stats['query_id'])
        for c in res['counters']:
            # if the pool id is not found in the object name use id of the
            # pool where the object is located
            if c['k'][2][0]:
                pool_id = int(c['k'][2][0])
            else:
                pool_id = int(c['k'][0][0])
            if pool_id not in self.rbd_stats['pools'] and not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pools_refreshed = True
            if pool_id not in self.rbd_stats['pools']:
                continue
            pool = self.rbd_stats['pools'][pool_id]
            nspace_name = c['k'][1][0]
            if nspace_name not in pool['images']:
                continue
            image_id = c['k'][2][1]
            if image_id not in pool['images'][nspace_name] and \
               not pools_refreshed:
                self.refresh_rbd_stats_pools(pools)
                pool = self.rbd_stats['pools'][pool_id]
                pools_refreshed = True
            if image_id not in pool['images'][nspace_name]:
                continue
            counters = pool['images'][nspace_name][image_id]['c']
            for i in range(len(c['c'])):
                counters[i][0] += c['c'][i][0]
                counters[i][1] += c['c'][i][1]

        label_names = ("pool", "namespace", "image")
        for pool_id, pool in self.rbd_stats['pools'].items():
            pool_name = pool['name']
            for nspace_name, images in pool['images'].items():
                for image_id in images:
                    image_name = images[image_id]['n']
                    counters = images[image_id]['c']
                    i = 0
                    for key in counters_info:
                        counter_info = counters_info[key]
                        stattype = self._stattype_to_str(counter_info['type'])
                        labels = (pool_name, nspace_name, image_name)
                        if counter_info['type'] == self.PERFCOUNTER_COUNTER:
                            path = 'rbd_' + key
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'],
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                        elif counter_info['type'] == self.PERFCOUNTER_LONGRUNAVG:
                            path = 'rbd_' + key + '_sum'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    stattype,
                                    path,
                                    counter_info['desc'] + ' Total',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][0], labels)
                            path = 'rbd_' + key + '_count'
                            if path not in self.metrics:
                                self.metrics[path] = Metric(
                                    'counter',
                                    path,
                                    counter_info['desc'] + ' Count',
                                    label_names,
                                )
                            self.metrics[path].set(counters[i][1], labels)
                        i += 1

    def refresh_rbd_stats_pools(self, pools):
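        """Rebuild self.rbd_stats['pools'] for the configured pools:
        enumerate namespaces and images, carrying over the accumulated
        counters of images that are still present."""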
        self.log.debug('refreshing rbd pools %s' % (pools))

        rbd = RBD()
        counters_info = self.rbd_stats['counters_info']
        for pool_name, cfg_ns_names in pools.items():
            try:
                pool_id = self.rados.pool_lookup(pool_name)
                with self.rados.open_ioctx(pool_name) as ioctx:
                    if pool_id not in self.rbd_stats['pools']:
                        self.rbd_stats['pools'][pool_id] = {'images': {}}
                    pool = self.rbd_stats['pools'][pool_id]
                    pool['name'] = pool_name
                    pool['ns_names'] = cfg_ns_names
                    if cfg_ns_names:
                        nspace_names = list(cfg_ns_names)
                    else:
                        nspace_names = [''] + rbd.namespace_list(ioctx)
                    # iterate over a copy: we delete entries as we go
                    for nspace_name in list(pool['images']):
                        if nspace_name not in nspace_names:
                            del pool['images'][nspace_name]
                    for nspace_name in nspace_names:
                        if (nspace_name and
                                not rbd.namespace_exists(ioctx, nspace_name)):
                            self.log.debug('unknown namespace %s for pool %s' %
                                           (nspace_name, pool_name))
                            continue
                        ioctx.set_namespace(nspace_name)
                        if nspace_name not in pool['images']:
                            pool['images'][nspace_name] = {}
                        namespace = pool['images'][nspace_name]
                        images = {}
                        for image_meta in RBD().list2(ioctx):
                            image = {'n': image_meta['name']}
                            image_id = image_meta['id']
                            if image_id in namespace:
                                image['c'] = namespace[image_id]['c']
                            else:
                                image['c'] = [[0, 0] for x in counters_info]
                            images[image_id] = image
                        pool['images'][nspace_name] = images
            except Exception as e:
                self.log.error('failed listing pool %s: %s' % (pool_name, e))
        self.rbd_stats['pools_refresh_time'] = time.time()

    def shutdown_rbd_stats(self):
        if 'query_id' in self.rbd_stats:
            self.remove_osd_perf_query(self.rbd_stats['query_id'])
            del self.rbd_stats['query_id']
            del self.rbd_stats['query']
        self.rbd_stats['pools'].clear()

    def add_fixed_name_metrics(self):
        """
        Add fixed name metrics from existing ones that have details in their names
        that should be in labels (not in name).
        For backward compatibility, a new fixed name metric is created (instead of replacing)
        and details are put in new labels.
        Intended for RGW sync perf. counters but extendable as required.
        See: https://tracker.ceph.com/issues/45311
        """
        new_metrics = {}
        for metric_path in self.metrics.keys():
            # Address RGW sync perf. counters.
            match = re.search(r'^data-sync-from-(.*)\.', metric_path)
            if match:
                new_path = re.sub('from-([^.]*)', 'from-zone', metric_path)
                if new_path not in new_metrics:
                    new_metrics[new_path] = Metric(
                        self.metrics[metric_path].mtype,
                        new_path,
                        self.metrics[metric_path].desc,
                        self.metrics[metric_path].labelnames + ('source_zone',)
                    )
                for label_values, value in self.metrics[metric_path].value.items():
                    new_metrics[new_path].set(value, label_values + (match.group(1),))

        self.metrics.update(new_metrics)

    def collect(self):
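        """Gather all metrics (cluster state, daemon perf counters, RBD
        image stats) and return them in Prometheus exposition format."""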
        # Clear the metrics before scraping
        for k in self.metrics.keys():
            self.metrics[k].clear()

        self.get_health()
        self.get_df()
        self.get_pool_stats()
        self.get_fs()
        self.get_osd_stats()
        self.get_quorum_status()
        self.get_mgr_status()
        self.get_metadata_and_osd_status()
        self.get_pg_status()
        self.get_num_objects()

        for daemon, counters in self.get_all_perf_counters().items():
            for path, counter_info in counters.items():
                # Skip histograms, they are represented by long running avgs
                stattype = self._stattype_to_str(counter_info['type'])
                if not stattype or stattype == 'histogram':
                    self.log.debug('ignoring %s, type %s' % (path, stattype))
                    continue

                path, label_names, labels = self._perfpath_to_path_labels(
                    daemon, path)

                # Get the value of the counter
                value = self._perfvalue_to_value(
                    counter_info['type'], counter_info['value'])

                # Represent the long running avgs as sum/count pairs
                if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    _path = path + '_sum'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            stattype,
                            _path,
                            counter_info['description'] + ' Total',
                            label_names,
                        )
                    self.metrics[_path].set(value, labels)

                    _path = path + '_count'
                    if _path not in self.metrics:
                        self.metrics[_path] = Metric(
                            'counter',
                            _path,
                            counter_info['description'] + ' Count',
                            label_names,
                        )
                    self.metrics[_path].set(counter_info['count'], labels)
                else:
                    if path not in self.metrics:
                        self.metrics[path] = Metric(
                            stattype,
                            path,
                            counter_info['description'],
                            label_names,
                        )
                    self.metrics[path].set(value, labels)

        self.add_fixed_name_metrics()
        self.get_rbd_stats()

        # Return formatted metrics and clear no longer used data
        _metrics = [m.str_expfmt() for m in self.metrics.values()]
        for k in self.metrics.keys():
            self.metrics[k].clear()

        return ''.join(_metrics) + '\n'

    def get_file_sd_config(self):
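        """Return a Prometheus file_sd-compatible config listing each mgr's
        metrics endpoint, falling back to DEFAULT_PORT when a mgr has no
        server_port set in config-key."""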
        servers = self.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in server.get('services', []):
                if service['type'] != 'mgr':
                    continue
                id_ = service['id']
                # get port for prometheus module at mgr with id_
                # TODO use get_config_prefix or get_config here once
                # https://github.com/ceph/ceph/pull/20458 is merged
                result = CommandResult("")
                global_instance().send_command(
                    result, "mon", '',
                    json.dumps({
                        "prefix": "config-key get",
                        'key': "config/mgr/mgr/prometheus/{}/server_port".format(id_),
                    }),
                    "")
                r, outb, outs = result.wait()
                if r != 0:
                    global_instance().log.error(
                        "Failed to retrieve port for mgr {}: {}".format(
                            id_, outs))
                    targets.append('{}:{}'.format(hostname, DEFAULT_PORT))
                else:
                    port = json.loads(outb)
                    targets.append('{}:{}'.format(hostname, port))

        ret = [
            {
                "targets": targets,
                "labels": {}
            }
        ]
        return 0, json.dumps(ret), ""

    def self_test(self):
        self.collect()
        self.get_file_sd_config()

    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'prometheus file_sd_config':
            return self.get_file_sd_config()
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(cmd['prefix']))

    def serve(self):

        class Root(object):

            # collapse everything to '/'
            def _cp_dispatch(self, vpath):
                cherrypy.request.path = ''
                return self

            @cherrypy.expose
            def index(self):
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='/metrics'>Metrics</a></p>
    </body>
</html>'''

            @cherrypy.expose
            def metrics(self):
                instance = global_instance()
                # Lock the function execution
                try:
                    instance.collect_lock.acquire()
                    return self._metrics(instance)
                finally:
                    instance.collect_lock.release()

            @staticmethod
            def _metrics(instance):
                # Return cached data if available and collected before the
                # cache times out
                if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache

                if instance.have_mon_connection():
                    instance.collect_cache = None
                    instance.collect_time = time.time()
                    instance.collect_cache = instance.collect()
                    cherrypy.response.headers['Content-Type'] = 'text/plain'
                    return instance.collect_cache
                else:
                    raise cherrypy.HTTPError(503, 'No MON connection')

        # Make the cache timeout for collecting configurable
        self.collect_timeout = float(self.get_localized_module_option(
            'scrape_interval', 5.0))

        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info(
            "server_addr: %s server_port: %s" %
            (server_addr, server_port)
        )

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri('http://{0}:{1}/'.format(
            socket.getfqdn() if server_addr in ['::', '0.0.0.0'] else server_addr,
            server_port
        ))

        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })
        cherrypy.tree.mount(Root(), "/")
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # wait for the shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')
        self.shutdown_rbd_stats()

    def shutdown(self):
        self.log.info('Stopping engine...')
        self.shutdown_event.set()


class StandbyModule(MgrStandbyModule):
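    """Standby mgr: serves a landing page linking to the active mgr's
    metrics endpoint and an empty /metrics response."""
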
    def __init__(self, *args, **kwargs):
        super(StandbyModule, self).__init__(*args, **kwargs)
        self.shutdown_event = threading.Event()

    def serve(self):
        server_addr = self.get_localized_module_option(
            'server_addr', get_default_addr())
        server_port = self.get_localized_module_option(
            'server_port', DEFAULT_PORT)
        self.log.info("server_addr: %s server_port: %s" %
                      (server_addr, server_port))
        cherrypy.config.update({
            'server.socket_host': server_addr,
            'server.socket_port': int(server_port),
            'engine.autoreload.on': False
        })

        module = self

        class Root(object):
            @cherrypy.expose
            def index(self):
                active_uri = module.get_active_uri()
                return '''<!DOCTYPE html>
<html>
    <head><title>Ceph Exporter</title></head>
    <body>
        <h1>Ceph Exporter</h1>
        <p><a href='{}metrics'>Metrics</a></p>
    </body>
</html>'''.format(active_uri)

            @cherrypy.expose
            def metrics(self):
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return ''

        cherrypy.tree.mount(Root(), '/', {})
        self.log.info('Starting engine...')
        cherrypy.engine.start()
        self.log.info('Engine started.')
        # Wait for shutdown event
        self.shutdown_event.wait()
        self.shutdown_event.clear()
        cherrypy.engine.stop()
        self.log.info('Engine stopped.')

    def shutdown(self):
        self.log.info("Stopping engine...")
        self.shutdown_event.set()
        self.log.info("Stopped engine")