10 from collections
import OrderedDict
11 from datetime
import datetime
12 from enum
import Enum
, unique
13 from threading
import Event
18 class FSTopException(Exception):
19 def __init__(self
, msg
=''):
22 def get_error_msg(self
):
27 class MetricType(Enum
):
29 METRIC_TYPE_PERCENTAGE
= 1
30 METRIC_TYPE_LATENCY
= 2
33 FS_TOP_PROG_STR
= 'cephfs-top'
35 # version match b/w fstop and stats emitted by mgr/stats
36 FS_TOP_SUPPORTED_VER
= 1
39 ITEMS_PAD
= " " * ITEMS_PAD_LEN
40 DEFAULT_REFRESH_INTERVAL
= 1
41 # min refresh interval allowed
42 MIN_REFRESH_INTERVAL
= 0.5
44 # metadata provided by mgr/stats
45 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
= "client_id"
46 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
= "mount_root"
47 FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
= "mount_point@host/addr"
49 MAIN_WINDOW_TOP_LINE_ITEMS_START
= [ITEMS_PAD
,
50 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
,
51 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
]
52 MAIN_WINDOW_TOP_LINE_ITEMS_END
= [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
]
54 # adjust this map according to stats version and maintain order
55 # as emitted by mgr/stats
56 MAIN_WINDOW_TOP_LINE_METRICS
= OrderedDict([
57 ("CAP_HIT", MetricType
.METRIC_TYPE_PERCENTAGE
),
58 ("READ_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
59 ("WRITE_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
60 ("METADATA_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
61 ("DENTRY_LEASE", MetricType
.METRIC_TYPE_PERCENTAGE
),
62 ("OPENED_FILES", MetricType
.METRIC_TYPE_NONE
),
63 ("PINNED_ICAPS", MetricType
.METRIC_TYPE_NONE
),
64 ("OPENED_INODES", MetricType
.METRIC_TYPE_NONE
),
66 MGR_STATS_COUNTERS
= list(MAIN_WINDOW_TOP_LINE_METRICS
.keys())
68 FS_TOP_VERSION_HEADER_FMT
= '{prog_name} - {now}'
69 FS_TOP_CLIENT_HEADER_FMT
= 'Client(s): {num_clients} - {num_mounts} FUSE, '\
70 '{num_kclients} kclient, {num_libs} libcephfs'
72 CLIENT_METADATA_KEY
= "client_metadata"
73 CLIENT_METADATA_MOUNT_POINT_KEY
= "mount_point"
74 CLIENT_METADATA_MOUNT_ROOT_KEY
= "root"
75 CLIENT_METADATA_IP_KEY
= "IP"
76 CLIENT_METADATA_HOSTNAME_KEY
= "hostname"
77 CLIENT_METADATA_VALID_METRICS_KEY
= "valid_metrics"
79 GLOBAL_METRICS_KEY
= "global_metrics"
80 GLOBAL_COUNTERS_KEY
= "global_counters"
84 if c
[0] == 0 and c
[1] == 0:
86 return round((c
[0] / (c
[0] + c
[1])) * 100, 2)
90 return round(c
[0] + c
[1] / 1000000000, 2)
94 """return a '+' suffixed wrapped string"""
97 return f
'{s[0:sl-1]}+'
101 def __init__(self
, args
):
103 self
.stdscr
= None # curses instance
104 self
.client_name
= args
.id
105 self
.cluster_name
= args
.cluster
106 self
.conffile
= args
.conffile
107 self
.refresh_interval_secs
= args
.delay
108 self
.exit_ev
= Event()
def refresh_window_size(self):
    """Record the current size of the curses screen on the instance.

    Calls ``self.stdscr.getmaxyx()`` and stores the returned pair as
    ``self.height`` and ``self.width``, in that order.
    """
    self.height, self.width = self.stdscr.getmaxyx()
113 def handle_signal(self
, signum
, _
):
119 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
,
120 conffile
=self
.conffile
)
122 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
)
123 r_rados
.conf_read_file()
126 except rados
.Error
as e
:
127 if e
.errno
== errno
.ENOENT
:
128 raise FSTopException(f
'cluster {self.cluster_name} does not exist')
130 raise FSTopException(f
'error connecting to cluster: {e}')
131 self
.verify_perf_stats_support()
132 signal
.signal(signal
.SIGTERM
, self
.handle_signal
)
133 signal
.signal(signal
.SIGINT
, self
.handle_signal
)
137 self
.rados
.shutdown()
141 stats_json
= self
.perf_stats_query()
142 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
143 raise FSTopException('perf stats version mismatch!')
144 missing
= [m
for m
in stats_json
["global_counters"] if m
.upper() not in MGR_STATS_COUNTERS
]
146 raise FSTopException('Cannot handle unknown metrics from \'ceph fs perf stats\': '
149 def setup_curses(self
, win
):
151 curses
.use_default_colors()
156 # If the terminal does not support the requested
157 # visibility, an exception will be raised
161 def verify_perf_stats_support(self
):
162 mon_cmd
= {'prefix': 'mgr module ls', 'format': 'json'}
164 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(mon_cmd
), b
'')
165 except Exception as e
:
166 raise FSTopException(f
'error checking \'stats\' module: {e}')
168 raise FSTopException(f
'error checking \'stats\' module: {out}')
169 if 'stats' not in json
.loads(buf
.decode('utf-8'))['enabled_modules']:
170 raise FSTopException('\'stats\' module not enabled. Use \'ceph mgr module '
171 'enable stats\' to enable')
173 def perf_stats_query(self
):
174 mgr_cmd
= {'prefix': 'fs perf stats', 'format': 'json'}
176 ret
, buf
, out
= self
.rados
.mgr_command(json
.dumps(mgr_cmd
), b
'')
177 except Exception as e
:
178 raise FSTopException(f
'error in \'perf stats\' query: {e}')
180 raise FSTopException(f
'error in \'perf stats\' query: {out}')
181 return json
.loads(buf
.decode('utf-8'))
183 def items(self
, item
):
184 if item
== "CAP_HIT":
186 if item
== "READ_LATENCY":
188 if item
== "WRITE_LATENCY":
190 if item
== "METADATA_LATENCY":
192 if item
== "DENTRY_LEASE":
194 if item
== "OPENED_FILES":
196 if item
== "PINNED_ICAPS":
198 if item
== "OPENED_INODES":
201 # return empty string for none type
204 def mtype(self
, typ
):
205 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
207 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
210 # return empty string for none type
213 def refresh_top_line_and_build_coord(self
):
214 if self
.topl
is None:
221 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
223 nlen
= len(item
) + len(ITEMS_PAD
)
224 x_coord_map
[item
] = (xp
, nlen
)
227 for item
, typ
in MAIN_WINDOW_TOP_LINE_METRICS
.items():
228 it
= f
'{self.items(item)}{self.mtype(typ)}'
230 nlen
= len(it
) + len(ITEMS_PAD
)
231 x_coord_map
[item
] = (xp
, nlen
)
234 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
236 nlen
= len(item
) + len(ITEMS_PAD
)
237 x_coord_map
[item
] = (xp
, nlen
)
239 title
= ITEMS_PAD
.join(heading
)
240 hlen
= min(self
.width
- 2, len(title
))
241 self
.topl
.addnstr(0, 0, title
, hlen
, curses
.A_STANDOUT | curses
.A_BOLD
)
def has_metric(metadata, metrics_key):
    """Return True when ``metrics_key`` is present in ``metadata``.

    This is a plain membership test (``in``); callers in this file pass the
    per-client metadata together with one of the CLIENT_METADATA_* keys.
    """
    return metrics_key in metadata
250 def has_metrics(metadata
, metrics_keys
):
251 for key
in metrics_keys
:
252 if not FSTop
.has_metric(metadata
, key
):
256 def refresh_client(self
, client_id
, metrics
, counters
, client_meta
, x_coord_map
, y_coord
):
257 remaining_hlen
= self
.width
- 1
258 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
259 coord
= x_coord_map
[item
]
260 hlen
= coord
[1] - len(ITEMS_PAD
)
261 hlen
= min(hlen
, remaining_hlen
)
262 if remaining_hlen
< coord
[1]:
265 remaining_hlen
-= coord
[1]
266 if item
== FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
:
267 self
.mainw
.addnstr(y_coord
, coord
[0],
268 wrap(client_id
.split('.')[1], hlen
),
270 elif item
== FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
:
271 if FSTop
.has_metric(client_meta
, CLIENT_METADATA_MOUNT_ROOT_KEY
):
272 self
.mainw
.addnstr(y_coord
, coord
[0],
273 wrap(client_meta
[CLIENT_METADATA_MOUNT_ROOT_KEY
], hlen
),
276 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", hlen
)
278 if remaining_hlen
== 0:
282 for item
in counters
:
283 coord
= x_coord_map
[item
]
284 hlen
= coord
[1] - len(ITEMS_PAD
)
285 hlen
= min(hlen
, remaining_hlen
)
286 if remaining_hlen
< coord
[1]:
289 remaining_hlen
-= coord
[1]
291 typ
= MAIN_WINDOW_TOP_LINE_METRICS
[MGR_STATS_COUNTERS
[cidx
]]
292 if item
.lower() in client_meta
.get(CLIENT_METADATA_VALID_METRICS_KEY
, []):
293 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
294 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_perc(m)}', hlen
)
295 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
296 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_lat(m)}', hlen
)
298 # display 0th element from metric tuple
299 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{m[0]}', hlen
)
301 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", hlen
)
304 if remaining_hlen
== 0:
307 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
308 coord
= x_coord_map
[item
]
309 hlen
= coord
[1] - len(ITEMS_PAD
)
310 # always place FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR last,
311 # since it can be a very long string to display
312 if item
== FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
:
313 if FSTop
.has_metrics(client_meta
, [CLIENT_METADATA_MOUNT_POINT_KEY
,
314 CLIENT_METADATA_HOSTNAME_KEY
,
315 CLIENT_METADATA_IP_KEY
]):
316 self
.mainw
.addnstr(y_coord
, coord
[0],
317 f
'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@'
318 f
'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'
319 f
'{client_meta[CLIENT_METADATA_IP_KEY]}',
322 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", remaining_hlen
)
323 hlen
= min(hlen
, remaining_hlen
)
324 if remaining_hlen
< coord
[1]:
327 remaining_hlen
-= coord
[1]
328 if remaining_hlen
== 0:
331 def refresh_clients(self
, x_coord_map
, stats_json
):
332 counters
= [m
.upper() for m
in stats_json
[GLOBAL_COUNTERS_KEY
]]
334 for client_id
, metrics
in stats_json
[GLOBAL_METRICS_KEY
].items():
335 self
.refresh_client(client_id
,
338 stats_json
[CLIENT_METADATA_KEY
][client_id
],
343 def refresh_main_window(self
, x_coord_map
, stats_json
):
344 if self
.mainw
is None:
346 self
.refresh_clients(x_coord_map
, stats_json
)
349 def refresh_header(self
, stats_json
):
350 hlen
= self
.width
- 2
351 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
352 self
.header
.addnstr(0, 0, 'perf stats version mismatch!', hlen
)
354 client_metadata
= stats_json
[CLIENT_METADATA_KEY
]
355 num_clients
= len(client_metadata
)
356 num_mounts
= len([client
for client
, metadata
in client_metadata
.items() if
357 CLIENT_METADATA_MOUNT_POINT_KEY
in metadata
358 and metadata
[CLIENT_METADATA_MOUNT_POINT_KEY
] != 'N/A'])
359 num_kclients
= len([client
for client
, metadata
in client_metadata
.items() if
360 "kernel_version" in metadata
])
361 num_libs
= num_clients
- (num_mounts
+ num_kclients
)
362 now
= datetime
.now().ctime()
363 self
.header
.addnstr(0, 0,
364 FS_TOP_VERSION_HEADER_FMT
.format(prog_name
=FS_TOP_PROG_STR
, now
=now
),
365 hlen
, curses
.A_STANDOUT | curses
.A_BOLD
)
366 self
.header
.addnstr(1, 0, FS_TOP_CLIENT_HEADER_FMT
.format(num_clients
=num_clients
,
367 num_mounts
=num_mounts
,
368 num_kclients
=num_kclients
,
369 num_libs
=num_libs
), hlen
)
370 self
.header
.refresh()
373 def run_display(self
):
374 while not self
.exit_ev
.is_set():
375 # use stdscr.clear() instead of clearing each window
376 # to avoid screen blinking.
378 self
.refresh_window_size()
379 if self
.width
<= 2 or self
.width
<= 2:
380 self
.exit_ev
.wait(timeout
=self
.refresh_interval_secs
)
383 # coordinate constants for windowing -- (height, width, y, x)
384 # NOTE: requires initscr() call before accessing COLS, LINES.
386 HEADER_WINDOW_COORD
= (2, self
.width
- 1, 0, 0)
387 self
.header
= curses
.newwin(*HEADER_WINDOW_COORD
)
389 TOPLINE_WINDOW_COORD
= (1, self
.width
- 1, 3, 0)
390 self
.topl
= curses
.newwin(*TOPLINE_WINDOW_COORD
)
394 MAIN_WINDOW_COORD
= (self
.height
- 4, self
.width
- 1, 4, 0)
395 self
.mainw
= curses
.newwin(*MAIN_WINDOW_COORD
)
399 # this may happen if the terminal window size changed while
400 # the sub windows were being created; just retry
403 stats_json
= self
.perf_stats_query()
405 if self
.refresh_header(stats_json
):
406 x_coord_map
= self
.refresh_top_line_and_build_coord()
407 self
.refresh_main_window(x_coord_map
, stats_json
)
408 self
.exit_ev
.wait(timeout
=self
.refresh_interval_secs
)
410 # this may happen if the terminal window size changed while
411 # calling addstr; just retry
415 if __name__
== '__main__':
416 def float_greater_than(x
):
418 if value
< MIN_REFRESH_INTERVAL
:
419 raise argparse
.ArgumentTypeError(f
'{value} should be greater than '
420 f
'{MIN_REFRESH_INTERVAL}')
423 parser
= argparse
.ArgumentParser(description
='Ceph Filesystem top utility')
424 parser
.add_argument('--cluster', nargs
='?', const
='ceph', default
='ceph',
425 help='Ceph cluster to connect (defualt: ceph)')
426 parser
.add_argument('--id', nargs
='?', const
='fstop', default
='fstop',
427 help='Ceph user to use to connection (default: fstop)')
428 parser
.add_argument('--conffile', nargs
='?', default
=None,
429 help='Path to cluster configuration file')
430 parser
.add_argument('--selftest', dest
='selftest', action
='store_true',
431 help='run in selftest mode')
432 parser
.add_argument('-d', '--delay', nargs
='?', default
=DEFAULT_REFRESH_INTERVAL
,
433 type=float_greater_than
, help='Interval to refresh data '
434 f
'(default: {DEFAULT_REFRESH_INTERVAL})')
436 args
= parser
.parse_args()
443 sys
.stdout
.write("selftest ok\n")
445 curses
.wrapper(ft
.setup_curses
)
446 except FSTopException
as fst
:
448 sys
.stderr
.write(f
'{fst.get_error_msg()}\n')
449 except Exception as e
:
451 sys
.stderr
.write(f
'exception: {e}\n')
454 sys
.exit(0 if not err
else -1)