]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/diskprediction_cloud/agent/metrics/db_relay.py
1 from __future__
import absolute_import
6 from . import MetricsAgent
, MetricsField
7 from ...common
.clusterdata
import ClusterAPI
8 from ...common
.cypher
import CypherOP
, NodeInfo
12 """ basic diskprediction structure """
15 def __init__(self
, *args
, **kwargs
):
16 if len(args
) > len(self
._fields
):
17 raise TypeError('Expected {} arguments'.format(len(self
._fields
)))
19 for name
, value
in zip(self
._fields
, args
):
20 setattr(self
, name
, value
)
22 for name
in self
._fields
[len(args
):]:
23 setattr(self
, name
, kwargs
.pop(name
))
26 raise TypeError('Invalid argument(s): {}'.format(','.join(kwargs
)))
class MGRDpCeph(BaseDP):
    """Cluster-level summary record for the diskprediction DB relay."""
    _fields = [
        'fsid', 'health', 'max_osd', 'size',
        'avail_size', 'raw_used', 'raw_used_percent'
    ]
class MGRDpHost(BaseDP):
    """Host record: cluster fsid plus host name and IP address."""
    _fields = ['fsid', 'host', 'ipaddr']
class MGRDpMon(BaseDP):
    """Monitor daemon record: cluster fsid plus host name and IP address."""
    _fields = ['fsid', 'host', 'ipaddr']
class MGRDpOsd(BaseDP):
    """OSD daemon record built from the osd map and osd metadata."""
    _fields = [
        'fsid', 'host', '_id', 'uuid', 'up', '_in', 'weight', 'public_addr',
        'cluster_addr', 'state', 'ceph_release', 'osd_devices', 'rotational'
    ]
class MGRDpMds(BaseDP):
    """MDS daemon record: cluster fsid plus host name and IP address."""
    _fields = ['fsid', 'host', 'ipaddr']
class MGRDpPool(BaseDP):
    """Pool record built from the osd map's pool entries."""
    _fields = [
        'fsid', 'size', 'pool_name', 'pool_id', 'type', 'min_size',
        'pg_num', 'pgp_num', 'created_time', 'pgids', 'osd_ids', 'tiers', 'cache_mode',
        'erasure_code_profile', 'tier_of'
    ]
class MGRDpRBD(BaseDP):
    """RBD image record: image id/name and its containing pool."""
    _fields = ['fsid', '_id', 'name', 'pool_name', 'pool_id']
class MGRDpFS(BaseDP):
    """CephFS filesystem record: pools backing the fs and its MDS nodes."""
    _fields = ['fsid', '_id', 'name', 'metadata_pool', 'data_pools', 'mds_nodes']
class MGRDpPG(BaseDP):
    """Placement-group record: pg id, acting/up OSDs and object states."""
    _fields = [
        'fsid', 'pgid', 'up_osds', 'acting_osds', 'state',
        'objects', 'degraded', 'misplaced', 'unfound'
    ]
class MGRDpDisk(BaseDP):
    """Physical disk record: owning host and the OSD roles using the disk."""
    _fields = ['host_domain_id', 'host', 'fs_journal_osd', 'bs_db_osd', 'bs_wal_osd', 'data_osd', 'osd_ids']
class DBRelay(MetricsField):
    """ DB Relay structure """
    measurement = 'db_relay'

    # NOTE(review): the `def __init__(self):` line was lost in extraction and
    # is reconstructed from the visible super().__init__() call below.
    def __init__(self):
        super(DBRelay, self).__init__()
        # fields carry the payload; tags identify the reporting agent/host
        self.fields['agenthost'] = None
        self.tags['agenthost_domain_id'] = None
        self.tags['dc_tag'] = 'na'
        self.tags['host'] = None
        # 'cmd' holds the serialized Cypher command for the graph DB
        self.fields['cmd'] = None
95 class DBRelayAgent(MetricsAgent
):
96 measurement
= 'db_relay'
98 def __init__(self
, *args
, **kwargs
):
99 super(DBRelayAgent
, self
).__init
__(*args
, **kwargs
)
100 self
._cluster
_node
= None
101 self
._cluster
_id
= None
102 self
._ceph
= ClusterAPI(self
._module
_inst
)
103 self
._osd
_maps
= self
._ceph
.module
.get('osd_map')
104 self
._mon
_maps
= self
._ceph
.module
.get('mon_map')
105 self
._fs
_maps
= self
._ceph
.module
.get('fs_map')
106 self
._osd
_metadata
= self
._ceph
.module
.get('osd_metadata')
107 self
._host
_nodes
= dict()
108 self
._osd
_nodes
= dict()
109 self
._mon
_nodes
= dict()
110 self
._mds
_nodes
= dict()
111 self
._dev
_nodes
= dict()
112 self
._pool
_nodes
= dict()
113 self
._rbd
_nodes
= dict()
114 self
._fs
_nodes
= dict()
115 # initial ceph all node states
116 self
._init
_cluster
_node
()
126 def _init_hosts(self
):
129 osd_data
= self
._osd
_maps
.get('osds', [])
130 for _data
in osd_data
:
131 osd_id
= _data
['osd']
132 if not _data
.get('in'):
134 osd_addr
= _data
['public_addr'].split(':')[0]
135 osd_metadata
= self
._ceph
.get_osd_metadata(osd_id
)
137 osd_host
= osd_metadata
['hostname']
138 hosts
.add((osd_host
, osd_addr
))
141 mons
= self
._mon
_maps
.get('mons', [])
143 mon_host
= _data
['name']
144 mon_addr
= _data
['public_addr'].split(':')[0]
146 hosts
.add((mon_host
, mon_addr
))
149 file_systems
= self
._fs
_maps
.get('filesystems', [])
150 for _data
in file_systems
:
151 mds_info
= _data
.get('mdsmap').get('info')
152 for _gid
in mds_info
:
153 mds_data
= mds_info
[_gid
]
154 mds_addr
= mds_data
.get('addr').split(':')[0]
155 mds_host
= mds_data
.get('name')
157 hosts
.add((mds_host
, mds_addr
))
160 self
._host
_nodes
[host
] = None
162 host_node
= NodeInfo(
164 domain_id
='{}_{}'.format(self
._cluster
_id
, host
),
168 self
._host
_nodes
[host
] = host_node
170 def _init_mons(self
):
171 cluster_id
= self
._cluster
_id
172 mons
= self
._mon
_maps
.get('mons')
174 mon_name
= mon
.get('name', '')
175 mon_addr
= mon
.get('addr', '').split(':')[0]
176 if mon_name
not in self
._host
_nodes
.keys():
188 domain_id
='{}.mon.{}'.format(cluster_id
, mon_name
),
192 self
._mon
_nodes
[mon_name
] = mon_node
195 cluster_id
= self
._cluster
_id
196 file_systems
= self
._fs
_maps
.get('filesystems', [])
197 for _data
in file_systems
:
198 mds_info
= _data
.get('mdsmap').get('info')
199 for _gid
in mds_info
:
200 mds_data
= mds_info
[_gid
]
201 mds_addr
= mds_data
.get('addr').split(':')[0]
202 mds_host
= mds_data
.get('name')
203 mds_gid
= mds_data
.get('gid')
205 if mds_host
not in self
._host
_nodes
:
217 domain_id
='{}.mds.{}'.format(cluster_id
, mds_gid
),
218 name
='MDS.{}'.format(mds_gid
),
221 self
._mds
_nodes
[mds_host
] = mds_node
223 def _init_osds(self
):
224 for osd
in self
._osd
_maps
.get('osds', []):
225 osd_id
= osd
.get('osd', -1)
226 meta
= self
._osd
_metadata
.get(str(osd_id
), {})
227 osd_host
= meta
['hostname']
228 osd_ceph_version
= meta
['ceph_version']
229 osd_rotational
= meta
['rotational']
230 osd_devices
= meta
['devices'].split(',')
232 # filter 'dm' device.
234 for devname
in osd_devices
:
237 devices
.append(devname
)
239 if osd_host
not in self
._host
_nodes
.keys():
241 self
._osd
_nodes
[str(osd_id
)] = None
244 for addr
in osd
.get('public_addrs', {}).get('addrvec', []):
245 public_addr
.append(addr
.get('addr'))
246 for addr
in osd
.get('cluster_addrs', {}).get('addrvec', []):
247 cluster_addr
.append(addr
.get('addr'))
249 fsid
=self
._cluster
_id
,
252 uuid
=osd
.get('uuid'),
255 weight
=osd
.get('weight'),
256 public_addr
=','.join(public_addr
),
257 cluster_addr
=','.join(cluster_addr
),
258 state
=','.join(osd
.get('state', [])),
259 ceph_release
=osd_ceph_version
,
260 osd_devices
=','.join(devices
),
261 rotational
=osd_rotational
)
262 for k
, v
in meta
.items():
263 setattr(dp_osd
, k
, v
)
268 domain_id
='{}.osd.{}'.format(self
._cluster
_id
, osd_id
),
269 name
='OSD.{}'.format(osd_id
),
272 self
._osd
_nodes
[str(osd_id
)] = osd_node
274 def _init_devices(self
):
275 r
= re
.compile('[^/dev]\D+')
276 for osdid
, o_val
in self
._osd
_nodes
.items():
277 o_devs
= o_val
.meta
.get('device_ids', '').split(',')
279 journal_devs
= o_val
.meta
.get('backend_filestore_journal_dev_node', '').split(',')
281 bs_db_devs
= o_val
.meta
.get('bluefs_db_dev_node', '').split(',')
282 bs_wal_devs
= o_val
.meta
.get('bluefs_wal_dev_node', '').split(',')
289 if len(dev
.split('=')) != 2:
291 dev_name
= dev
.split('=')[0]
292 dev_id
= dev
.split('=')[1]
296 for j_dev
in journal_devs
:
297 if dev_name
== ''.join(r
.findall(j_dev
)):
298 fs_journal
.append(osdid
)
299 for db_dev
in bs_db_devs
:
300 if dev_name
== ''.join(r
.findall(db_dev
)):
302 for wal_dev
in bs_wal_devs
:
303 if dev_name
== ''.join(r
.findall(wal_dev
)):
306 if not fs_journal
and not bs_db
and not bs_wal
:
309 disk_domain_id
= dev_id
310 if disk_domain_id
not in self
._dev
_nodes
.keys():
312 host_domain_id
='{}_{}'.format(self
._cluster
_id
, o_val
.meta
.get('host')),
313 host
=o_val
.meta
.get('host'),
315 fs_journal_osd
=','.join(str(x
) for x
in fs_journal
) if fs_journal
else '',
316 bs_db_osd
=','.join(str(x
) for x
in bs_db
) if bs_db
else '',
317 bs_wal_osd
=','.join(str(x
) for x
in bs_wal
) if bs_wal
else '',
318 data_osd
=','.join(str(x
) for x
in data
) if data
else ''
321 disk_node
= NodeInfo(
323 domain_id
=disk_domain_id
,
325 meta
=dp_disk
.__dict
__
327 self
._dev
_nodes
[disk_domain_id
] = disk_node
329 dev_node
= self
._dev
_nodes
[disk_domain_id
]
330 osd_ids
= dev_node
.meta
.get('osd_ids', '')
331 if osdid
not in osd_ids
.split(','):
332 arr_value
= osd_ids
.split(',')
333 arr_value
.append(str(osdid
))
334 dev_node
.meta
['osd_ids'] = ','.join(arr_value
)
338 value
= dev_node
.meta
.get('fs_journal_osd', '')
340 arr_value
= value
.split(',')
343 if t
not in arr_value
:
346 dev_node
.meta
['fs_journal_osd'] = ','.join(str(x
) for x
in arr_value
)
350 value
= dev_node
.meta
.get('bs_db_osd', '')
352 arr_value
= value
.split(',')
355 if t
not in arr_value
:
358 dev_node
.meta
['bs_db_osd'] = ','.join(str(x
) for x
in arr_value
)
362 value
= dev_node
.meta
.get('bs_wal_osd', '')
364 arr_value
= value
.split(',')
367 if t
not in arr_value
:
370 dev_node
.meta
['bs_wal_osd'] = ','.join(str(x
) for x
in arr_value
)
374 value
= dev_node
.meta
.get('data_osd', '')
376 arr_value
= value
.split(',')
379 if t
not in arr_value
:
382 dev_node
.meta
['data_osd'] = ','.join(str(x
) for x
in arr_value
)
384 def _init_cluster_node(self
):
385 cluster_id
= self
._ceph
.get_cluster_id()
386 ceph_df_stat
= self
._ceph
.get_ceph_df_state()
387 dp_cluster
= MGRDpCeph(
389 health
=self
._ceph
.get_health_status(),
390 max_osd
=len(self
._ceph
.get_osds()),
391 size
=ceph_df_stat
.get('total_size'),
392 avail_size
=ceph_df_stat
.get('avail_size'),
393 raw_used
=ceph_df_stat
.get('raw_used_size'),
394 raw_used_percent
=ceph_df_stat
.get('used_percent')
396 cluster_name
= cluster_id
[-12:]
397 cluster_node
= NodeInfo(
399 domain_id
=cluster_id
,
400 name
='cluster-{}'.format(cluster_name
),
401 meta
=dp_cluster
.__dict
__
403 self
._cluster
_id
= cluster_id
404 self
._cluster
_node
= cluster_node
406 def _init_pools(self
):
407 pools
= self
._osd
_maps
.get('pools', [])
408 cluster_id
= self
._cluster
_id
411 pgs
= self
._ceph
.get_pgs_up_by_poolid(int(pool
.get('pool', -1)))
412 for pg_id
, osd_id
in pgs
.items():
415 osds
.append(str(o_id
))
418 size
=pool
.get('size'),
419 pool_name
=pool
.get('pool_name'),
420 pool_id
=pool
.get('pool'),
421 type=pool
.get('type'),
422 min_size
=pool
.get('min_szie'),
423 pg_num
=pool
.get('pg_num'),
424 pgp_num
=pool
.get('pg_placement_num'),
425 created_time
=pool
.get('create_time'),
426 pgids
=','.join(pgs
.keys()),
427 osd_ids
=','.join(osds
),
428 tiers
=','.join(str(x
) for x
in pool
.get('tiers', [])),
429 cache_mode
=pool
.get('cache_mode', ''),
430 erasure_code_profile
=str(pool
.get('erasure_code_profile', '')),
431 tier_of
=str(pool
.get('tier_of', -1)))
433 pool_node
= NodeInfo(
435 domain_id
='{}_pool_{}'.format(cluster_id
, pool
.get('pool')),
436 name
=pool
.get('pool_name'),
437 meta
=dp_pool
.__dict
__
439 self
._pool
_nodes
[str(pool
.get('pool'))] = pool_node
441 def _init_rbds(self
):
442 cluster_id
= self
._cluster
_id
443 for p_id
, p_node
in self
._pool
_nodes
.items():
444 rbds
= self
._ceph
.get_rbd_list(p_node
.name
)
445 self
._rbd
_nodes
[str(p_id
)] = []
451 pool_name
=rbd
['pool_name'],
457 domain_id
='{}_rbd_{}'.format(cluster_id
, rbd
['id']),
459 meta
=dp_rbd
.__dict
__,
461 self
._rbd
_nodes
[str(p_id
)].append(rbd_node
)
464 # _fields = ['fsid', '_id', 'name', 'metadata_pool', 'data_pool', 'mds_nodes']
465 cluster_id
= self
._cluster
_id
466 file_systems
= self
._fs
_maps
.get('filesystems', [])
467 for fs
in file_systems
:
468 mdsmap
= fs
.get('mdsmap', {})
470 for m
, md
in mdsmap
.get('info', {}).items():
471 if md
.get('name') not in mds_hostnames
:
472 mds_hostnames
.append(md
.get('name'))
476 name
=mdsmap
.get('fs_name'),
477 metadata_pool
=str(mdsmap
.get('metadata_pool', -1)),
478 data_pools
=','.join(str(i
) for i
in mdsmap
.get('data_pools', [])),
479 mds_nodes
=','.join(mds_hostnames
),
483 domain_id
='{}_fs_{}'.format(cluster_id
, fs
.get('id')),
484 name
=mdsmap
.get('fs_name'),
487 self
._fs
_nodes
[str(fs
.get('id'))] = fs_node
489 def _cluster_contains_host(self
):
490 cluster_id
= self
._cluster
_id
491 cluster_node
= self
._cluster
_node
493 # create node relation
494 for h_id
, h_node
in self
._host
_nodes
.items():
496 # add osd node relationship
497 cypher_cmd
= CypherOP
.add_link(
500 'CephClusterContainsHost'
502 cluster_host
= socket
.gethostname()
503 data
.fields
['agenthost'] = cluster_host
504 data
.tags
['agenthost_domain_id'] = cluster_id
505 data
.tags
['host'] = cluster_host
506 data
.fields
['cmd'] = str(cypher_cmd
)
507 self
.data
.append(data
)
509 def _host_contains_mon(self
):
510 for m_name
, m_node
in self
._mon
_nodes
.items():
511 host_node
= self
._host
_nodes
.get(m_name
)
515 # add mon node relationship
516 cypher_cmd
= CypherOP
.add_link(
521 cluster_host
= socket
.gethostname()
522 data
.fields
['agenthost'] = cluster_host
523 data
.tags
['agenthost_domain_id'] = self
._cluster
_id
524 data
.tags
['host'] = cluster_host
525 data
.fields
['cmd'] = str(cypher_cmd
)
526 self
.data
.append(data
)
528 def _host_contains_osd(self
):
529 cluster_id
= self
._cluster
_id
530 for o_id
, o_node
in self
._osd
_nodes
.items():
531 host_node
= self
._host
_nodes
.get(o_node
.meta
.get('host'))
535 # add osd node relationship
536 cypher_cmd
= CypherOP
.add_link(
541 cluster_host
= socket
.gethostname()
542 data
.fields
['agenthost'] = cluster_host
543 data
.tags
['agenthost_domain_id'] = cluster_id
, data
544 data
.tags
['host'] = cluster_host
545 data
.fields
['cmd'] = str(cypher_cmd
)
546 self
.data
.append(data
)
548 def _host_contains_mds(self
):
549 cluster_id
= self
._cluster
_id
550 for m_name
, mds_node
in self
._mds
_nodes
.items():
552 host_node
= self
._host
_nodes
.get(mds_node
.meta
.get('host'))
555 # add osd node relationship
556 cypher_cmd
= CypherOP
.add_link(
561 cluster_host
= socket
.gethostname()
562 data
.fields
['agenthost'] = cluster_host
563 data
.tags
['agenthost_domain_id'] = cluster_id
564 data
.tags
['host'] = cluster_host
565 data
.fields
['cmd'] = str(cypher_cmd
)
566 self
.data
.append(data
)
568 def _osd_contains_disk(self
):
569 cluster_id
= self
._cluster
_id
570 cluster_host
= socket
.gethostname()
571 for d_name
, d_node
in self
._dev
_nodes
.items():
572 keys
= {'data_osd': 'DataDiskOfOSD',
573 'fs_journal_osd': 'FsJournalDiskOfOSD',
574 'bs_db_osd': 'BsDBDiskOfOSD',
575 'bs_wal_osd': 'BsWalDiskOfOSD'}
576 for k
, v
in keys
.items():
577 if not d_node
.meta
.get(k
):
579 for osdid
in d_node
.meta
.get(k
, '').split(','):
581 osd_node
= self
._osd
_nodes
.get(str(osdid
))
584 # add disk node relationship
585 cypher_cmd
= CypherOP
.add_link(
589 data
.fields
['agenthost'] = cluster_host
590 data
.tags
['agenthost_domain_id'] = cluster_id
591 data
.tags
['host'] = cluster_host
592 data
.fields
['cmd'] = str(cypher_cmd
)
593 self
.data
.append(data
)
595 hostname
= d_node
.meta
.get('host', '')
598 host_node
= self
._host
_nodes
.get(hostname
)
601 # add osd node relationship
603 cypher_cmd
= CypherOP
.add_link(
606 'VmHostContainsVmDisk'
608 data
.fields
['agenthost'] = cluster_host
609 data
.tags
['agenthost_domain_id'] = cluster_id
610 data
.tags
['host'] = cluster_host
611 data
.fields
['cmd'] = str(cypher_cmd
)
612 self
.data
.append(data
)
614 def _pool_contains_osd(self
):
615 cluster_id
= self
._cluster
_id
616 cluster_host
= socket
.gethostname()
617 for p_id
, p_node
in self
._pool
_nodes
.items():
618 for o_id
in p_node
.meta
.get('osd_ids', '').split(','):
619 osd_node
= self
._osd
_nodes
.get(str(o_id
))
623 cypher_cmd
= CypherOP
.add_link(
628 data
.fields
['agenthost'] = cluster_host
629 data
.tags
['agenthost_domain_id'] = cluster_id
630 data
.tags
['host'] = cluster_host
631 data
.fields
['cmd'] = str(cypher_cmd
)
632 self
.data
.append(data
)
634 def _pool_contains_rbd(self
):
635 cluster_id
= self
._cluster
_id
636 cluster_host
= socket
.gethostname()
637 for p_id
, p_node
in self
._pool
_nodes
.items():
638 for rbd_node
in self
._rbd
_nodes
.get(str(p_id
), []):
642 cypher_cmd
= CypherOP
.add_link(
647 data
.fields
['agenthost'] = cluster_host
648 data
.tags
['agenthost_domain_id'] = cluster_id
649 data
.tags
['host'] = cluster_host
650 data
.fields
['cmd'] = str(cypher_cmd
)
651 self
.data
.append(data
)
653 def _pool_contains_fs(self
):
654 cluster_id
= self
._cluster
_id
655 cluster_host
= socket
.gethostname()
656 for fs_id
, fs_node
in self
._fs
_nodes
.items():
657 pool_attrs
= ['metadata_pool', 'data_pools']
658 for p_attr
in pool_attrs
:
659 pools_id
= fs_node
.meta
.get(p_attr
).split(',')
660 for p_id
in pools_id
:
661 p_node
= self
._pool
_nodes
.get(str(p_id
))
664 cypher_cmd
= CypherOP
.add_link(
667 'MetadataPoolContainsFS' if p_attr
== 'metadata_pool' else 'DataPoolContainsFS'
669 data
.fields
['agenthost'] = cluster_host
670 data
.tags
['agenthost_domain_id'] = cluster_id
671 data
.tags
['host'] = cluster_host
672 data
.fields
['cmd'] = str(cypher_cmd
)
673 self
.data
.append(data
)
674 for mds_name
in fs_node
.meta
.get('mds_nodes', '').split(','):
675 mds_node
= self
._mds
_nodes
.get(mds_name
)
679 cypher_cmd
= CypherOP
.add_link(
684 data
.fields
['agenthost'] = cluster_host
685 data
.tags
['agenthost_domain_id'] = cluster_id
686 data
.tags
['host'] = cluster_host
687 data
.fields
['cmd'] = str(cypher_cmd
)
688 self
.data
.append(data
)
690 def _collect_data(self
):
691 if not self
._module
_inst
:
693 job_name
= ['cluster_contains_host', 'host_contains_mon', 'host_contains_mds', 'host_contains_osd', 'osd_contains_disk',
694 'pool_contains_osd', 'pool_contains_rbd', 'pool_contains_fs']
696 fn
= getattr(self
, '_%s' % job
)
701 except Exception as e
:
702 self
._module
_inst
.log
.error('dbrelay - execute function {} fail, due to {}'.format(job
, str(e
)))