12 from collections
import OrderedDict
13 from datetime
import datetime
14 from enum
import Enum
, unique
15 from threading
import Event
20 class FSTopException(Exception):
21 def __init__(self
, msg
=''):
24 def get_error_msg(self
):
29 class MetricType(Enum
):
31 METRIC_TYPE_PERCENTAGE
= 1
32 METRIC_TYPE_LATENCY
= 2
37 FS_TOP_PROG_STR
= 'cephfs-top'
39 # version match b/w fstop and stats emitted by mgr/stats
40 FS_TOP_SUPPORTED_VER
= 1
43 ITEMS_PAD
= " " * ITEMS_PAD_LEN
44 DEFAULT_REFRESH_INTERVAL
= 1
45 # min refresh interval allowed
46 MIN_REFRESH_INTERVAL
= 0.5
48 # metadata provided by mgr/stats
49 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
= "client_id"
50 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
= "mount_root"
51 FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
= "mount_point@host/addr"
53 MAIN_WINDOW_TOP_LINE_ITEMS_START
= [ITEMS_PAD
,
54 FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
,
55 FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
]
56 MAIN_WINDOW_TOP_LINE_ITEMS_END
= [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
]
58 MAIN_WINDOW_TOP_LINE_METRICS_LEGACY
= ["READ_LATENCY",
63 # adjust this map according to stats version and maintain order
64 # as emitted by mgr/stats
65 MAIN_WINDOW_TOP_LINE_METRICS
= OrderedDict([
66 ("CAP_HIT", MetricType
.METRIC_TYPE_PERCENTAGE
),
67 ("READ_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
68 ("WRITE_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
69 ("METADATA_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
70 ("DENTRY_LEASE", MetricType
.METRIC_TYPE_PERCENTAGE
),
71 ("OPENED_FILES", MetricType
.METRIC_TYPE_NONE
),
72 ("PINNED_ICAPS", MetricType
.METRIC_TYPE_NONE
),
73 ("OPENED_INODES", MetricType
.METRIC_TYPE_NONE
),
74 ("READ_IO_SIZES", MetricType
.METRIC_TYPE_SIZE
),
75 ("WRITE_IO_SIZES", MetricType
.METRIC_TYPE_SIZE
),
76 ("AVG_READ_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
77 ("STDEV_READ_LATENCY", MetricType
.METRIC_TYPE_STDEV
),
78 ("AVG_WRITE_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
79 ("STDEV_WRITE_LATENCY", MetricType
.METRIC_TYPE_STDEV
),
80 ("AVG_METADATA_LATENCY", MetricType
.METRIC_TYPE_LATENCY
),
81 ("STDEV_METADATA_LATENCY", MetricType
.METRIC_TYPE_STDEV
),
83 MGR_STATS_COUNTERS
= list(MAIN_WINDOW_TOP_LINE_METRICS
.keys())
# Templates for the two lines written to the header window.
FS_TOP_VERSION_HEADER_FMT = '{prog_name} - {now}'
FS_TOP_CLIENT_HEADER_FMT = (
    'Client(s): {num_clients} - {num_mounts} FUSE, '
    '{num_kclients} kclient, {num_libs} libcephfs'
)

# Key of the perf stats JSON that maps client ids to their metadata.
CLIENT_METADATA_KEY = "client_metadata"
# Keys looked up inside a single client's metadata entry.
CLIENT_METADATA_MOUNT_POINT_KEY = "mount_point"
CLIENT_METADATA_MOUNT_ROOT_KEY = "root"
CLIENT_METADATA_IP_KEY = "IP"
CLIENT_METADATA_HOSTNAME_KEY = "hostname"
CLIENT_METADATA_VALID_METRICS_KEY = "valid_metrics"

# Keys of the perf stats JSON holding per-client metric values and the
# list of counter names, respectively.
GLOBAL_METRICS_KEY = "global_metrics"
GLOBAL_COUNTERS_KEY = "global_counters"
99 last_time
= time
.time()
105 if c
[0] == 0 and c
[1] == 0:
107 return round((c
[0] / (c
[0] + c
[1])) * 100, 2)
111 return round(c
[0] * 1000 + c
[1] / 1000000, 2)
117 stdev
= math
.sqrt(c
[0] / (c
[1] - 1)) / 1000000
118 return round(stdev
, 2)
123 return round(c
[1] / (1024 * 1024), 2)
127 def calc_avg_size(c
):
130 return round(c
[1] / (c
[0] * 1024 * 1024), 2)
134 def calc_speed(size
, duration
):
137 return round(size
/ (duration
* 1024 * 1024), 2)
141 """return a '+' suffixed wrapped string"""
144 return f
'{s[0:sl-1]}+'
148 def __init__(self
, args
):
150 self
.stdscr
= None # curses instance
151 self
.client_name
= args
.id
152 self
.cluster_name
= args
.cluster
153 self
.conffile
= args
.conffile
154 self
.refresh_interval_secs
= args
.delay
155 self
.exit_ev
= Event()
def refresh_window_size(self):
    """Cache the current curses screen dimensions on the instance.

    Calls ``self.stdscr.getmaxyx()`` and stores the two returned values
    in ``self.height`` and ``self.width`` (in that order).
    """
    rows, cols = self.stdscr.getmaxyx()
    self.height = rows
    self.width = cols
160 def handle_signal(self
, signum
, _
):
166 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
,
167 conffile
=self
.conffile
)
169 r_rados
= rados
.Rados(rados_id
=self
.client_name
, clustername
=self
.cluster_name
)
170 r_rados
.conf_read_file()
173 except rados
.Error
as e
:
174 if e
.errno
== errno
.ENOENT
:
175 raise FSTopException(f
'cluster {self.cluster_name} does not exist')
177 raise FSTopException(f
'error connecting to cluster: {e}')
178 self
.verify_perf_stats_support()
179 signal
.signal(signal
.SIGTERM
, self
.handle_signal
)
180 signal
.signal(signal
.SIGINT
, self
.handle_signal
)
184 self
.rados
.shutdown()
188 stats_json
= self
.perf_stats_query()
189 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
190 raise FSTopException('perf stats version mismatch!')
191 missing
= [m
for m
in stats_json
["global_counters"] if m
.upper() not in MGR_STATS_COUNTERS
]
193 raise FSTopException('Cannot handle unknown metrics from \'ceph fs perf stats\': '
196 def setup_curses(self
, win
):
198 curses
.use_default_colors()
203 # If the terminal does not support the requested
204 # visibility, it will raise an exception
208 def verify_perf_stats_support(self
):
209 mon_cmd
= {'prefix': 'mgr module ls', 'format': 'json'}
211 ret
, buf
, out
= self
.rados
.mon_command(json
.dumps(mon_cmd
), b
'')
212 except Exception as e
:
213 raise FSTopException(f
'error checking \'stats\' module: {e}')
215 raise FSTopException(f
'error checking \'stats\' module: {out}')
216 if 'stats' not in json
.loads(buf
.decode('utf-8'))['enabled_modules']:
217 raise FSTopException('\'stats\' module not enabled. Use \'ceph mgr module '
218 'enable stats\' to enable')
220 def perf_stats_query(self
):
221 mgr_cmd
= {'prefix': 'fs perf stats', 'format': 'json'}
223 ret
, buf
, out
= self
.rados
.mgr_command(json
.dumps(mgr_cmd
), b
'')
224 except Exception as e
:
225 raise FSTopException(f
'error in \'perf stats\' query: {e}')
227 raise FSTopException(f
'error in \'perf stats\' query: {out}')
228 return json
.loads(buf
.decode('utf-8'))
230 def items(self
, item
):
231 if item
== "CAP_HIT":
233 if item
== "READ_LATENCY":
235 if item
== "WRITE_LATENCY":
237 if item
== "METADATA_LATENCY":
239 if item
== "DENTRY_LEASE":
241 if item
== "OPENED_FILES":
243 if item
== "PINNED_ICAPS":
245 if item
== "OPENED_INODES":
247 if item
== "READ_IO_SIZES":
249 if item
== "WRITE_IO_SIZES":
251 if item
== 'AVG_READ_LATENCY':
253 if item
== 'STDEV_READ_LATENCY':
255 if item
== 'AVG_WRITE_LATENCY':
257 if item
== 'STDEV_WRITE_LATENCY':
259 if item
== 'AVG_METADATA_LATENCY':
261 if item
== 'STDEV_METADATA_LATENCY':
264 # return empty string for none type
267 def mtype(self
, typ
):
268 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
270 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
272 elif typ
== MetricType
.METRIC_TYPE_SIZE
:
274 elif typ
== MetricType
.METRIC_TYPE_STDEV
:
277 # return empty string for none type
280 def avg_items(self
, item
):
281 if item
== "READ_IO_SIZES":
283 if item
== "WRITE_IO_SIZES":
286 # return empty string for none type
289 def speed_items(self
, item
):
290 if item
== "READ_IO_SIZES":
292 if item
== "WRITE_IO_SIZES":
295 # return empty string for none type
298 def speed_mtype(self
, typ
):
299 if typ
== MetricType
.METRIC_TYPE_SIZE
:
302 # return empty string for none type
305 def refresh_top_line_and_build_coord(self
):
306 if self
.topl
is None:
313 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
315 nlen
= len(item
) + len(ITEMS_PAD
)
316 x_coord_map
[item
] = (xp
, nlen
)
319 for item
, typ
in MAIN_WINDOW_TOP_LINE_METRICS
.items():
320 if item
in MAIN_WINDOW_TOP_LINE_METRICS_LEGACY
:
322 it
= f
'{self.items(item)}{self.mtype(typ)}'
324 nlen
= len(it
) + len(ITEMS_PAD
)
325 x_coord_map
[item
] = (xp
, nlen
)
328 if item
== "READ_IO_SIZES" or item
== "WRITE_IO_SIZES":
330 it
= f
'{self.avg_items(item)}{self.mtype(typ)}'
332 nlen
= len(it
) + len(ITEMS_PAD
)
333 if item
== "READ_IO_SIZES":
334 x_coord_map
["READ_IO_AVG"] = (xp
, nlen
)
335 if item
== "WRITE_IO_SIZES":
336 x_coord_map
["WRITE_IO_AVG"] = (xp
, nlen
)
340 it
= f
'{self.speed_items(item)}{self.speed_mtype(typ)}'
342 nlen
= len(it
) + len(ITEMS_PAD
)
343 if item
== "READ_IO_SIZES":
344 x_coord_map
["READ_IO_SPEED"] = (xp
, nlen
)
345 if item
== "WRITE_IO_SIZES":
346 x_coord_map
["WRITE_IO_SPEED"] = (xp
, nlen
)
349 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
351 nlen
= len(item
) + len(ITEMS_PAD
)
352 x_coord_map
[item
] = (xp
, nlen
)
354 title
= ITEMS_PAD
.join(heading
)
355 hlen
= min(self
.width
- 2, len(title
))
356 self
.topl
.addnstr(0, 0, title
, hlen
, curses
.A_STANDOUT | curses
.A_BOLD
)
def has_metric(metadata, metrics_key):
    """Report whether ``metrics_key`` is present in ``metadata``.

    The check is a plain membership test (``in``), so ``metadata`` may be
    any container supporting it; callers in this file pass a client's
    metadata mapping.
    """
    found = metrics_key in metadata
    return found
365 def has_metrics(metadata
, metrics_keys
):
366 for key
in metrics_keys
:
367 if not FSTop
.has_metric(metadata
, key
):
371 def refresh_client(self
, client_id
, metrics
, counters
, client_meta
, x_coord_map
, y_coord
):
374 cur_time
= time
.time()
375 duration
= cur_time
- last_time
377 remaining_hlen
= self
.width
- 1
378 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_START
:
379 coord
= x_coord_map
[item
]
380 hlen
= coord
[1] - len(ITEMS_PAD
)
381 hlen
= min(hlen
, remaining_hlen
)
382 if remaining_hlen
< coord
[1]:
385 remaining_hlen
-= coord
[1]
386 if item
== FS_TOP_MAIN_WINDOW_COL_CLIENT_ID
:
387 self
.mainw
.addnstr(y_coord
, coord
[0],
388 wrap(client_id
.split('.')[1], hlen
),
390 elif item
== FS_TOP_MAIN_WINDOW_COL_MNT_ROOT
:
391 if FSTop
.has_metric(client_meta
, CLIENT_METADATA_MOUNT_ROOT_KEY
):
392 self
.mainw
.addnstr(y_coord
, coord
[0],
393 wrap(client_meta
[CLIENT_METADATA_MOUNT_ROOT_KEY
], hlen
),
396 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", hlen
)
398 if remaining_hlen
== 0:
402 for item
in counters
:
403 if item
in MAIN_WINDOW_TOP_LINE_METRICS_LEGACY
:
406 coord
= x_coord_map
[item
]
407 hlen
= coord
[1] - len(ITEMS_PAD
)
408 hlen
= min(hlen
, remaining_hlen
)
409 if remaining_hlen
< coord
[1]:
412 remaining_hlen
-= coord
[1]
414 key
= MGR_STATS_COUNTERS
[cidx
]
415 typ
= MAIN_WINDOW_TOP_LINE_METRICS
[key
]
416 if item
.lower() in client_meta
.get(CLIENT_METADATA_VALID_METRICS_KEY
, []):
417 if typ
== MetricType
.METRIC_TYPE_PERCENTAGE
:
418 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_perc(m)}', hlen
)
419 elif typ
== MetricType
.METRIC_TYPE_LATENCY
:
420 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_lat(m)}', hlen
)
421 elif typ
== MetricType
.METRIC_TYPE_STDEV
:
422 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_stdev(m)}', hlen
)
423 elif typ
== MetricType
.METRIC_TYPE_SIZE
:
424 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_size(m)}', hlen
)
427 if remaining_hlen
== 0:
429 if key
== "READ_IO_SIZES":
430 coord
= x_coord_map
["READ_IO_AVG"]
431 elif key
== "WRITE_IO_SIZES":
432 coord
= x_coord_map
["WRITE_IO_AVG"]
433 hlen
= coord
[1] - len(ITEMS_PAD
)
434 hlen
= min(hlen
, remaining_hlen
)
435 if remaining_hlen
< coord
[1]:
438 remaining_hlen
-= coord
[1]
439 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{calc_avg_size(m)}', hlen
)
442 if remaining_hlen
== 0:
444 if key
== "READ_IO_SIZES":
445 coord
= x_coord_map
["READ_IO_SPEED"]
446 elif key
== "WRITE_IO_SIZES":
447 coord
= x_coord_map
["WRITE_IO_SPEED"]
448 hlen
= coord
[1] - len(ITEMS_PAD
)
449 hlen
= min(hlen
, remaining_hlen
)
450 if remaining_hlen
< coord
[1]:
453 remaining_hlen
-= coord
[1]
455 if key
== "READ_IO_SIZES":
457 global last_read_size
458 last_size
= last_read_size
.get(client_id
, 0)
459 size
= m
[1] - last_size
460 last_read_size
[client_id
] = m
[1]
461 if key
== "WRITE_IO_SIZES":
463 global last_write_size
464 last_size
= last_write_size
.get(client_id
, 0)
465 size
= m
[1] - last_size
466 last_write_size
[client_id
] = m
[1]
467 self
.mainw
.addnstr(y_coord
, coord
[0],
468 f
'{calc_speed(abs(size), duration)}',
471 # display 0th element from metric tuple
472 self
.mainw
.addnstr(y_coord
, coord
[0], f
'{m[0]}', hlen
)
474 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", hlen
)
477 if remaining_hlen
== 0:
480 for item
in MAIN_WINDOW_TOP_LINE_ITEMS_END
:
481 coord
= x_coord_map
[item
]
482 hlen
= coord
[1] - len(ITEMS_PAD
)
483 # always place the FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR in the
484 # last, it will be a very long string to display
485 if item
== FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR
:
486 if FSTop
.has_metrics(client_meta
, [CLIENT_METADATA_MOUNT_POINT_KEY
,
487 CLIENT_METADATA_HOSTNAME_KEY
,
488 CLIENT_METADATA_IP_KEY
]):
489 self
.mainw
.addnstr(y_coord
, coord
[0],
490 f
'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@'
491 f
'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'
492 f
'{client_meta[CLIENT_METADATA_IP_KEY]}',
495 self
.mainw
.addnstr(y_coord
, coord
[0], "N/A", remaining_hlen
)
496 hlen
= min(hlen
, remaining_hlen
)
497 if remaining_hlen
< coord
[1]:
500 remaining_hlen
-= coord
[1]
501 if remaining_hlen
== 0:
504 def refresh_clients(self
, x_coord_map
, stats_json
):
505 counters
= [m
.upper() for m
in stats_json
[GLOBAL_COUNTERS_KEY
]]
507 for client_id
, metrics
in stats_json
[GLOBAL_METRICS_KEY
].items():
508 self
.refresh_client(client_id
,
511 stats_json
[CLIENT_METADATA_KEY
][client_id
],
516 def refresh_main_window(self
, x_coord_map
, stats_json
):
517 if self
.mainw
is None:
519 self
.refresh_clients(x_coord_map
, stats_json
)
522 def refresh_header(self
, stats_json
):
523 hlen
= self
.width
- 2
524 if not stats_json
['version'] == FS_TOP_SUPPORTED_VER
:
525 self
.header
.addnstr(0, 0, 'perf stats version mismatch!', hlen
)
527 client_metadata
= stats_json
[CLIENT_METADATA_KEY
]
528 num_clients
= len(client_metadata
)
529 num_mounts
= len([client
for client
, metadata
in client_metadata
.items() if
530 CLIENT_METADATA_MOUNT_POINT_KEY
in metadata
531 and metadata
[CLIENT_METADATA_MOUNT_POINT_KEY
] != 'N/A'])
532 num_kclients
= len([client
for client
, metadata
in client_metadata
.items() if
533 "kernel_version" in metadata
])
534 num_libs
= num_clients
- (num_mounts
+ num_kclients
)
535 now
= datetime
.now().ctime()
536 self
.header
.addnstr(0, 0,
537 FS_TOP_VERSION_HEADER_FMT
.format(prog_name
=FS_TOP_PROG_STR
, now
=now
),
538 hlen
, curses
.A_STANDOUT | curses
.A_BOLD
)
539 self
.header
.addnstr(1, 0, FS_TOP_CLIENT_HEADER_FMT
.format(num_clients
=num_clients
,
540 num_mounts
=num_mounts
,
541 num_kclients
=num_kclients
,
542 num_libs
=num_libs
), hlen
)
543 self
.header
.refresh()
546 def run_display(self
):
# Display loop: runs until self.exit_ev is set. Each pass re-reads the
# window size, creates the header, top-line and main curses windows,
# queries the perf stats and redraws; it then waits on exit_ev for
# refresh_interval_secs. Several lines of this method (including the
# statements guarded by the "retry" comments) are not visible here.
547 while not self
.exit_ev
.is_set():
548 # use stdscr.clear() instead of clearing each window
549 # to avoid screen blinking.
551 self
.refresh_window_size()
# NOTE(review): both operands of this `or` test self.width; the second
# was presumably meant to be self.height -- confirm and fix.
552 if self
.width
<= 2 or self
.width
<= 2:
553 self
.exit_ev
.wait(timeout
=self
.refresh_interval_secs
)
556 # coordinate constants for windowing -- (height, width, y, x)
557 # NOTE: requires initscr() call before accessing COLS, LINES.
559 HEADER_WINDOW_COORD
= (2, self
.width
- 1, 0, 0)
560 self
.header
= curses
.newwin(*HEADER_WINDOW_COORD
)
562 TOPLINE_WINDOW_COORD
= (1, self
.width
- 1, 3, 0)
563 self
.topl
= curses
.newwin(*TOPLINE_WINDOW_COORD
)
567 MAIN_WINDOW_COORD
= (self
.height
- 4, self
.width
- 1, 4, 0)
568 self
.mainw
= curses
.newwin(*MAIN_WINDOW_COORD
)
572 # this may happen if the terminal window size changes while the
573 # sub-windows are being created; just retry
576 stats_json
= self
.perf_stats_query()
# the top line and main window are only redrawn when refresh_header()
# returns a truthy value
578 if self
.refresh_header(stats_json
):
579 x_coord_map
= self
.refresh_top_line_and_build_coord()
580 self
.refresh_main_window(x_coord_map
, stats_json
)
581 self
.exit_ev
.wait(timeout
=self
.refresh_interval_secs
)
583 # this may happen if the terminal window size changes during
584 # an addstr call; just retry
588 if __name__
== '__main__':
589 def float_greater_than(x
):
591 if value
< MIN_REFRESH_INTERVAL
:
592 raise argparse
.ArgumentTypeError(
593 f
'Refresh interval should be greater than or equal to {MIN_REFRESH_INTERVAL}')
596 parser
= argparse
.ArgumentParser(description
='Ceph Filesystem top utility')
597 parser
.add_argument('--cluster', nargs
='?', const
='ceph', default
='ceph',
598 help='Ceph cluster to connect (default: ceph)')
599 parser
.add_argument('--id', nargs
='?', const
='fstop', default
='fstop',
600 help='Ceph user to use to connection (default: fstop)')
601 parser
.add_argument('--conffile', nargs
='?', default
=None,
602 help='Path to cluster configuration file')
603 parser
.add_argument('--selftest', dest
='selftest', action
='store_true',
604 help='Run in selftest mode')
605 parser
.add_argument('-d', '--delay', nargs
='?', default
=DEFAULT_REFRESH_INTERVAL
,
606 type=float_greater_than
, help='Interval to refresh data '
607 f
'(default: {DEFAULT_REFRESH_INTERVAL})')
609 args
= parser
.parse_args()
616 sys
.stdout
.write("selftest ok\n")
618 curses
.wrapper(ft
.setup_curses
)
619 except FSTopException
as fst
:
621 sys
.stderr
.write(f
'{fst.get_error_msg()}\n')
622 except Exception as e
:
624 sys
.stderr
.write(f
'exception: {e}\n')
627 sys
.exit(0 if not err
else -1)