]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/diskprediction_cloud/agent/metrics/db_relay.py
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / pybind / mgr / diskprediction_cloud / agent / metrics / db_relay.py
1 from __future__ import absolute_import
2
3 import re
4 import socket
5
6 from . import MetricsAgent, MetricsField
7 from ...common.clusterdata import ClusterAPI
8 from ...common.cypher import CypherOP, NodeInfo
9
10
class BaseDP(object):
    """Base class for diskprediction data points.

    Subclasses declare ``_fields``; the constructor accepts those fields
    positionally and/or by keyword, like a lightweight namedtuple.
    """
    _fields = []

    def __init__(self, *args, **kwargs):
        total = len(self._fields)
        if len(args) > total:
            raise TypeError('Expected {} arguments'.format(total))

        pos = 0
        for name in self._fields:
            if pos < len(args):
                # Leading fields are filled from positional arguments.
                setattr(self, name, args[pos])
                pos += 1
            else:
                # Remaining fields must be supplied by keyword
                # (missing ones raise KeyError via pop, as before).
                setattr(self, name, kwargs.pop(name))

        # Any keyword left over was not declared in _fields.
        if kwargs:
            raise TypeError('Invalid argument(s): {}'.format(','.join(kwargs)))
27
28
class MGRDpCeph(BaseDP):
    """Cluster-level data point; used as meta for the CephCluster node."""
    _fields = [
        'fsid', 'health', 'max_osd', 'size',
        'avail_size', 'raw_used', 'raw_used_percent'
    ]
34
35
class MGRDpHost(BaseDP):
    """Host-level data point layout (not instantiated in this module)."""
    _fields = ['fsid', 'host', 'ipaddr']
38
39
class MGRDpMon(BaseDP):
    """Monitor data point; used as meta for CephMon nodes."""
    _fields = ['fsid', 'host', 'ipaddr']
42
43
class MGRDpOsd(BaseDP):
    """OSD data point; used as meta for CephOsd nodes.

    '_id' and '_in' carry the osd map's 'osd' and 'in' values ('in' is a
    Python keyword, hence the underscore prefixes).
    """
    _fields = [
        'fsid', 'host', '_id', 'uuid', 'up', '_in', 'weight', 'public_addr',
        'cluster_addr', 'state', 'ceph_release', 'osd_devices', 'rotational'
    ]
49
50
class MGRDpMds(BaseDP):
    """MDS daemon data point; used as meta for CephMds nodes."""
    _fields = ['fsid', 'host', 'ipaddr']
53
54
class MGRDpPool(BaseDP):
    """Pool data point; used as meta for CephPool nodes."""
    _fields = [
        'fsid', 'size', 'pool_name', 'pool_id', 'type', 'min_size',
        'pg_num', 'pgp_num', 'created_time', 'pgids', 'osd_ids', 'tiers', 'cache_mode',
        'erasure_code_profile', 'tier_of'
    ]
61
62
class MGRDpRBD(BaseDP):
    """RBD image data point; used as meta for CephRBD nodes."""
    _fields = ['fsid', '_id', 'name', 'pool_name', 'pool_id']
65
66
class MGRDpFS(BaseDP):
    """Filesystem data point; used as meta for CephFS nodes."""
    _fields = ['fsid', '_id', 'name', 'metadata_pool', 'data_pools', 'mds_nodes']
69
70
class MGRDpPG(BaseDP):
    """Placement-group data point layout.

    NOTE(review): not referenced anywhere in this module — presumably kept
    for use by other agents; verify before removing.
    """
    _fields = [
        'fsid', 'pgid', 'up_osds', 'acting_osds', 'state',
        'objects', 'degraded', 'misplaced', 'unfound'
    ]
76
77
class MGRDpDisk(BaseDP):
    """Physical disk data point; used as meta for VMDisk nodes.

    The *_osd fields hold comma-joined osd ids using the disk for the
    filestore journal, bluestore db/wal, or plain data respectively.
    """
    _fields = ['host_domain_id', 'host', 'fs_journal_osd', 'bs_db_osd', 'bs_wal_osd', 'data_osd', 'osd_ids']
80
81
class DBRelay(MetricsField):
    """Measurement record carrying one cypher command for the db_relay agent."""
    measurement = 'db_relay'

    def __init__(self):
        super(DBRelay, self).__init__()
        # Default tag/field layout, applied in a fixed order; 'cmd' carries
        # the cypher command string and 'dc_tag' is a fixed datacenter tag.
        for bucket, key, value in (
                ('fields', 'agenthost', None),
                ('tags', 'agenthost_domain_id', None),
                ('tags', 'dc_tag', 'na'),
                ('tags', 'host', None),
                ('fields', 'cmd', None)):
            getattr(self, bucket)[key] = value
93
94
95 class DBRelayAgent(MetricsAgent):
96 measurement = 'db_relay'
97
    def __init__(self, *args, **kwargs):
        """Snapshot the cluster maps and build graph nodes for every entity.

        Fetches the osd/mon/fs maps and osd metadata once via the mgr
        module, then materializes NodeInfo objects for the cluster, hosts,
        mons, mds daemons, osds, devices, pools, rbd images and filesystems.
        """
        super(DBRelayAgent, self).__init__(*args, **kwargs)
        self._cluster_node = None
        self._cluster_id = None
        self._ceph = ClusterAPI(self._module_inst)
        # Raw cluster maps, fetched once and reused by the _init_* helpers.
        self._osd_maps = self._ceph.module.get('osd_map')
        self._mon_maps = self._ceph.module.get('mon_map')
        self._fs_maps = self._ceph.module.get('fs_map')
        self._osd_metadata = self._ceph.module.get('osd_metadata')
        # NodeInfo caches keyed by entity name/id (osd/pool/fs keys are str).
        self._host_nodes = dict()
        self._osd_nodes = dict()
        self._mon_nodes = dict()
        self._mds_nodes = dict()
        self._dev_nodes = dict()
        self._pool_nodes = dict()
        self._rbd_nodes = dict()
        self._fs_nodes = dict()
        # initial ceph all node states
        # NOTE: order matters -- _init_cluster_node sets _cluster_id which
        # the other helpers read; _init_devices reads _osd_nodes, and
        # _init_rbds reads _pool_nodes.
        self._init_cluster_node()
        self._init_hosts()
        self._init_mons()
        self._init_mds()
        self._init_osds()
        self._init_devices()
        self._init_pools()
        self._init_rbds()
        self._init_fs()
125
126 def _init_hosts(self):
127 hosts = set()
128 # Add host from osd
129 osd_data = self._osd_maps.get('osds', [])
130 for _data in osd_data:
131 osd_id = _data['osd']
132 if not _data.get('in'):
133 continue
134 osd_addr = _data['public_addr'].split(':')[0]
135 osd_metadata = self._ceph.get_osd_metadata(osd_id)
136 if osd_metadata:
137 osd_host = osd_metadata['hostname']
138 hosts.add((osd_host, osd_addr))
139
140 # Add host from mon
141 mons = self._mon_maps.get('mons', [])
142 for _data in mons:
143 mon_host = _data['name']
144 mon_addr = _data['public_addr'].split(':')[0]
145 if mon_host:
146 hosts.add((mon_host, mon_addr))
147
148 # Add host from mds
149 file_systems = self._fs_maps.get('filesystems', [])
150 for _data in file_systems:
151 mds_info = _data.get('mdsmap').get('info')
152 for _gid in mds_info:
153 mds_data = mds_info[_gid]
154 mds_addr = mds_data.get('addr').split(':')[0]
155 mds_host = mds_data.get('name')
156 if mds_host:
157 hosts.add((mds_host, mds_addr))
158 for tp in hosts:
159 host = tp[0]
160 self._host_nodes[host] = None
161
162 host_node = NodeInfo(
163 label='VMHost',
164 domain_id='{}_{}'.format(self._cluster_id, host),
165 name=host,
166 meta={}
167 )
168 self._host_nodes[host] = host_node
169
170 def _init_mons(self):
171 cluster_id = self._cluster_id
172 mons = self._mon_maps.get('mons')
173 for mon in mons:
174 mon_name = mon.get('name', '')
175 mon_addr = mon.get('addr', '').split(':')[0]
176 if mon_name not in self._host_nodes.keys():
177 continue
178
179 dp_mon = MGRDpMon(
180 fsid=cluster_id,
181 host=mon_name,
182 ipaddr=mon_addr
183 )
184
185 # create mon node
186 mon_node = NodeInfo(
187 label='CephMon',
188 domain_id='{}.mon.{}'.format(cluster_id, mon_name),
189 name=mon_name,
190 meta=dp_mon.__dict__
191 )
192 self._mon_nodes[mon_name] = mon_node
193
194 def _init_mds(self):
195 cluster_id = self._cluster_id
196 file_systems = self._fs_maps.get('filesystems', [])
197 for _data in file_systems:
198 mds_info = _data.get('mdsmap').get('info')
199 for _gid in mds_info:
200 mds_data = mds_info[_gid]
201 mds_addr = mds_data.get('addr').split(':')[0]
202 mds_host = mds_data.get('name')
203 mds_gid = mds_data.get('gid')
204
205 if mds_host not in self._host_nodes:
206 continue
207
208 dp_mds = MGRDpMds(
209 fsid=cluster_id,
210 host=mds_host,
211 ipaddr=mds_addr
212 )
213
214 # create osd node
215 mds_node = NodeInfo(
216 label='CephMds',
217 domain_id='{}.mds.{}'.format(cluster_id, mds_gid),
218 name='MDS.{}'.format(mds_gid),
219 meta=dp_mds.__dict__
220 )
221 self._mds_nodes[mds_host] = mds_node
222
223 def _init_osds(self):
224 for osd in self._osd_maps.get('osds', []):
225 osd_id = osd.get('osd', -1)
226 meta = self._osd_metadata.get(str(osd_id), {})
227 osd_host = meta['hostname']
228 osd_ceph_version = meta['ceph_version']
229 osd_rotational = meta['rotational']
230 osd_devices = meta['devices'].split(',')
231
232 # filter 'dm' device.
233 devices = []
234 for devname in osd_devices:
235 if 'dm' in devname:
236 continue
237 devices.append(devname)
238
239 if osd_host not in self._host_nodes.keys():
240 continue
241 self._osd_nodes[str(osd_id)] = None
242 public_addr = []
243 cluster_addr = []
244 for addr in osd.get('public_addrs', {}).get('addrvec', []):
245 public_addr.append(addr.get('addr'))
246 for addr in osd.get('cluster_addrs', {}).get('addrvec', []):
247 cluster_addr.append(addr.get('addr'))
248 dp_osd = MGRDpOsd(
249 fsid=self._cluster_id,
250 host=osd_host,
251 _id=osd_id,
252 uuid=osd.get('uuid'),
253 up=osd.get('up'),
254 _in=osd.get('in'),
255 weight=osd.get('weight'),
256 public_addr=','.join(public_addr),
257 cluster_addr=','.join(cluster_addr),
258 state=','.join(osd.get('state', [])),
259 ceph_release=osd_ceph_version,
260 osd_devices=','.join(devices),
261 rotational=osd_rotational)
262 for k, v in meta.items():
263 setattr(dp_osd, k, v)
264
265 # create osd node
266 osd_node = NodeInfo(
267 label='CephOsd',
268 domain_id='{}.osd.{}'.format(self._cluster_id, osd_id),
269 name='OSD.{}'.format(osd_id),
270 meta=dp_osd.__dict__
271 )
272 self._osd_nodes[str(osd_id)] = osd_node
273
274 def _init_devices(self):
275 r = re.compile('[^/dev]\D+')
276 for osdid, o_val in self._osd_nodes.items():
277 o_devs = o_val.meta.get('device_ids', '').split(',')
278 # fs_store
279 journal_devs = o_val.meta.get('backend_filestore_journal_dev_node', '').split(',')
280 # bs_store
281 bs_db_devs = o_val.meta.get('bluefs_db_dev_node', '').split(',')
282 bs_wal_devs = o_val.meta.get('bluefs_wal_dev_node', '').split(',')
283
284 for dev in o_devs:
285 fs_journal = []
286 bs_db = []
287 bs_wal = []
288 data = []
289 if len(dev.split('=')) != 2:
290 continue
291 dev_name = dev.split('=')[0]
292 dev_id = dev.split('=')[1]
293 if not dev_id:
294 continue
295
296 for j_dev in journal_devs:
297 if dev_name == ''.join(r.findall(j_dev)):
298 fs_journal.append(osdid)
299 for db_dev in bs_db_devs:
300 if dev_name == ''.join(r.findall(db_dev)):
301 bs_db.append(osdid)
302 for wal_dev in bs_wal_devs:
303 if dev_name == ''.join(r.findall(wal_dev)):
304 bs_wal.append(osdid)
305
306 if not fs_journal and not bs_db and not bs_wal:
307 data.append(osdid)
308
309 disk_domain_id = dev_id
310 if disk_domain_id not in self._dev_nodes.keys():
311 dp_disk = MGRDpDisk(
312 host_domain_id='{}_{}'.format(self._cluster_id, o_val.meta.get('host')),
313 host=o_val.meta.get('host'),
314 osd_ids=osdid,
315 fs_journal_osd=','.join(str(x) for x in fs_journal) if fs_journal else '',
316 bs_db_osd=','.join(str(x) for x in bs_db) if bs_db else '',
317 bs_wal_osd=','.join(str(x) for x in bs_wal) if bs_wal else '',
318 data_osd=','.join(str(x) for x in data) if data else ''
319 )
320 # create disk node
321 disk_node = NodeInfo(
322 label='VMDisk',
323 domain_id=disk_domain_id,
324 name=dev_name,
325 meta=dp_disk.__dict__
326 )
327 self._dev_nodes[disk_domain_id] = disk_node
328 else:
329 dev_node = self._dev_nodes[disk_domain_id]
330 osd_ids = dev_node.meta.get('osd_ids', '')
331 if osdid not in osd_ids.split(','):
332 arr_value = osd_ids.split(',')
333 arr_value.append(str(osdid))
334 dev_node.meta['osd_ids'] = ','.join(arr_value)
335 if fs_journal:
336 arr_value = None
337 for t in fs_journal:
338 value = dev_node.meta.get('fs_journal_osd', '')
339 if value:
340 arr_value = value.split(',')
341 else:
342 arr_value = []
343 if t not in arr_value:
344 arr_value.append(t)
345 if arr_value:
346 dev_node.meta['fs_journal_osd'] = ','.join(str(x) for x in arr_value)
347 if bs_db:
348 arr_value = None
349 for t in bs_db:
350 value = dev_node.meta.get('bs_db_osd', '')
351 if value:
352 arr_value = value.split(',')
353 else:
354 arr_value = []
355 if t not in arr_value:
356 arr_value.append(t)
357 if arr_value:
358 dev_node.meta['bs_db_osd'] = ','.join(str(x) for x in arr_value)
359 if bs_wal:
360 arr_value = None
361 for t in bs_wal:
362 value = dev_node.meta.get('bs_wal_osd', '')
363 if value:
364 arr_value = value.split(',')
365 else:
366 arr_value = []
367 if t not in arr_value:
368 arr_value.append(t)
369 if arr_value:
370 dev_node.meta['bs_wal_osd'] = ','.join(str(x) for x in arr_value)
371 if data:
372 arr_value = None
373 for t in data:
374 value = dev_node.meta.get('data_osd', '')
375 if value:
376 arr_value = value.split(',')
377 else:
378 arr_value = []
379 if t not in arr_value:
380 arr_value.append(t)
381 if arr_value:
382 dev_node.meta['data_osd'] = ','.join(str(x) for x in arr_value)
383
384 def _init_cluster_node(self):
385 cluster_id = self._ceph.get_cluster_id()
386 ceph_df_stat = self._ceph.get_ceph_df_state()
387 dp_cluster = MGRDpCeph(
388 fsid=cluster_id,
389 health=self._ceph.get_health_status(),
390 max_osd=len(self._ceph.get_osds()),
391 size=ceph_df_stat.get('total_size'),
392 avail_size=ceph_df_stat.get('avail_size'),
393 raw_used=ceph_df_stat.get('raw_used_size'),
394 raw_used_percent=ceph_df_stat.get('used_percent')
395 )
396 cluster_name = cluster_id[-12:]
397 cluster_node = NodeInfo(
398 label='CephCluster',
399 domain_id=cluster_id,
400 name='cluster-{}'.format(cluster_name),
401 meta=dp_cluster.__dict__
402 )
403 self._cluster_id = cluster_id
404 self._cluster_node = cluster_node
405
406 def _init_pools(self):
407 pools = self._osd_maps.get('pools', [])
408 cluster_id = self._cluster_id
409 for pool in pools:
410 osds = []
411 pgs = self._ceph.get_pgs_up_by_poolid(int(pool.get('pool', -1)))
412 for pg_id, osd_id in pgs.items():
413 for o_id in osd_id:
414 if o_id not in osds:
415 osds.append(str(o_id))
416 dp_pool = MGRDpPool(
417 fsid=cluster_id,
418 size=pool.get('size'),
419 pool_name=pool.get('pool_name'),
420 pool_id=pool.get('pool'),
421 type=pool.get('type'),
422 min_size=pool.get('min_szie'),
423 pg_num=pool.get('pg_num'),
424 pgp_num=pool.get('pg_placement_num'),
425 created_time=pool.get('create_time'),
426 pgids=','.join(pgs.keys()),
427 osd_ids=','.join(osds),
428 tiers=','.join(str(x) for x in pool.get('tiers', [])),
429 cache_mode=pool.get('cache_mode', ''),
430 erasure_code_profile=str(pool.get('erasure_code_profile', '')),
431 tier_of=str(pool.get('tier_of', -1)))
432 # create pool node
433 pool_node = NodeInfo(
434 label='CephPool',
435 domain_id='{}_pool_{}'.format(cluster_id, pool.get('pool')),
436 name=pool.get('pool_name'),
437 meta=dp_pool.__dict__
438 )
439 self._pool_nodes[str(pool.get('pool'))] = pool_node
440
441 def _init_rbds(self):
442 cluster_id = self._cluster_id
443 for p_id, p_node in self._pool_nodes.items():
444 rbds = self._ceph.get_rbd_list(p_node.name)
445 self._rbd_nodes[str(p_id)] = []
446 for rbd in rbds:
447 dp_rbd = MGRDpRBD(
448 fsid=cluster_id,
449 _id=rbd['id'],
450 name=rbd['name'],
451 pool_name=rbd['pool_name'],
452 pool_id=p_id,
453 )
454 # create pool node
455 rbd_node = NodeInfo(
456 label='CephRBD',
457 domain_id='{}_rbd_{}'.format(cluster_id, rbd['id']),
458 name=rbd['name'],
459 meta=dp_rbd.__dict__,
460 )
461 self._rbd_nodes[str(p_id)].append(rbd_node)
462
463 def _init_fs(self):
464 # _fields = ['fsid', '_id', 'name', 'metadata_pool', 'data_pool', 'mds_nodes']
465 cluster_id = self._cluster_id
466 file_systems = self._fs_maps.get('filesystems', [])
467 for fs in file_systems:
468 mdsmap = fs.get('mdsmap', {})
469 mds_hostnames = []
470 for m, md in mdsmap.get('info', {}).items():
471 if md.get('name') not in mds_hostnames:
472 mds_hostnames.append(md.get('name'))
473 dp_fs = MGRDpFS(
474 fsid=cluster_id,
475 _id=fs.get('id'),
476 name=mdsmap.get('fs_name'),
477 metadata_pool=str(mdsmap.get('metadata_pool', -1)),
478 data_pools=','.join(str(i) for i in mdsmap.get('data_pools', [])),
479 mds_nodes=','.join(mds_hostnames),
480 )
481 fs_node = NodeInfo(
482 label='CephFS',
483 domain_id='{}_fs_{}'.format(cluster_id, fs.get('id')),
484 name=mdsmap.get('fs_name'),
485 meta=dp_fs.__dict__,
486 )
487 self._fs_nodes[str(fs.get('id'))] = fs_node
488
489 def _cluster_contains_host(self):
490 cluster_id = self._cluster_id
491 cluster_node = self._cluster_node
492
493 # create node relation
494 for h_id, h_node in self._host_nodes.items():
495 data = DBRelay()
496 # add osd node relationship
497 cypher_cmd = CypherOP.add_link(
498 cluster_node,
499 h_node,
500 'CephClusterContainsHost'
501 )
502 cluster_host = socket.gethostname()
503 data.fields['agenthost'] = cluster_host
504 data.tags['agenthost_domain_id'] = cluster_id
505 data.tags['host'] = cluster_host
506 data.fields['cmd'] = str(cypher_cmd)
507 self.data.append(data)
508
509 def _host_contains_mon(self):
510 for m_name, m_node in self._mon_nodes.items():
511 host_node = self._host_nodes.get(m_name)
512 if not host_node:
513 continue
514 data = DBRelay()
515 # add mon node relationship
516 cypher_cmd = CypherOP.add_link(
517 host_node,
518 m_node,
519 'HostContainsMon'
520 )
521 cluster_host = socket.gethostname()
522 data.fields['agenthost'] = cluster_host
523 data.tags['agenthost_domain_id'] = self._cluster_id
524 data.tags['host'] = cluster_host
525 data.fields['cmd'] = str(cypher_cmd)
526 self.data.append(data)
527
528 def _host_contains_osd(self):
529 cluster_id = self._cluster_id
530 for o_id, o_node in self._osd_nodes.items():
531 host_node = self._host_nodes.get(o_node.meta.get('host'))
532 if not host_node:
533 continue
534 data = DBRelay()
535 # add osd node relationship
536 cypher_cmd = CypherOP.add_link(
537 host_node,
538 o_node,
539 'HostContainsOsd'
540 )
541 cluster_host = socket.gethostname()
542 data.fields['agenthost'] = cluster_host
543 data.tags['agenthost_domain_id'] = cluster_id, data
544 data.tags['host'] = cluster_host
545 data.fields['cmd'] = str(cypher_cmd)
546 self.data.append(data)
547
548 def _host_contains_mds(self):
549 cluster_id = self._cluster_id
550 for m_name, mds_node in self._mds_nodes.items():
551 data = DBRelay()
552 host_node = self._host_nodes.get(mds_node.meta.get('host'))
553 if not host_node:
554 continue
555 # add osd node relationship
556 cypher_cmd = CypherOP.add_link(
557 host_node,
558 mds_node,
559 'HostContainsMds'
560 )
561 cluster_host = socket.gethostname()
562 data.fields['agenthost'] = cluster_host
563 data.tags['agenthost_domain_id'] = cluster_id
564 data.tags['host'] = cluster_host
565 data.fields['cmd'] = str(cypher_cmd)
566 self.data.append(data)
567
568 def _osd_contains_disk(self):
569 cluster_id = self._cluster_id
570 cluster_host = socket.gethostname()
571 for d_name, d_node in self._dev_nodes.items():
572 keys = {'data_osd': 'DataDiskOfOSD',
573 'fs_journal_osd': 'FsJournalDiskOfOSD',
574 'bs_db_osd': 'BsDBDiskOfOSD',
575 'bs_wal_osd': 'BsWalDiskOfOSD'}
576 for k, v in keys.items():
577 if not d_node.meta.get(k):
578 continue
579 for osdid in d_node.meta.get(k, '').split(','):
580 data = DBRelay()
581 osd_node = self._osd_nodes.get(str(osdid))
582 if not osd_node:
583 continue
584 # add disk node relationship
585 cypher_cmd = CypherOP.add_link(
586 osd_node,
587 d_node,
588 v)
589 data.fields['agenthost'] = cluster_host
590 data.tags['agenthost_domain_id'] = cluster_id
591 data.tags['host'] = cluster_host
592 data.fields['cmd'] = str(cypher_cmd)
593 self.data.append(data)
594
595 hostname = d_node.meta.get('host', '')
596 if not hostname:
597 continue
598 host_node = self._host_nodes.get(hostname)
599 if not host_node:
600 continue
601 # add osd node relationship
602 data = DBRelay()
603 cypher_cmd = CypherOP.add_link(
604 host_node,
605 d_node,
606 'VmHostContainsVmDisk'
607 )
608 data.fields['agenthost'] = cluster_host
609 data.tags['agenthost_domain_id'] = cluster_id
610 data.tags['host'] = cluster_host
611 data.fields['cmd'] = str(cypher_cmd)
612 self.data.append(data)
613
614 def _pool_contains_osd(self):
615 cluster_id = self._cluster_id
616 cluster_host = socket.gethostname()
617 for p_id, p_node in self._pool_nodes.items():
618 for o_id in p_node.meta.get('osd_ids', '').split(','):
619 osd_node = self._osd_nodes.get(str(o_id))
620 if not osd_node:
621 continue
622 data = DBRelay()
623 cypher_cmd = CypherOP.add_link(
624 osd_node,
625 p_node,
626 'OsdContainsPool'
627 )
628 data.fields['agenthost'] = cluster_host
629 data.tags['agenthost_domain_id'] = cluster_id
630 data.tags['host'] = cluster_host
631 data.fields['cmd'] = str(cypher_cmd)
632 self.data.append(data)
633
634 def _pool_contains_rbd(self):
635 cluster_id = self._cluster_id
636 cluster_host = socket.gethostname()
637 for p_id, p_node in self._pool_nodes.items():
638 for rbd_node in self._rbd_nodes.get(str(p_id), []):
639 if not rbd_node:
640 continue
641 data = DBRelay()
642 cypher_cmd = CypherOP.add_link(
643 p_node,
644 rbd_node,
645 'PoolContainsRBD'
646 )
647 data.fields['agenthost'] = cluster_host
648 data.tags['agenthost_domain_id'] = cluster_id
649 data.tags['host'] = cluster_host
650 data.fields['cmd'] = str(cypher_cmd)
651 self.data.append(data)
652
653 def _pool_contains_fs(self):
654 cluster_id = self._cluster_id
655 cluster_host = socket.gethostname()
656 for fs_id, fs_node in self._fs_nodes.items():
657 pool_attrs = ['metadata_pool', 'data_pools']
658 for p_attr in pool_attrs:
659 pools_id = fs_node.meta.get(p_attr).split(',')
660 for p_id in pools_id:
661 p_node = self._pool_nodes.get(str(p_id))
662 if p_node:
663 data = DBRelay()
664 cypher_cmd = CypherOP.add_link(
665 p_node,
666 fs_node,
667 'MetadataPoolContainsFS' if p_attr == 'metadata_pool' else 'DataPoolContainsFS'
668 )
669 data.fields['agenthost'] = cluster_host
670 data.tags['agenthost_domain_id'] = cluster_id
671 data.tags['host'] = cluster_host
672 data.fields['cmd'] = str(cypher_cmd)
673 self.data.append(data)
674 for mds_name in fs_node.meta.get('mds_nodes', '').split(','):
675 mds_node = self._mds_nodes.get(mds_name)
676 if not mds_node:
677 continue
678 data = DBRelay()
679 cypher_cmd = CypherOP.add_link(
680 mds_node,
681 fs_node,
682 'MDSContainsFS'
683 )
684 data.fields['agenthost'] = cluster_host
685 data.tags['agenthost_domain_id'] = cluster_id
686 data.tags['host'] = cluster_host
687 data.fields['cmd'] = str(cypher_cmd)
688 self.data.append(data)
689
690 def _collect_data(self):
691 if not self._module_inst:
692 return
693 job_name = ['cluster_contains_host', 'host_contains_mon', 'host_contains_mds', 'host_contains_osd', 'osd_contains_disk',
694 'pool_contains_osd', 'pool_contains_rbd', 'pool_contains_fs']
695 for job in job_name:
696 fn = getattr(self, '_%s' % job)
697 if not fn:
698 continue
699 try:
700 fn()
701 except Exception as e:
702 self._module_inst.log.error('dbrelay - execute function {} fail, due to {}'.format(job, str(e)))
703 continue