10 from collections
import OrderedDict
11 from datetime
import datetime
12 from enum
import Enum
, unique
13 from threading
import Event
18 class FSTopException(Exception):
19 def __init__(self
, msg
=''):
22 def get_error_msg(self
):
27 class MetricType(Enum
):
29 METRIC_TYPE_PERCENTAGE
= 1
30 METRIC_TYPE_LATENCY
= 2
33 FS_TOP_PROG_STR
= 'cephfs-top'
35 # version match b/w fstop and stats emitted by mgr/stats
36 FS_TOP_SUPPORTED_VER
= 1
39 ITEMS_PAD
= " " * ITEMS_PAD_LEN
40 DEFAULT_REFRESH_INTERVAL
= 1
41 # min refresh interval allowed
42 MIN_REFRESH_INTERVAL
= 0.5
44 # metadata provided by mgr/stats
45 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
= "CLIENT_ID"
46 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
= "MOUNT_ROOT"
47 FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
= "MOUNT_POINT@HOST/ADDR"
49 MAIN_WINDOW_TOP_LINE_ITEMS_START
= [ITEMS_PAD
,
50 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
,
51 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
]
52 MAIN_WINDOW_TOP_LINE_ITEMS_END
= [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
]
54 # adjust this map according to stats version and maintain order
55 # as emitted by mgr/stats
56 MAIN_WINDOW_TOP_LINE_METRICS
= OrderedDict([
57 ("CAP_HIT", MetricType
.METRIC_TYPE_PERCENTAGE
),
58 ("READ_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
59 ("WRITE_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
60 ("METADATA_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
61 ("DENTRY_LEASE", MetricType
.METRIC_TYPE_PERCENTAGE
),
62 ("OPENED_FILES", MetricType
.METRIC_TYPE_NONE
),
63 ("PINNED_ICAPS", MetricType
.METRIC_TYPE_NONE
),
64 ("OPENED_INODES", MetricType
.METRIC_TYPE_NONE
),
66 MGR_STATS_COUNTERS
= list(MAIN_WINDOW_TOP_LINE_METRICS
.keys())
68 FS_TOP_VERSION_HEADER_FMT
= '{prog_name} - {now}'
69 FS_TOP_CLIENT_HEADER_FMT
= 'Client(s): {num_clients} - {num_mounts} FUSE, '\
70 '{num_kclients} kclient, {num_libs} libcephfs'
72 CLIENT_METADATA_KEY
= "client_metadata"
73 CLIENT_METADATA_MOUNT_POINT_KEY
= "mount_point"
74 CLIENT_METADATA_MOUNT_ROOT_KEY
= "root"
75 CLIENT_METADATA_IP_KEY
= "IP"
76 CLIENT_METADATA_HOSTNAME_KEY
= "hostname"
77 CLIENT_METADATA_VALID_METRICS_KEY
= "valid_metrics"
79 GLOBAL_METRICS_KEY
= "global_metrics"
80 GLOBAL_COUNTERS_KEY
= "global_counters"
84 if c
[0] == 0 and c
[1] == 0:
86 return round((c
[0] / (c
[0] + c
[1])) * 100, 2)
90 return round(c
[0] + c
[1] / 1000000000, 2)
94 """return a '+' suffixed wrapped string"""
97 return f
'{s[0:sl-1]}+'
101 def __init__(self
, args
):
103 self
.stdscr
= None # curses instance
104 self
.client_name
= args
.id
105 self
.cluster_name
= args
.cluster
106 self
.conffile
= args
.conffile
107 self
.refresh_interval_secs
= args
.delay
108 self
.exit_ev
= Event()
110 def handle_signal(self
, signum
, _
):
116 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
,
117 conffile
=self
.conffile
)
119 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
)
120 r_rados
.conf_read_file()
123 except rados
.Error
as e
:
124 if e
.errno
== errno
.ENOENT
:
125 raise FSTopException(f
'cluster {self.cluster_name} does not exist')
127 raise FSTopException(f
'error connecting to cluster: {e}')
128 self
.verify_perf_stats_support()
129 signal
.signal(signal
.SIGTERM
, self
.handle_signal
)
130 signal
.signal(signal
.SIGINT
, self
.handle_signal
)
134 self
.rados
.shutdown()
138 stats_json
= self
.perf_stats_query()
139 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
140 raise FSTopException('perf stats version mismatch!')
141 missing
= [m
for m
in stats_json
["global_counters"] if m
.upper() not in MGR_STATS_COUNTERS
]
143 raise FSTopException('Cannot handle unknown metrics from \'ceph fs perf stats\': '
def setup_curses(self):
    """Create the curses windows and hand control to the display loop.

    Initialises curses, builds the header, top-line and main windows and
    stores them on the instance as ``header``, ``topl`` and ``mainw``,
    then runs ``self.display`` through ``curses.wrapper``.
    """
    self.stdscr = curses.initscr()

    # NOTE: COLS and LINES may only be read after initscr() has been
    # called, so the window geometry is computed here.
    width = curses.COLS - 1
    main_height = curses.LINES - 4

    # window geometry -- (height, width, y, x)
    header_geom = (2, width, 0, 0)
    topline_geom = (1, width, 3, 0)
    main_geom = (main_height, width, 4, 0)

    self.header = curses.newwin(*header_geom)
    self.topl = curses.newwin(*topline_geom)
    self.mainw = curses.newwin(*main_geom)

    # wrapper() calls self.display with the screen object as its argument.
    curses.wrapper(self.display)
160 def verify_perf_stats_support(self
):
161 mon_cmd
= {'prefix': 'mgr module ls', 'format': 'json'}
163 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(mon_cmd
), b
'')
164 except Exception as e
:
165 raise FSTopException(f
'error checking \'stats\' module: {e}')
167 raise FSTopException(f
'error checking \'stats\' module: {out}')
168 if 'stats' not in json
.loads(buf
.decode('utf-8'))['enabled_modules']:
169 raise FSTopException('\'stats\' module not enabled. Use \'ceph mgr module '
170 'enable stats\' to enable')
172 def perf_stats_query(self
):
173 mgr_cmd
= {'prefix': 'fs perf stats', 'format': 'json'}
175 ret
, buf
, out
= self
.rados
.mgr_command(json
.dumps(mgr_cmd
), b
'')
176 except Exception as e
:
177 raise FSTopException(f
'error in \'perf stats\' query: {e}')
179 raise FSTopException(f
'error in \'perf stats\' query: {out}')
180 return json
.loads(buf
.decode('utf-8'))
182 def mtype(self
, typ
):
183 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
185 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
188 # return empty string for none type
191 def refresh_top_line_and_build_coord(self
):
196 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
198 nlen
= len(item
) + len(ITEMS_PAD
)
199 x_coord_map
[item
] = (xp
, nlen
)
202 for item
, typ
in MAIN_WINDOW_TOP_LINE_METRICS
.items():
203 it
= f
'{item}{self.mtype(typ)}'
205 nlen
= len(it
) + len(ITEMS_PAD
)
206 x_coord_map
[item
] = (xp
, nlen
)
209 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
211 nlen
= len(item
) + len(ITEMS_PAD
)
212 x_coord_map
[item
] = (xp
, nlen
)
214 self
.topl
.addstr(0, 0, ITEMS_PAD
.join(heading
), curses
.A_STANDOUT | curses
.A_BOLD
)
def has_metric(metadata, metrics_key):
    """Return True when ``metrics_key`` is contained in ``metadata``.

    :param metadata: container to search (any object supporting ``in``)
    :param metrics_key: key to look for
    """
    present = metrics_key in metadata
    return present
222 def has_metrics(metadata
, metrics_keys
):
223 for key
in metrics_keys
:
224 if not FSTop
.has_metric(metadata
, key
):
228 def refresh_client(self
, client_id
, metrics
, counters
, client_meta
, x_coord_map
, y_coord
):
229 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
230 coord
= x_coord_map
[item
]
231 if item
== FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
:
232 if FSTop
.has_metrics(client_meta
, [CLIENT_METADATA_MOUNT_POINT_KEY
,
233 CLIENT_METADATA_HOSTNAME_KEY
,
234 CLIENT_METADATA_IP_KEY
]):
235 self
.mainw
.addstr(y_coord
, coord
[0],
236 f
'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@'
237 f
'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'
238 f
'{client_meta[CLIENT_METADATA_IP_KEY]}')
240 self
.mainw
.addstr(y_coord
, coord
[0], "N/A")
241 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
242 coord
= x_coord_map
[item
]
243 hlen
= coord
[1] - len(ITEMS_PAD
)
244 if item
== FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
:
245 self
.mainw
.addstr(y_coord
, coord
[0],
246 wrap(client_id
.split('.')[1], hlen
))
247 elif item
== FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
:
248 if FSTop
.has_metric(client_meta
, CLIENT_METADATA_MOUNT_ROOT_KEY
):
249 self
.mainw
.addstr(y_coord
, coord
[0],
250 wrap(client_meta
[CLIENT_METADATA_MOUNT_ROOT_KEY
], hlen
))
252 self
.mainw
.addstr(y_coord
, coord
[0], "N/A")
254 for item
in counters
:
255 coord
= x_coord_map
[item
]
257 typ
= MAIN_WINDOW_TOP_LINE_METRICS
[MGR_STATS_COUNTERS
[cidx
]]
258 if item
.lower() in client_meta
.get(CLIENT_METADATA_VALID_METRICS_KEY
, []):
259 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
260 self
.mainw
.addstr(y_coord
, coord
[0], f
'{calc_perc(m)}')
261 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
262 self
.mainw
.addstr(y_coord
, coord
[0], f
'{calc_lat(m)}')
264 # display 0th element from metric tuple
265 self
.mainw
.addstr(y_coord
, coord
[0], f
'{m[0]}')
267 self
.mainw
.addstr(y_coord
, coord
[0], "N/A")
270 def refresh_clients(self
, x_coord_map
, stats_json
):
271 counters
= [m
.upper() for m
in stats_json
[GLOBAL_COUNTERS_KEY
]]
273 for client_id
, metrics
in stats_json
[GLOBAL_METRICS_KEY
].items():
274 self
.refresh_client(client_id
,
277 stats_json
[CLIENT_METADATA_KEY
][client_id
],
def refresh_main_window(self, x_coord_map, stats_json):
    """Redraw the main window by delegating to ``refresh_clients``.

    :param x_coord_map: column map passed through unchanged
    :param stats_json: stats payload passed through unchanged
    """
    # The result of refresh_clients() is deliberately not returned.
    self.refresh_clients(x_coord_map=x_coord_map, stats_json=stats_json)
285 def refresh_header(self
, stats_json
):
286 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
287 self
.header
.addstr(0, 0, 'perf stats version mismatch!')
289 client_metadata
= stats_json
[CLIENT_METADATA_KEY
]
290 num_clients
= len(client_metadata
)
291 num_mounts
= len([client
for client
, metadata
in client_metadata
.items() if
292 CLIENT_METADATA_MOUNT_POINT_KEY
in metadata
293 and metadata
[CLIENT_METADATA_MOUNT_POINT_KEY
] != 'N/A'])
294 num_kclients
= len([client
for client
, metadata
in client_metadata
.items() if
295 "kernel_version" in metadata
])
296 num_libs
= num_clients
- (num_mounts
+ num_kclients
)
297 now
= datetime
.now().ctime()
298 self
.header
.addstr(0, 0,
299 FS_TOP_VERSION_HEADER_FMT
.format(prog_name
=FS_TOP_PROG_STR
, now
=now
),
300 curses
.A_STANDOUT | curses
.A_BOLD
)
301 self
.header
.addstr(1, 0, FS_TOP_CLIENT_HEADER_FMT
.format(num_clients
=num_clients
,
302 num_mounts
=num_mounts
,
303 num_kclients
=num_kclients
,
307 def display(self
, _
):
308 x_coord_map
= self
.refresh_top_line_and_build_coord()
310 while not self
.exit_ev
.is_set():
311 stats_json
= self
.perf_stats_query()
314 if self
.refresh_header(stats_json
):
315 self
.refresh_main_window(x_coord_map
, stats_json
)
316 self
.header
.refresh()
318 self
.exit_ev
.wait(timeout
=self
.refresh_interval_secs
)
321 if __name__
== '__main__':
322 def float_greater_than(x
):
324 if value
< MIN_REFRESH_INTERVAL
:
325 raise argparse
.ArgumentTypeError(f
'{value} should be greater than '
326 f
'{MIN_REFRESH_INTERVAL}')
329 parser
= argparse
.ArgumentParser(description
='Ceph Filesystem top utility')
330 parser
.add_argument('--cluster', nargs
='?', const
='ceph', default
='ceph',
331 help='Ceph cluster to connect (defualt: ceph)')
332 parser
.add_argument('--id', nargs
='?', const
='fstop', default
='fstop',
333 help='Ceph user to use to connection (default: fstop)')
334 parser
.add_argument('--conffile', nargs
='?', default
=None,
335 help='Path to cluster configuration file')
336 parser
.add_argument('--selftest', dest
='selftest', action
='store_true',
337 help='run in selftest mode')
338 parser
.add_argument('-d', '--delay', nargs
='?', default
=DEFAULT_REFRESH_INTERVAL
,
339 type=float_greater_than
, help='Interval to refresh data '
340 f
'(default: {DEFAULT_REFRESH_INTERVAL})')
342 args
= parser
.parse_args()
349 sys
.stdout
.write("selftest ok\n")
352 except FSTopException
as fst
:
354 sys
.stderr
.write(f
'{fst.get_error_msg()}\n')
355 except Exception as e
:
357 sys
.stderr
.write(f
'exception: {e}\n')
360 sys
.exit(0 if not err
else -1)