3 # Copyright (c) 2013 Nicira, Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 # The approximate_size code was copied from
19 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20 # which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21 # used under a Creative Commons Attribution-Share-Alike license:
22 # http://creativecommons.org/licenses/by-sa/3.0/
26 """Top like behavior for ovs-dpctl dump-flows output.
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes and occurrence of the following fields:
35 - Source and destination MAC addresses
39 - Source and destination IPv4 addresses
41 - Source and destination IPv6 addresses
43 - UDP and TCP destination port
45 - Tunnel source and destination addresses
Output shows five values:

- FIELDS: the flow fields, for example in_port(1).

- PACKETS: the total number of packets containing the flow field.

- BYTES: the total number of bytes containing the flow field. If units are
  not present then values are in bytes.

- AVERAGE: the average packet size (BYTES/PACKETS).

- COUNT: the number of lines in the dump-flow output that contain the
  flow field.
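For example (illustrative only; actual column widths are computed from the
terminal size), a summary row might look like:

  FIELDS                        COUNT  PACKETS    BYTES  AVERAGE
  in_port(1)                        2       10     1240      124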
62 While in top mode, the default behavior, the following single character
63 commands are supported:
a - toggles top between accumulate and live mode. Accumulate mode is
    described below.

s - toggles which column is used to sort content in decreasing order. A
    DESC title is placed over the column.

_ - a space triggers another collection of dump-flow content.

h - halt output. Any character will restart sampling.

f - cycle through flow fields. The initial field is in_port.
There are two supported modes: live and accumulate. The default is live.
The parameter --accumulate or the 'a' character in top mode enables the
latter. In live mode, recent dump-flow content is presented, whereas
accumulate mode keeps track of prior historical information until the flow
is reset, not when the flow is purged. A flow is considered reset when its
packet count has decreased from the previous sample; for example, a flow
whose packet count drops from 10 to 2 between samples is treated as reset.
There is one caveat: so that the system does not eventually run out of
memory, any flows that have not been refreshed within the accumulate-decay
period are purged. The goal is to free the memory of flows that are not
active. Statistics are not decremented; their purpose is to reflect the
overall history of the flow fields.
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
101 Error messages will identify content that failed to parse.
The --host must follow the format user@hostname. This script simply calls
'ssh user@hostname' without checking for login credentials, therefore public
keys should be installed on the system identified by hostname, such as:
110 $ ssh-copy-id user@hostname
Consult the ssh-copy-id man page for more details.
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
# argparse is not installed on older Python distributions.
# OVS ships with a version in the directory mentioned below.
142 sys.path.append(os.path.join("@pkgdatadir@", "python"))
# The following two definitions provide the necessary netaddr functionality.
# The Python netaddr module is not part of the core installation. Packaging
# netaddr would be involved and seems inappropriate given that only two
# methods were used.
def ipv4_to_network(ip_str):
    """ Calculate the network given an ipv4/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # Just an IP address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length,
                           socket.inet_pton(socket.AF_INET, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack(pack_length, network_n[0],
                                        network_n[1]))
def ipv6_to_network(ip_str):
    """ Calculate the network given an ipv6/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HHHHHHHH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # Just an IP address, no mask.
        return ip_str
196 ip_p = socket.inet_pton(socket.AF_INET6, ip)
197 ip_t = struct.unpack(pack_length, ip_p)
198 mask_t = struct.unpack(pack_length,
199 socket.inet_pton(socket.AF_INET6, mask))
200 network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
202 return socket.inet_ntop(socket.AF_INET6,
203 struct.pack(pack_length,
204 network_n[0], network_n[1],
205 network_n[2], network_n[3],
206 network_n[4], network_n[5],
207 network_n[6], network_n[7]))
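# Illustrative usage, taken from the unit tests below:
#   ipv4_to_network("192.168.0.1/255.255.255.0")  returns "192.168.0.0"
#   ipv6_to_network("1::192:168:0:1/1::ffff:ffff:0:0")  returns
#       "1::192:168:0:0"
#   ipv4_to_network("192.168.0.1")  returns "192.168.0.1" (no mask present)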
214 """ Holds column specific content.
215 Titles needs to be less than 8 characters.
229 """ Return a associated list. """
230 return [(Columns.FIELDS, repr(obj)),
231 (Columns.PACKETS, obj.packets),
232 (Columns.BYTES, obj.bytes),
233 (Columns.COUNT, obj.count),
234 (Columns.AVERAGE, obj.average),
238 def element_eth_get(field_type, element, stats_dict):
239 """ Extract eth frame src and dst from a dump-flow element."""
240 fmt = "%s(src=%s,dst=%s)"
242 element = fmt % (field_type, element["src"], element["dst"])
243 return SumData(field_type, element, stats_dict["packets"],
244 stats_dict["bytes"], element)
247 def element_ipv4_get(field_type, element, stats_dict):
248 """ Extract src and dst from a dump-flow element."""
249 fmt = "%s(src=%s,dst=%s)"
250 element_show = fmt % (field_type, element["src"], element["dst"])
252 element_key = fmt % (field_type, ipv4_to_network(element["src"]),
253 ipv4_to_network(element["dst"]))
255 return SumData(field_type, element_show, stats_dict["packets"],
256 stats_dict["bytes"], element_key)
259 def element_tunnel_get(field_type, element, stats_dict):
260 """ Extract src and dst from a tunnel."""
261 return element_ipv4_get(field_type, element, stats_dict)
264 def element_ipv6_get(field_type, element, stats_dict):
265 """ Extract src and dst from a dump-flow element."""
267 fmt = "%s(src=%s,dst=%s)"
268 element_show = fmt % (field_type, element["src"], element["dst"])
270 element_key = fmt % (field_type, ipv6_to_network(element["src"]),
271 ipv6_to_network(element["dst"]))
273 return SumData(field_type, element_show, stats_dict["packets"],
274 stats_dict["bytes"], element_key)
277 def element_dst_port_get(field_type, element, stats_dict):
278 """ Extract src and dst from a dump-flow element."""
279 element_key = "%s(dst=%s)" % (field_type, element["dst"])
280 return SumData(field_type, element_key, stats_dict["packets"],
281 stats_dict["bytes"], element_key)
284 def element_passthrough_get(field_type, element, stats_dict):
285 """ Extract src and dst from a dump-flow element."""
286 element_key = "%s(%s)" % (field_type, element)
287 return SumData(field_type, element_key,
288 stats_dict["packets"], stats_dict["bytes"], element_key)
291 # pylint: disable-msg=R0903
293 """ Holds field_type and function to extract element value. """
294 def __init__(self, field_type, elements, generator):
295 self.field_type = field_type
296 self.elements = elements
297 self.generator = generator
# The order below is important. The initial flow field depends on whether
# --script or top mode is used. In top mode, the default behavior, in_port
# flow fields are shown first. A future feature will allow users to
# filter output by selecting a row. Filtering by in_port is a natural
# starting point.
306 # In script mode, all fields are shown. The expectation is that users could
307 # filter output by piping through grep.
309 # In top mode, the default flow field is in_port. In --script mode,
310 # the default flow field is all.
312 # All is added to the end of the OUTPUT_FORMAT list.
315 OutputFormat("in_port", (), element_passthrough_get),
316 OutputFormat("eth", ("src","dst"), element_eth_get),
317 OutputFormat("eth_type", (), element_passthrough_get),
318 OutputFormat("ipv4", ("src","dst"), element_ipv4_get),
319 OutputFormat("ipv6", ("src","dst"), element_ipv6_get),
320 OutputFormat("udp", ("src","dst"), element_dst_port_get),
321 OutputFormat("tcp", ("src","dst"), element_dst_port_get),
322 OutputFormat("tunnel", ("src","dst"), element_tunnel_get),
333 def top_input_get(args):
334 """ Return subprocess stdout."""
337 cmd += ["ssh", args.host]
338 cmd += ["ovs-dpctl", "dump-flows"]
340 return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
341 stdout=subprocess.PIPE).stdout
345 """ read program parameters handle any necessary validation of input. """
347 parser = argparse.ArgumentParser(
348 formatter_class=argparse.RawDescriptionHelpFormatter,
351 # None is a special value indicating to read flows from stdin.
352 # This handles the case
    # ovs-dpctl dump-flows | ovs-dpctl-top
354 parser.add_argument("-v", "--version", version="@VERSION@",
355 action="version", help="show version")
356 parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
358 help="file containing flows from ovs-dpctl dump-flow")
359 parser.add_argument("-V", "--verbose", dest="verbose",
360 default=logging.CRITICAL,
361 action="store_const", const=logging.DEBUG,
362 help="enable debug level verbosity")
363 parser.add_argument("-s", "--script", dest="top", action="store_false",
364 help="Run from a script (no user interface)")
365 parser.add_argument("--host", dest="host",
366 help="Specify a user@host for retrieving flows see"
367 "Accessing Remote Hosts for more information")
369 parser.add_argument("-a", "--accumulate", dest="accumulate",
370 action="store_true", default=False,
371 help="Accumulate dump-flow content")
372 parser.add_argument("--accumulate-decay", dest="accumulateDecay",
373 default=5.0 * 60, type=float,
374 help="Decay old accumulated flows. "
375 "The default is 5 minutes. "
376 "A value of 0 disables decay.")
377 parser.add_argument("-d", "--delay", dest="delay", type=int,
379 help="Delay in milliseconds to collect dump-flow "
380 "content (sample rate).")
382 args = parser.parse_args()
384 logging.basicConfig(level=args.verbose)
389 # Code to parse a single line in dump-flow
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
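# Illustrative matches for the expressions above, drawn from the dump-flow
# format used throughout this file:
#   FIELDS_CMPND on "in_port(4)" yields ("in_port", "4").
#   FIELDS_CMPND_ELEMENT on "src=00:50:56:b4:4e:f8" yields
#       ("src", "00:50:56:b4:4e:f8").
#   FIELDS_ELEMENT on "bytes:92" yields ("bytes", "92").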
398 def flow_line_iter(line):
399 """ iterate over flow dump elements.
400 return tuples of (true, element) or (false, remaining element)
402 # splits by , except for when in a (). Actions element was not
403 # split properly but we don't need it.
416 # ignore white space.
418 elif ((ch == ',') and (paren_count == 0)):
425 raise ValueError(line)
427 if (len(element) > 0):
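# Illustrative behavior, taken from the unit tests below:
#   flow_line_iter("tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)")
# returns:
#   ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))", "in_port(2)"]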
432 def flow_line_compound_parse(compound):
433 """ Parse compound element
435 src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
437 eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
440 for element in flow_line_iter(compound):
441 match = FIELDS_CMPND_ELEMENT.search(element)
444 value = match.group(2)
447 match = FIELDS_CMPND.search(element)
450 value = match.group(2)
451 result[key] = flow_line_compound_parse(value)
454 if (len(result.keys()) == 0):
459 def flow_line_split(line):
460 """ Convert a flow dump line into ([fields], [stats], actions) tuple.
461 Where fields and stats are lists.
462 This function relies on a the following ovs-dpctl dump-flow
463 output characteristics:
464 1. The dumpe flow line consists of a list of frame fields, list of stats
466 2. list of frame fields, each stat and action field are delimited by ', '.
467 3. That all other non stat field are not delimited by ', '.
471 results = re.split(', ', line)
473 (field, stats, action) = (results[0], results[1:-1], results[-1])
475 fields = flow_line_iter(field)
476 return (fields, stats, action)
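# Illustrative behavior, consistent with the unit tests below: a line such as
#   "in_port(4),eth_type(0x86dd), packets:2, bytes:92, used:0.703s, actions:3"
# splits into the fields ["in_port(4)", "eth_type(0x86dd)"], the stats
# ["packets:2", "bytes:92", "used:0.703s"] and the actions string "actions:3".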
479 def elements_to_dict(elements):
480 """ Convert line to a hierarchy of dictionaries. """
482 for element in elements:
483 match = FIELDS_CMPND.search(element)
486 value = match.group(2)
487 result[key] = flow_line_compound_parse(value)
490 match = FIELDS_ELEMENT.search(element)
493 value = match.group(2)
496 raise ValueError("can't parse >%s<" % element)
500 # pylint: disable-msg=R0903
501 class SumData(object):
502 """ Interface that all data going into SumDb must implement.
503 Holds the flow field and its corresponding count, total packets,
504 total bytes and calculates average.
506 __repr__ is used as key into SumData singleton.
507 __str__ is used as human readable output.
    def __init__(self, field_type, field, packets, flow_bytes, key):
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key
519 def decrement(self, decr_packets, decr_bytes, decr_count):
520 """ Decrement content to calculate delta from previous flow sample."""
521 self.packets -= decr_packets
522 self.bytes -= decr_bytes
523 self.count -= decr_count
525 def __iadd__(self, other):
526 """ Add two objects. """
528 if (self.key != other.key):
529 raise ValueError("adding two unrelated types")
531 self.count += other.count
532 self.packets += other.packets
        self.bytes += other.bytes
        return self
536 def __isub__(self, other):
537 """ Decrement two objects. """
539 if (self.key != other.key):
540 raise ValueError("adding two unrelated types")
542 self.count -= other.count
543 self.packets -= other.packets
544 self.bytes -= other.bytes
547 def __getattr__(self, name):
548 """ Handle average. """
549 if (name == "average"):
            if (self.packets == 0):
                return 0

            return float(self.bytes) / float(self.packets)
554 raise AttributeError(name)
557 """ Used for debugging. """
558 return "%s %s %s %s" % (self.field, self.count,
559 self.packets, self.bytes)
562 """ Used as key in the FlowDB table. """
566 def flow_aggregate(fields_dict, stats_dict):
567 """ Search for content in a line.
568 Passed the flow port of the dump-flows plus the current stats consisting
569 of packets, bytes, etc
573 for output_format in OUTPUT_FORMAT:
574 field = fields_dict.get(output_format.field_type, None)
        if (field and all(k in field for k in output_format.elements)):
576 obj = output_format.generator(output_format.field_type,
583 def flows_read(ihdl, flow_db):
584 """ read flow content from ihdl and insert into flow_db. """
588 line = ihdl.readline()
594 flow_db.flow_line_add(line)
595 except ValueError, arg:
601 def get_terminal_size():
603 return column width and height of the terminal
605 for fd_io in [0, 1, 2]:
607 result = struct.unpack('hh',
608 fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
614 if (result is None or result == (0, 0)):
615 # Maybe we can't get the width. In that case assume (25, 80)
621 # Content derived from:
622 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
624 SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
625 1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}
628 def approximate_size(size, a_kilobyte_is_1024_bytes=True):
629 """Convert a file size to human-readable form.
632 size -- file size in bytes
633 a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
634 if False, use multiples of 1000
641 raise ValueError('number must be non-negative')
    if (a_kilobyte_is_1024_bytes):
        multiple = 1024
    else:
        multiple = 1000

    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if (size < multiple):
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')
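# Illustrative usage, taken from the unit tests below:
#   approximate_size(1024)         returns "1.0 KiB"
#   approximate_size(1024 * 1024)  returns "1.0 MiB"
#   approximate_size(1024, a_kilobyte_is_1024_bytes=False) would use the
#   1000-based "KB" suffixes instead.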
659 """ Concepts about columns. """
660 def __init__(self, sortable, width):
661 self.sortable = sortable
666 """ How to render rows. """
667 def __init__(self, label, fmt):
672 def fmt_packet(obj, width):
673 """ Provide a string for packets that is appropriate for output."""
674 return str(obj.packets).rjust(width)
677 def fmt_count(obj, width):
678 """ Provide a string for average that is appropriate for output."""
679 return str(obj.count).rjust(width)
682 def fmt_avg(obj, width):
683 """ Provide a string for average that is appropriate for output."""
684 return str(int(obj.average)).rjust(width)
def fmt_field(obj, width):
    """ Truncate really long flows and insert ellipses to help make them
    clear.
    """

    ellipses = " ... "
    value = obj.field
    if (len(obj.field) > width):
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)
699 def fmt_bytes(obj, width):
700 """ Provide a string for average that is appropriate for output."""
701 if (len(str(obj.bytes)) <= width):
702 value = str(obj.bytes)
704 value = approximate_size(obj.bytes)
705 return value.rjust(width)
708 def title_center(value, width):
709 """ Center a column title."""
710 return value.upper().center(width)
713 def title_rjust(value, width):
714 """ Right justify a column title. """
715 return value.upper().rjust(width)
718 def column_picker(order, obj):
719 """ return the column as specified by order. """
729 raise ValueError("order outside of range %s" % order)
733 """ Renders flow data.
    The two FIELD_SELECT variables should be set to the actual field minus
    1. During construction, an internal method increments and initializes
    this value.
    """
739 FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]
741 FIELD_SELECT_SCRIPT = 7
742 FIELD_SELECT_TOP = -1
744 def __init__(self, console_width, field_select):
745 """ Calculate column widths taking into account changes in format."""
747 self._start_time = datetime.datetime.now()
749 self._cols = [ColMeta(False, 0),
750 ColMeta(True, Columns.VALUE_WIDTH),
751 ColMeta(True, Columns.VALUE_WIDTH),
752 ColMeta(True, Columns.VALUE_WIDTH),
753 ColMeta(True, Columns.VALUE_WIDTH)]
754 self._console_width = console_width
755 self.console_width_set(console_width)
        # The order in this array dictates the order of the columns.
        # The 0 width for the first entry is a placeholder. It is
        # dynamically calculated. The first column is special. We need a
        # way to indicate which fields are presented.
761 self._descs = [RowMeta("", title_rjust),
762 RowMeta("", title_rjust),
763 RowMeta("", title_rjust),
764 RowMeta("", title_rjust),
765 RowMeta("", title_rjust)]
766 self._column_sort_select = 0
767 self.column_select_event()
        self._titles = [
            RowMeta(Columns.FIELDS, title_center),
            RowMeta(Columns.COUNT, title_rjust),
            RowMeta(Columns.PACKETS, title_rjust),
            RowMeta(Columns.BYTES, title_rjust),
            RowMeta(Columns.AVERAGE, title_rjust)
        ]
        self._datas = [
            RowMeta(None, fmt_field),
            RowMeta(None, fmt_count),
            RowMeta(None, fmt_packet),
            RowMeta(None, fmt_bytes),
            RowMeta(None, fmt_avg)
        ]
786 # _field_types hold which fields are displayed in the field
787 # column, with the keyword all implying all fields.
789 self._field_types = Render.FLOW_FIELDS
792 # The default is to show all field types.
794 self._field_type_select = field_select
795 self.field_type_toggle()
797 def _field_type_select_get(self):
798 """ Return which field type to display. """
799 return self._field_types[self._field_type_select]
801 def field_type_toggle(self):
802 """ toggle which field types to show. """
803 self._field_type_select += 1
804 if (self._field_type_select >= len(self._field_types)):
805 self._field_type_select = 0
806 value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
807 self._titles[0].label = value
809 def column_select_event(self):
810 """ Handles column select toggle. """
812 self._descs[self._column_sort_select].label = ""
813 for _ in range(len(self._cols)):
814 self._column_sort_select += 1
815 if (self._column_sort_select >= len(self._cols)):
816 self._column_sort_select = 0
            # Now look for the next sortable column
            if (self._cols[self._column_sort_select].sortable):
                break

        self._descs[self._column_sort_select].label = "DESC"
823 def console_width_set(self, console_width):
824 """ Adjust the output given the new console_width. """
825 self._console_width = console_width
827 spaces = len(self._cols) - 1
        # Calculating column widths can be tedious but important. The
        # flow field value can be long. The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts. Give
        # the remaining space to the flow column. When numbers get large,
        # transition to the output generated by approximate_size, which
        # limits output to "###.# XiB", in other words 9 characters.
836 # At this point, we know the maximum length values. We may
837 # truncate the flow column to get everything to fit.
838 self._cols[0].width = 0
839 values_max_length = sum([ii.width for ii in self._cols]) + spaces
840 flow_max_length = console_width - values_max_length
841 self._cols[0].width = flow_max_length
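        # Illustrative arithmetic, assuming Columns.VALUE_WIDTH is 9 and a
        # console width of 80: the four value columns consume 4 * 9 = 36
        # characters plus 4 separating spaces, leaving 80 - 40 = 40
        # characters for the flow field column.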
843 def format(self, flow_db):
844 """ shows flows based on --script parameter."""
848 # Top output consists of
850 # Column title (2 rows)
852 # statistics and status
857 rc.append("Flow Summary".center(self._console_width))
859 stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
860 flow_db.flow_stats_get()
        accumulate = flow_db.accumulate_get()
        if (accumulate):
            stats += "Accumulate: on "
        else:
            stats += "Accumulate: off "
867 duration = datetime.datetime.now() - self._start_time
868 stats += "Duration: %s " % str(duration)
869 rc.append(stats.ljust(self._console_width))
872 # 2 rows for columns.
874 # Indicate which column is in descending order.
875 rc.append(" ".join([ii.fmt(ii.label, col.width)
876 for (ii, col) in zip(self._descs, self._cols)]))
878 rc.append(" ".join([ii.fmt(ii.label, col.width)
879 for (ii, col) in zip(self._titles, self._cols)]))
884 for dd in flow_db.field_values_in_order(self._field_type_select_get(),
885 self._column_sort_select):
886 rc.append(" ".join([ii.fmt(dd, col.width)
887 for (ii, col) in zip(self._datas,
def curses_screen_begin():
    """ begin curses screen control. """
    stdscr = curses.initscr()
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(1)
    return stdscr
def curses_screen_end(stdscr):
    """ end curses screen control. """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()
911 """ Implements live vs accumulate mode.
913 Flows are stored as key value pairs. The key consists of the content
914 prior to stat fields. The value portion consists of stats in a dictionary
917 @ \todo future add filtering here.
919 def __init__(self, accumulate):
920 self._accumulate = accumulate
921 self._error_count = 0
922 # Values are (stats, last update time.)
923 # The last update time is used for aging.
924 self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        self._flows = {}
        # This dictionary holds aggregates of flow fields.
        self._fields = {}
930 def accumulate_get(self):
931 """ Return the current accumulate state. """
932 return self._accumulate
934 def accumulate_toggle(self):
935 """ toggle accumulate flow behavior. """
936 self._accumulate = not self._accumulate
939 """ Indicate the beginning of processing flow content.
940 if accumulate is false clear current set of flows. """
942 if (not self._accumulate):
943 self._flow_lock.acquire()
947 self._flow_lock.release()
950 def flow_line_add(self, line):
951 """ Split a line from a ovs-dpctl dump-flow into key and stats.
952 The order of the content in the flow should be:
957 This method also assumes that the dump flow output does not
958 change order of fields of the same flow.
961 line = line.rstrip("\n")
962 (fields, stats, _) = flow_line_split(line)
965 fields_dict = elements_to_dict(fields)
967 if (len(fields_dict) == 0):
968 raise ValueError("flow fields are missing %s", line)
970 stats_dict = elements_to_dict(stats)
            if not all(k in stats_dict for k in ("packets", "bytes")):
                raise ValueError("statistics are missing %s." % line)
            # In accumulate mode, the flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, deltas are sent to the flow_db
            # database, allowing incremental changes to be made in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
981 key = ",".join(fields)
983 self._flow_lock.acquire()
985 (stats_old_dict, _) = self._flows.get(key, (None, None))
987 self._flow_lock.release()
989 self.flow_event(fields_dict, stats_old_dict, stats_dict)
991 except ValueError, arg:
993 self._error_count += 1
996 self._flow_lock.acquire()
998 self._flows[key] = (stats_dict, datetime.datetime.now())
1000 self._flow_lock.release()
1002 def decay(self, decayTimeInSeconds):
1003 """ Decay content. """
1004 now = datetime.datetime.now()
1005 for (key, value) in self._flows.items():
1006 (stats_dict, updateTime) = value
1007 delta = now - updateTime
1009 if (delta.seconds > decayTimeInSeconds):
1010 self._flow_lock.acquire()
1012 del self._flows[key]
1014 fields_dict = elements_to_dict(flow_line_iter(key))
1015 matches = flow_aggregate(fields_dict, stats_dict)
1016 for match in matches:
1017 self.field_dec(match)
1020 self._flow_lock.release()
1022 def flow_stats_get(self):
1023 """ Return statistics in a form of a dictionary. """
1025 self._flow_lock.acquire()
1027 rc = {"flow_total": len(self._flows),
1028 "flow_errors": self._error_count}
1030 self._flow_lock.release()
1033 def field_types_get(self):
1034 """ Return the set of types stored in the singleton. """
        types = set((ii.field_type for ii in self._fields.values()))
        return types
1038 def field_add(self, data):
1039 """ Collect dump-flow data to sum number of times item appears. """
1040 current = self._fields.get(repr(data), None)
1041 if (current is None):
1042 current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current
1047 def field_dec(self, data):
1048 """ Collect dump-flow data to sum number of times item appears. """
1049 current = self._fields.get(repr(data), None)
1050 if (current is None):
1051 raise ValueError("decrementing field missing %s" % repr(data))
        current -= data
        self._fields[repr(current)] = current
1055 if (current.count == 0):
1056 del self._fields[repr(current)]
1058 def field_values_in_order(self, field_type_select, column_order):
1059 """ Return a list of items in order maximum first. """
1060 values = self._fields.values()
1061 if (field_type_select != "all"):
1062 # If a field type other than "all" then reduce the list.
1063 values = [ii for ii in values
1064 if (ii.field_type == field_type_select)]
1065 values = [(column_picker(column_order, ii), ii) for ii in values]
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        values = [ii[1] for ii in values]
        return values
1071 def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
1072 """ Receives new flow information. """
        # In order to avoid processing every flow at every sample
        # period, changes in a flow's packet count are used to determine the
        # delta in the flow statistics. This delta is used in the call
        # to self.decrement prior to self.field_add.
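        # Illustrative delta, consistent with the unit tests below: if a
        # flow was previously sampled at packets:1, bytes:120 and is now
        # packets:2, bytes:240, the new totals are added after decrementing
        # the old ones, so the aggregate gains 1 packet and 120 bytes while
        # its line count is unchanged.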
1079 if (stats_old_dict is None):
1080 # This is a new flow
1081 matches = flow_aggregate(fields_dict, stats_new_dict)
1082 for match in matches:
                self.field_add(match)
        else:
            old_packets = int(stats_old_dict.get("packets", 0))
1086 new_packets = int(stats_new_dict.get("packets", 0))
            if (old_packets == new_packets):
                # Ignore, same data.
                return
1091 old_bytes = stats_old_dict.get("bytes", 0)
1092 # old_packets != new_packets
1093 # if old_packets > new_packets then we end up decrementing
1094 # packets and bytes.
1095 matches = flow_aggregate(fields_dict, stats_new_dict)
1096 for match in matches:
1097 match.decrement(int(old_packets), int(old_bytes), 1)
1098 self.field_add(match)
1101 class DecayThread(threading.Thread):
1102 """ Periodically call flow database to see if any flows are old. """
1103 def __init__(self, flow_db, interval):
1104 """ Start decay thread. """
1105 threading.Thread.__init__(self)
1107 self._interval = max(1, interval)
1108 self._min_interval = min(1, interval / 10)
1109 self._flow_db = flow_db
1110 self._event = threading.Event()
1111 self._running = True
1116 """ Worker thread which handles decaying accumulated flows. """
1118 while(self._running):
1119 self._event.wait(self._min_interval)
1121 self._flow_db.decay(self._interval)
1124 """ Stop thread. """
1125 self._running = False
        # Give the calling thread time to terminate but not too long.
        # This thread is a daemon, so the application will terminate if
        # we time out during the join. This is just a cleaner way to
        # release resources.
1135 def flow_top_command(stdscr, render, flow_db):
1136 """ Handle input while in top mode. """
1139 # Any character will restart sampling.
1140 if (ch == ord('h')):
1146 if (ch == ord('s')):
1147 # toggle which column sorts data in descending order.
1148 render.column_select_event()
1149 elif (ch == ord('a')):
1150 flow_db.accumulate_toggle()
1151 elif (ch == ord('f')):
1152 render.field_type_toggle()
1153 elif (ch == ord(' ')):
1160 def decay_timer_start(flow_db, accumulateDecay):
1161 """ If accumulateDecay greater than zero then start timer. """
1162 if (accumulateDecay > 0):
1163 decay_timer = DecayThread(flow_db, accumulateDecay)
1170 def flows_top(args):
1171 """ handles top like behavior when --script is not specified. """
1173 flow_db = FlowDB(args.accumulate)
1174 render = Render(0, Render.FIELD_SELECT_TOP)
1176 decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
1180 stdscr = curses_screen_begin()
1184 stdscr.timeout(args.delay)
1186 while (ch != ord('q')):
1190 ihdl = top_input_get(args)
1192 flows_read(ihdl, flow_db)
1195 except OSError, arg:
1196 logging.critical(arg)
1199 (console_height, console_width) = stdscr.getmaxyx()
1200 render.console_width_set(console_width)
1202 output_height = console_height - 1
1203 line_count = range(output_height)
1204 line_output = render.format(flow_db)
1205 lines = zip(line_count, line_output[:output_height])
1208 for (count, line) in lines:
1209 stdscr.addstr(count, 0, line[:console_width])
1212 ch = flow_top_command(stdscr, render, flow_db)
1215 curses_screen_end(stdscr)
1216 except KeyboardInterrupt:
1222 for (count, line) in lines:
1226 def flows_script(args):
1227 """ handles --script option. """
1229 flow_db = FlowDB(args.accumulate)
1232 if (args.flowFiles is None):
1233 logging.info("reading flows from stdin")
1234 ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
1236 flow_db = flows_read(ihdl, flow_db)
1240 for flowFile in args.flowFiles:
1241 logging.info("reading flows from %s", flowFile)
1242 ihdl = open(flowFile, "r")
1244 flow_db = flows_read(ihdl, flow_db)
1248 (_, console_width) = get_terminal_size()
1249 render = Render(console_width, Render.FIELD_SELECT_SCRIPT)
1251 for line in render.format(flow_db):
1256 """ Return 0 on success or 1 on failure.
    There are four stages to processing ovs-dpctl dump-flow content:
    1. Retrieve the current input.
    2. Store it in FlowDB and maintain history.
    3. Iterate over FlowDB, aggregating stats for each flow field.
    4. Display the results.
    Retrieving the current input is trivial: ovs-dpctl dump-flow
    is called. Future versions will have more elaborate means for collecting
    dump-flow content. FlowDB returns all data in the form of a hierarchical
    dictionary. Input will vary.

    In accumulate mode, flows are not purged from the FlowDB
    manager; at the very least, the latest statistics are
    kept. In the case of live output, the FlowDB is purged prior to sampling.

    Aggregating results requires identifying the flow fields to aggregate
    out of the flow and summing stats.
1286 except KeyboardInterrupt:
1290 if __name__ == '__main__':
1292 elif __name__ == 'ovs-dpctl-top':
1293 # pylint: disable-msg=R0915
1296 # Test case beyond this point.
1297 # pylint: disable-msg=R0904
1298 class TestsuiteFlowParse(unittest.TestCase):
    """ parse flow into hierarchy of dictionaries. """
1302 def test_flow_parse(self):
1303 """ test_flow_parse. """
1304 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1305 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1306 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1307 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1308 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1309 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1310 "38,41,44,47,50,53,56,59,62,65"
1312 (fields, stats, _) = flow_line_split(line)
1313 flow_dict = elements_to_dict(fields + stats)
1314 self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
1315 self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
1316 self.assertEqual(flow_dict["ipv6"]["src"],
1317 "fe80::55bf:fe42:bc96:2812")
1318 self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
1319 self.assertEqual(flow_dict["packets"], "1")
1320 self.assertEqual(flow_dict["bytes"], "92")
1322 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1323 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1324 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1325 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1326 "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1327 "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1328 "38,41,44,47,50,53,56,59,62,65"
1330 (fields, stats, _) = flow_line_split(line)
1331 flow_dict = elements_to_dict(fields + stats)
1332 self.assertEqual(flow_dict["used"], "-0.703s")
1333 self.assertEqual(flow_dict["packets"], "1")
1334 self.assertEqual(flow_dict["bytes"], "92")
1336 def test_flow_sum(self):
1337 """ test_flow_sum. """
1338 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1339 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1340 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1341 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1342 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1343 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1344 "38,41,44,47,50,53,56,59,62,65"
1346 (fields, stats, _) = flow_line_split(line)
1347 stats_dict = elements_to_dict(stats)
1348 fields_dict = elements_to_dict(fields)
1350 # Test simple case of one line.
1351 flow_db = FlowDB(False)
1352 matches = flow_aggregate(fields_dict, stats_dict)
1353 for match in matches:
1354 flow_db.field_add(match)
1356 flow_types = flow_db.field_types_get()
1357 expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
1358 self.assert_(len(flow_types) == len(expected_flow_types))
1359 for flow_type in flow_types:
1360 self.assertTrue(flow_type in expected_flow_types)
1362 for flow_type in flow_types:
1363 sum_value = flow_db.field_values_in_order("all", 1)
1364 self.assert_(len(sum_value) == 5)
1365 self.assert_(sum_value[0].packets == 2)
1366 self.assert_(sum_value[0].count == 1)
1367 self.assert_(sum_value[0].bytes == 92)
1370 # Add line again just to see counts go up.
1371 matches = flow_aggregate(fields_dict, stats_dict)
1372 for match in matches:
1373 flow_db.field_add(match)
1375 flow_types = flow_db.field_types_get()
1376 self.assert_(len(flow_types) == len(expected_flow_types))
1377 for flow_type in flow_types:
1378 self.assertTrue(flow_type in expected_flow_types)
1380 for flow_type in flow_types:
1381 sum_value = flow_db.field_values_in_order("all", 1)
1382 self.assert_(len(sum_value) == 5)
1383 self.assert_(sum_value[0].packets == 4)
1384 self.assert_(sum_value[0].count == 2)
1385 self.assert_(sum_value[0].bytes == 2 * 92)
1387 def test_assoc_list(self):
1388 """ test_assoc_list. """
1389 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1390 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1391 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1392 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1393 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1394 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1395 "38,41,44,47,50,53,56,59,62,65"
        valid_flows = [
            'in_port(4)',
            'eth_type(0x86dd)',
            'udp(dst=5355)',
            'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
            'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
        ]
1405 (fields, stats, _) = flow_line_split(line)
1406 stats_dict = elements_to_dict(stats)
1407 fields_dict = elements_to_dict(fields)
1410 # Test simple case of one line.
1411 flow_db = FlowDB(False)
1412 matches = flow_aggregate(fields_dict, stats_dict)
1413 for match in matches:
1414 flow_db.field_add(match)
1416 for sum_value in flow_db.field_values_in_order("all", 1):
1417 assoc_list = Columns.assoc_list(sum_value)
1418 for item in assoc_list:
1419 if (item[0] == "fields"):
1420 self.assertTrue(item[1] in valid_flows)
1421 elif (item[0] == "packets"):
1422 self.assertTrue(item[1] == 2)
1423 elif (item[0] == "count"):
1424 self.assertTrue(item[1] == 1)
1425 elif (item[0] == "average"):
1426 self.assertTrue(item[1] == 46.0)
1427 elif (item[0] == "bytes"):
1428 self.assertTrue(item[1] == 92)
1430 raise ValueError("unknown %s", item[0])
1432 def test_human_format(self):
1433 """ test_assoc_list. """
1435 self.assertEqual(approximate_size(0.0), "0.0 KiB")
1436 self.assertEqual(approximate_size(1024), "1.0 KiB")
1437 self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
        self.assertEqual(approximate_size((1024 * 1024) + 100000),
                         "1.1 MiB")
1440 value = (1024 * 1024 * 1024) + 100000000
1441 self.assertEqual(approximate_size(value), "1.1 GiB")
1443 def test_flow_line_split(self):
1444 """ Splitting a flow line is not trivial.
1445 There is no clear delimiter. Comma is used liberally."""
1446 expected_fields = ["in_port(4)",
1447 "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
1449 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
1450 "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
1451 "udp(src=61252,dst=5355)"]
1452 expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
1453 expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
1454 "38,41,44,47,50,53,56,59,62,65"
1456 line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1457 "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1458 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1459 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1460 "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1461 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1462 "38,41,44,47,50,53,56,59,62,65"
1464 (fields, stats, actions) = flow_line_split(line)
1466 self.assertEqual(fields, expected_fields)
1467 self.assertEqual(stats, expected_stats)
1468 self.assertEqual(actions, expected_actions)
1470 def test_accumulate_decay(self):
1471 """ test_accumulate_decay: test accumulated decay. """
1472 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1473 "dst=ff:ff:ff:ff:ff:ff),"
1474 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1475 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1476 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1477 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1478 "packets:1, bytes:120, used:0.004s, actions:1"]
1480 flow_db = FlowDB(True)
1482 flow_db.flow_line_add(lines[0])
1484 # Make sure we decay
1486 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1488 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1490 flow_db.flow_line_add(lines[0])
1491 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1493 # Should not be deleted.
1494 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1496 flow_db.flow_line_add(lines[0])
1497 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1498 timer = decay_timer_start(flow_db, 2)
1500 self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1503 def test_accumulate(self):
1504 """ test_accumulate test that FlowDB supports accumulate. """
1506 lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1507 "dst=ff:ff:ff:ff:ff:ff),"
1508 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1509 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1510 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1511 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1512 "packets:1, bytes:120, used:0.004s, actions:1",
1514 "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
1515 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1516 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1517 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
1518 "packets:2, bytes:5026, used:0.348s, actions:1",
1519 "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
1520 "dst=ff:ff:ff:ff:ff:ff),"
1521 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1522 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1523 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1524 "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
1525 "bytes:240, used:0.004s, actions:1"]
1528 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1529 "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
1530 "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
1531 "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
1532 "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
1533 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1536 # Turn on accumulate.
1537 flow_db = FlowDB(True)
1540 flow_db.flow_line_add(lines[0])
1542 # Test one flow exist.
1543 sum_values = flow_db.field_values_in_order("all", 1)
1544 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1545 self.assertEqual(len(in_ports), 1)
1546 self.assertEqual(in_ports[0].packets, 1)
1547 self.assertEqual(in_ports[0].bytes, 120)
1548 self.assertEqual(in_ports[0].count, 1)
1550 # simulate another sample
1551 # Test two different flows exist.
1553 flow_db.flow_line_add(lines[1])
1554 sum_values = flow_db.field_values_in_order("all", 1)
1555 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1556 self.assertEqual(len(in_ports), 1)
1557 self.assertEqual(in_ports[0].packets, 1)
1558 self.assertEqual(in_ports[0].bytes, 120)
1559 self.assertEqual(in_ports[0].count, 1)
1561 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1562 self.assertEqual(len(in_ports), 1)
1563 self.assertEqual(in_ports[0].packets, 2)
1564 self.assertEqual(in_ports[0].bytes, 126)
1565 self.assertEqual(in_ports[0].count, 1)
1567 # Test first flow increments packets.
1569 flow_db.flow_line_add(lines[2])
1570 sum_values = flow_db.field_values_in_order("all", 1)
1571 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1572 self.assertEqual(len(in_ports), 1)
1573 self.assertEqual(in_ports[0].packets, 2)
1574 self.assertEqual(in_ports[0].bytes, 240)
1575 self.assertEqual(in_ports[0].count, 1)
1577 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1578 self.assertEqual(len(in_ports), 1)
1579 self.assertEqual(in_ports[0].packets, 2)
1580 self.assertEqual(in_ports[0].bytes, 126)
1581 self.assertEqual(in_ports[0].count, 1)
1583 # Test third flow but with the same in_port(1) as the first flow.
1585 flow_db.flow_line_add(lines[3])
1586 sum_values = flow_db.field_values_in_order("all", 1)
1587 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1588 self.assertEqual(len(in_ports), 1)
1589 self.assertEqual(in_ports[0].packets, 3)
1590 self.assertEqual(in_ports[0].bytes, 360)
1591 self.assertEqual(in_ports[0].count, 2)
1593 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1594 self.assertEqual(len(in_ports), 1)
1595 self.assertEqual(in_ports[0].packets, 2)
1596 self.assertEqual(in_ports[0].bytes, 126)
1597 self.assertEqual(in_ports[0].count, 1)
1599 # Third flow has changes.
1601 flow_db.flow_line_add(lines[4])
1602 sum_values = flow_db.field_values_in_order("all", 1)
1603 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1604 self.assertEqual(len(in_ports), 1)
1605 self.assertEqual(in_ports[0].packets, 4)
1606 self.assertEqual(in_ports[0].bytes, 480)
1607 self.assertEqual(in_ports[0].count, 2)
1609 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1610 self.assertEqual(len(in_ports), 1)
1611 self.assertEqual(in_ports[0].packets, 2)
1612 self.assertEqual(in_ports[0].bytes, 126)
1613 self.assertEqual(in_ports[0].count, 1)
1617 flow_db.flow_line_add(lines[5])
1618 sum_values = flow_db.field_values_in_order("all", 1)
1619 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1620 self.assertEqual(len(in_ports), 1)
1621 self.assertEqual(in_ports[0].packets, 3)
1622 self.assertEqual(in_ports[0].bytes, 360)
1623 self.assertEqual(in_ports[0].count, 2)
1625 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1626 self.assertEqual(len(in_ports), 1)
1627 self.assertEqual(in_ports[0].packets, 2)
1628 self.assertEqual(in_ports[0].bytes, 126)
1629 self.assertEqual(in_ports[0].count, 1)
1631 def test_parse_character_errors(self):
1632 """ test_parsing errors.
        The flow parser is purposely loose. It is not designed to validate
        input, merely to pull out what it can, but there are situations
        where a parse error can be detected.
1638 lines = ["complete garbage",
1639 "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
1640 "dst=33:33:00:00:00:66),"
1641 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1642 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1643 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
1644 "packets:2,bytes:5026,actions:1"]
1646 flow_db = FlowDB(False)
1650 flow_db.flow_line_add(line)
1652 # We want an exception. That is how we know we have
1653 # correctly found a simple parsing error. We are not
1654 # looking to validate flow output just catch simple issues.
1656 self.assertTrue(False)
1658 def test_tunnel_parsing(self):
1659 """ test_tunnel_parsing test parse flows with tunnel. """
1661 "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
1662 "tos=0x0,ttl=64,flags(key)),in_port(1),"
1663 "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
1664 "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
1665 "actions:userspace(pid=4294962691,slow_path(cfm))"
1667 flow_db = FlowDB(False)
1669 flow_db.flow_line_add(lines[0])
1670 sum_values = flow_db.field_values_in_order("all", 1)
1671 in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1672 self.assertEqual(len(in_ports), 1)
1673 self.assertEqual(in_ports[0].packets, 6)
1674 self.assertEqual(in_ports[0].bytes, 534)
1675 self.assertEqual(in_ports[0].count, 1)
1677 def test_flow_multiple_paren(self):
1678 """ test_flow_multiple_paren. """
1679 line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
1680 valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
1682 rc = flow_line_iter(line)
1683 self.assertEqual(valid, rc)
1685 def test_to_network(self):
1686 """ test_to_network test ipv4_to_network and ipv6_to_network. """
1688 ("192.168.0.1", "192.168.0.1"),
1689 ("192.168.0.1/255.255.255.255", "192.168.0.1"),
1690 ("192.168.0.1/255.255.255.0", "192.168.0.0"),
1691 ("192.168.0.1/255.255.0.0", "192.168.0.0"),
1692 ("192.168.0.1/255.0.0.0", "192.0.0.0"),
1693 ("192.168.0.1/0.0.0.0", "0.0.0.0"),
1694 ("10.24.106.230/255.255.255.255", "10.24.106.230"),
1695 ("10.24.106.230/255.255.255.0", "10.24.106.0"),
1696 ("10.24.106.0/255.255.255.0", "10.24.106.0"),
1697 ("10.24.106.0/255.255.252.0", "10.24.104.0")
1701 ("1::192:168:0:1", "1::192:168:0:1"),
1702 ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
1703 ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
1704 ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
1705 ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
1706 ("1::192:168:0:1/1::0:0:0:0", "1::"),
1707 ("1::192:168:0:1/::", "::")
1710 for (ipv4_test, ipv4_check) in ipv4s:
1711 self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
1713 for (ipv6_test, ipv6_check) in ipv6s:
1714 self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)
1717 """ test_ui: test expected ui behavior. """
1718 #pylint: disable=W0212
1719 top_render = Render(80, Render.FIELD_SELECT_TOP)
1720 script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
1721 self.assertEqual(top_render._field_type_select_get(), "in_port")
1722 self.assertEqual(script_render._field_type_select_get(), "all")
1723 #pylint: enable=W0212