]> git.proxmox.com Git - mirror_ovs.git/blame - utilities/ovs-dpctl-top.in
ofctl: break the loop if ovs_pcap_read returns error
[mirror_ovs.git] / utilities / ovs-dpctl-top.in
CommitLineData
b49a959b 1#! @PYTHON@
14b4c575
MH
2#
3# Copyright (c) 2013 Nicira, Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at:
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17#
18# The approximate_size code was copied from
19# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21# used under a Creative Commons Attribution-Share-Alike license:
22# http://creativecommons.org/licenses/by-sa/3.0/
23#
24#
25
26"""Top like behavior for ovs-dpctl dump-flows output.
27
28This program summarizes ovs-dpctl flow content by aggregating the number
29of packets, total bytes and occurrence of the following fields:
30
31 - Datapath in_port
32
33 - Ethernet type
34
35 - Source and destination MAC addresses
36
37 - IP protocol
38
39 - Source and destination IPv4 addresses
40
41 - Source and destination IPv6 addresses
42
43 - UDP and TCP destination port
44
45 - Tunnel source and destination addresses
46
47
48Output shows five values:
49 - FIELDS: the flow fields for example in_port(1).
50
51 - PACKETS: the total number of packets containing the flow field.
52
53 - BYTES: the total number of bytes containing the flow field. If units are
54 not present then values are in bytes.
55
56 - AVERAGE: the average packets size (BYTES/PACKET).
57
58 - COUNT: the number of lines in the dump-flow output that contain the flow field.
59
60Top Behavior
61
62While in top mode, the default behavior, the following single character
63commands are supported:
64
65 a - toggles top in accumulate and live mode. Accumulate mode is described
66 below.
67
68 s - toggles which column is used to sort content in decreasing order. A
69 DESC title is placed over the column.
70
71 _ - a space indicating to collect dump-flow content again
72
73 h - halt output. Any character will restart sampling
74
3a2742a1 75 f - cycle through flow fields. The initial field is in_port
14b4c575
MH
76
77 q - q for quit.
78
79Accumulate Mode
80
81There are two supported modes: live and accumulate. The default is live.
82The parameter --accumulate or the 'a' character in top mode enables the
83latter. In live mode, recent dump-flow content is presented.
84Whereas accumulate mode keeps track of the prior historical
85information until the flow is reset, not when the flow is purged. A flow
86is considered reset when its packet count has decreased from
87its previous sample. There is one caveat: to keep the system from eventually
88running out of memory, any flows that have not been refreshed within the
89accumulate-decay period are purged. The goal here is to free the memory
90of flows that are not active. Statistics are not decremented. Their purpose
91is to reflect the overall history of the flow fields.
92
93
94Debugging Errors
95
96Parsing errors are counted and displayed in the status line at the beginning
97of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
100
101Error messages will identify content that failed to parse.
102
103
104Access Remote Hosts
105
106The --host must follow the format user@hostname. This script simply calls
107'ssh user@Hostname' without checking for login credentials therefore public
108keys should be installed on the system identified by hostname, such as:
109
110$ ssh-copy-id user@hostname
111
112Consult ssh-copy-id man pages for more details.
113
114
115Expected usage
116
117$ ovs-dpctl-top
118
119or to run as a script:
120$ ovs-dpctl dump-flows > dump-flows.log
121$ ovs-dpctl-top --script --flow-file dump-flows.log
122
123"""
124
125# pylint: disable-msg=C0103
126# pylint: disable-msg=C0302
127# pylint: disable-msg=R0902
128# pylint: disable-msg=R0903
129# pylint: disable-msg=R0904
130# pylint: disable-msg=R0912
131# pylint: disable-msg=R0913
132# pylint: disable-msg=R0914
133
134import sys
135import os
136try:
137 ##
138 # Arg parse is not installed on older Python distributions.
139 # ovs ships with a version in the directory mentioned below.
140 import argparse
141except ImportError:
142 sys.path.append(os.path.join("@pkgdatadir@", "python"))
143 import argparse
144import logging
145import re
146import unittest
147import copy
148import curses
149import operator
150import subprocess
151import fcntl
152import struct
153import termios
154import datetime
155import threading
156import time
157import socket
158
159
160##
161# The following two definitions provide the necessary netaddr functionality.
162# Python netaddr module is not part of the core installation. Packaging
163# netaddr was involved and seems inappropriate given that only two
164# methods were used.
def ipv4_to_network(ip_str):
    """ Return the network part of an IPv4 "address/netmask" string.

    ip_str -- dotted-quad address, optionally followed by "/" and a
              dotted-quad netmask, for example "10.1.2.3/255.255.0.0".

    When ip_str does not split into exactly one address and one mask it
    is returned unchanged. Otherwise the address is ANDed with the mask
    and the result is returned as a dotted-quad string.
    """
    parts = ip_str.split("/")
    if len(parts) != 2:
        # Plain address without a mask; nothing to calculate.
        return ip_str
    (address, netmask) = parts

    # The 4 packed bytes are handled as two network-order 16 bit words.
    layout = '!HH'
    address_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET, address))
    netmask_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET, netmask))
    masked = [word & bits
              for (word, bits) in zip(address_words, netmask_words)]

    return socket.inet_ntop(socket.AF_INET, struct.pack(layout, *masked))
183
184
def ipv6_to_network(ip_str):
    """ Return the network part of an IPv6 "address/netmask" string.

    ip_str -- IPv6 address, optionally followed by "/" and a netmask that
              is itself written as an IPv6 address, for example
              "fe80::1/ffff:ffff::".

    When ip_str does not split into exactly one address and one mask it
    is returned unchanged. Otherwise the address is ANDed with the mask
    and the result is returned in IPv6 text form.
    """
    parts = ip_str.split("/")
    if len(parts) != 2:
        # Plain address without a mask; nothing to calculate.
        return ip_str
    (address, netmask) = parts

    # The 16 packed bytes are handled as eight network-order 16 bit words.
    layout = '!HHHHHHHH'
    address_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET6, address))
    netmask_words = struct.unpack(layout,
                                  socket.inet_pton(socket.AF_INET6, netmask))
    masked = [word & bits
              for (word, bits) in zip(address_words, netmask_words)]

    return socket.inet_ntop(socket.AF_INET6, struct.pack(layout, *masked))
208
209
210##
211# columns displayed
212##
class Columns:
    """ Names and width shared by the displayed columns.

    The column names double as the attribute names read from the objects
    handed to assoc_list(). Titles need to be less than 8 characters.
    """
    # Width reserved for each numeric column.
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return [(column name, value), ...] for obj.

        The fields column holds repr(obj); the remaining columns are read
        from the attribute of the same name, in the order packets, bytes,
        count, average.
        """
        numeric = (Columns.PACKETS, Columns.BYTES,
                   Columns.COUNT, Columns.AVERAGE)
        pairs = [(Columns.FIELDS, repr(obj))]
        for name in numeric:
            pairs.append((name, getattr(obj, name)))
        return pairs
236
237
def element_eth_get(field_type, element, stats_dict):
    """ Build the SumData for the Ethernet src/dst pair of a flow.

    field_type -- name of the flow field.
    element    -- dictionary holding the "src" and "dst" values.
    stats_dict -- dictionary holding the "packets" and "bytes" statistics.

    The same text is used for display and as the aggregation key.
    """
    label = "%s(src=%s,dst=%s)" % (field_type,
                                   element["src"], element["dst"])
    return SumData(field_type, label, stats_dict["packets"],
                   stats_dict["bytes"], label)
245
246
def element_ipv4_get(field_type, element, stats_dict):
    """ Build the SumData for the IPv4 src/dst pair of a flow.

    field_type -- name of the flow field.
    element    -- dictionary holding the "src" and "dst" values, each
                  optionally carrying a "/mask" suffix.
    stats_dict -- dictionary holding the "packets" and "bytes" statistics.

    The displayed text keeps the addresses as given; the aggregation key
    uses the addresses reduced to their networks.
    """
    template = "%s(src=%s,dst=%s)"
    source = element["src"]
    destination = element["dst"]

    shown = template % (field_type, source, destination)
    key = template % (field_type, ipv4_to_network(source),
                      ipv4_to_network(destination))

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
257
258
def element_tunnel_get(field_type, element, stats_dict):
    """ Build the SumData for the src/dst pair of a tunnel.

    Tunnel end points are handled exactly like an IPv4 src/dst pair, so
    the work is handed to element_ipv4_get().
    """
    summary = element_ipv4_get(field_type, element, stats_dict)
    return summary
262
263
def element_ipv6_get(field_type, element, stats_dict):
    """ Build the SumData for the IPv6 src/dst pair of a flow.

    field_type -- name of the flow field.
    element    -- dictionary holding the "src" and "dst" values, each
                  optionally carrying a "/mask" suffix.
    stats_dict -- dictionary holding the "packets" and "bytes" statistics.

    The displayed text keeps the addresses as given; the aggregation key
    uses the addresses reduced to their networks.
    """
    template = "%s(src=%s,dst=%s)"
    source = element["src"]
    destination = element["dst"]

    shown = template % (field_type, source, destination)
    key = template % (field_type, ipv6_to_network(source),
                      ipv6_to_network(destination))

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
275
276
def element_dst_port_get(field_type, element, stats_dict):
    """ Build the SumData for the destination port of a flow.

    field_type -- name of the flow field.
    element    -- dictionary holding the "dst" value; other entries are
                  not used.
    stats_dict -- dictionary holding the "packets" and "bytes" statistics.

    The same text is used for display and as the aggregation key.
    """
    label = "%s(dst=%s)" % (field_type, element["dst"])
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
282
283
def element_passthrough_get(field_type, element, stats_dict):
    """ Build the SumData for a field whose value is used as is.

    field_type -- name of the flow field.
    element    -- the field value; it is only formatted with %s.
    stats_dict -- dictionary holding the "packets" and "bytes" statistics.

    The same text is used for display and as the aggregation key.
    """
    label = "%s(%s)" % (field_type, element)
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
289
290
291# pylint: disable-msg=R0903
class OutputFormat:
    """ Pairs a flow field type with the function that summarizes it.

    field_type -- name of the flow field.
    generator  -- callable stored for later use; flow_aggregate() invokes
                  it as generator(field_type, element, stats_dict).
    """
    def __init__(self, field_type, generator):
        (self.field_type, self.generator) = (field_type, generator)
297
3a2742a1
MH
298##
299# The order below is important. The initial flow field depends on whether
300# --script or top mode is used. In top mode, the expected behavior, in_port
301# flow fields are shown first. A future feature will allow users to
302# filter output by selecting a row. Filtering by in_port is a natural
303# filtering starting point.
304#
305# In script mode, all fields are shown. The expectation is that users could
306# filter output by piping through grep.
307#
308# In top mode, the default flow field is in_port. In --script mode,
309# the default flow field is all.
310#
311# All is added to the end of the OUTPUT_FORMAT list.
312##
# Flow fields that are summarized, in display order. Each entry names the
# field and the function that turns its value into a SumData.
OUTPUT_FORMAT = [
    OutputFormat("in_port", element_passthrough_get),
    OutputFormat("eth", element_eth_get),
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    OutputFormat("tunnel", element_tunnel_get),
]
3a2742a1 323##
14b4c575
MH
324
325
# Maps a transport protocol field to the name of its destination port.
ELEMENT_KEY = dict(udp="udp.dst", tcp="tcp.dst")
330
331
def top_input_get(args):
    """ Start "ovs-dpctl dump-flows" and return the pipe with its output.

    args -- parsed program arguments. When args.host is set the command
            is run on that host through ssh.

    The command's stderr is merged into the returned stdout pipe.
    """
    if args.host:
        command = ["ssh", args.host]
    else:
        command = []
    command.extend(["ovs-dpctl", "dump-flows"])

    child = subprocess.Popen(command, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
    return child.stdout
341
342
def args_get():
    """ Parse the command line and configure logging.

    Returns the argparse namespace with these attributes:
      flowFiles       -- list of files given with -f, or None. None is a
                         special value indicating to read flows from
                         stdin, which handles the case
                             ovs-dpctl dump-flows | ovs-dpctl-flows.py
      verbose         -- logging level; logging.DEBUG when -V is given,
                         otherwise logging.CRITICAL.
      top             -- False when --script is given, otherwise True.
      host            -- user@host given with --host, or None.
      accumulate      -- True when -a is given.
      accumulateDecay -- decay period as a float; the default is 300.
      delay           -- sample delay in milliseconds; the default is 1000.

    As a side effect logging.basicConfig() is called with the selected
    level. Invalid arguments make argparse exit the program.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)

    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    # The two help fragments are joined by the parser, so the first one
    # has to end with a space. The section named here is the one found in
    # the module docstring.
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows see "
                             "Access Remote Hosts for more information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                             "The default is 5 minutes. "
                             "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                             "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args
386
387###
388# Code to parse a single line in dump-flow
389###
# Raw strings keep the regular expression escapes (\w, \(, \.) away from
# Python's own string escape processing.
#
# Compound field, key(values): group 1 is the key and group 2 is everything
# between the first "(" and the last ")".
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# Member of a compound field, key=value.
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
# Simple field or statistic, key:value.
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
395
396
def flow_line_iter(line):
    """ Split line on the commas that are not inside parentheses.

    Returns the list of pieces with every space removed. A piece ended by
    a comma is kept even when it is empty; a trailing empty piece is
    dropped.

    Raises ValueError(line) when the opening and closing parentheses do
    not add up to zero at the end of the line.
    """
    pieces = []
    current = []
    depth = 0

    for char in line:
        # Track the nesting first; the parenthesis itself is still stored
        # below like any other character.
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1

        if char == ' ':
            # White space never makes it into the output.
            continue
        if char == ',' and depth == 0:
            pieces.append("".join(current))
            current = []
        else:
            current.append(char)

    if depth != 0:
        raise ValueError(line)
    if current:
        pieces.append("".join(current))
    return pieces
429
430
def flow_line_compound_parse(compound):
    """ Parse the content found between the parentheses of a field.

    For example
        src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is found in
        eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)

    Returns a dictionary of the key=value members. A member of the form
    key(values) is parsed recursively and stored under its key. When
    nothing could be parsed the compound text itself is returned.
    """
    parsed = {}
    for item in flow_line_iter(compound):
        pair = FIELDS_CMPND_ELEMENT.search(item)
        if pair is not None:
            parsed[pair.group(1)] = pair.group(2)

        # Checked for every item, including those that matched above.
        nested = FIELDS_CMPND.search(item)
        if nested is not None:
            parsed[nested.group(1)] = \
                flow_line_compound_parse(nested.group(2))

    if not parsed:
        return compound
    return parsed
456
457
def flow_line_split(line):
    """ Convert a flow dump line into a (fields, stats, action) tuple.

    fields is the list produced by flow_line_iter(), stats is a list of
    strings and action is a single string.

    This function relies on the following ovs-dpctl dump-flow output
    characteristics:
      1. The dump flow line consists of a list of frame fields, a list of
         stats and an action.
      2. The list of frame fields, each stat and the action field are
         delimited by ', '.
      3. No other content is delimited by ', '.
    """
    chunks = line.split(', ')

    # First chunk: frame fields. Last chunk: action. In between: stats.
    fields = flow_line_iter(chunks[0])
    return (fields, chunks[1:-1], chunks[-1])
476
477
def elements_to_dict(elements):
    """ Convert a list of elements into a hierarchy of dictionaries.

    An element of the form key(values) is stored under key after its
    values went through flow_line_compound_parse(). An element of the
    form key:value is stored as a string.

    Raises ValueError for an element that has neither form.
    """
    parsed = {}
    for item in elements:
        compound = FIELDS_CMPND.search(item)
        if compound is not None:
            parsed[compound.group(1)] = \
                flow_line_compound_parse(compound.group(2))
            continue

        simple = FIELDS_ELEMENT.search(item)
        if simple is None:
            raise ValueError("can't parse >%s<" % item)
        parsed[simple.group(1)] = simple.group(2)
    return parsed
497
498
499# pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        """ Describe one occurrence of a flow field.

        field_type -- name of the flow field.
        field      -- text shown for the field.
        packets    -- packet total; converted with int().
        flow_bytes -- byte total; converted with int().
        key        -- text returned by repr() and compared when two
                      objects are combined.
        """
        self.field_type = field_type
        self.field = field
        # Count is the number of lines in the dump-flow log.
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add the totals of other, which must have the same key.

        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract the totals of other, which must have the same key.

        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    @property
    def average(self):
        """ Average packet size (bytes / packets) as a float.

        Zero packets yields 0.0 instead of a division error.
        """
        if (self.packets == 0):
            return 0.0
        return float(self.bytes) / float(self.packets)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
563
564
def flow_aggregate(fields_dict, stats_dict):
    """ Summarize the known flow fields of one dump-flows line.

    fields_dict -- parsed flow fields of the line.
    stats_dict  -- parsed statistics of the line (packets, bytes, etc).

    Returns a list with the object produced by the generator of every
    OUTPUT_FORMAT entry whose field is present, with a value that is not
    empty, in fields_dict. The list follows the order of OUTPUT_FORMAT.
    """
    summaries = []

    for entry in OUTPUT_FORMAT:
        value = fields_dict.get(entry.field_type, None)
        if not value:
            continue
        summaries.append(entry.generator(entry.field_type,
                                         value, stats_dict))

    return summaries
580
581
def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert it into flow_db.

    ihdl    -- object with a readline() method; a zero length result ends
               the input.
    flow_db -- object with a flow_line_add(line) method.

    A ValueError raised for a line is logged and the line is skipped, so
    one bad line does not end the processing of the input.

    Returns flow_db.
    """
    while True:
        line = ihdl.readline()
        if (len(line) == 0):
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError as arg:
            logging.error(arg)

    return flow_db
598
599
def get_terminal_size():
    """ Return the two values reported for the terminal window.

    stdin, stdout and stderr are queried in that order with the
    TIOCGWINSZ ioctl and the first usable answer wins. When no descriptor
    gives an answer other than (0, 0) the tuple (25, 80) is returned.

    NOTE(review): TIOCGWINSZ reports rows before columns, which matches
    the (25, 80) fallback -- confirm callers unpack it as (height, width).
    """
    result = None
    for fd_io in (0, 1, 2):
        try:
            size = struct.unpack('hh',
                                 fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                             b'1234'))
        except IOError:
            # Not a terminal; try the next descriptor.
            continue

        if (size != (0, 0)):
            # Stop here so that a later descriptor which is not a terminal
            # can not discard this answer.
            result = size
            break

    if (result is None):
        # Maybe we can't get the width. In that case assume (25, 80)
        result = (25, 80)

    return result
618
619##
620# Content derived from:
621# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
622##
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                if False, use multiples of 1000

    Returns: string with one decimal and a unit, for example "1.0 TB".
    The smallest unit used is KB/KiB.

    Raises ValueError for a negative size and for a size that is too
    large for the biggest unit.

    """
    scaled = float(size)
    if scaled < 0:
        raise ValueError('number must be non-negative')

    if a_kilobyte_is_1024_bytes:
        step = 1024
    else:
        step = 1000

    # Divide first, then test: the first unit tried is already the kilo one.
    for unit in SUFFIXES[step]:
        scaled = scaled / step
        if scaled < step:
            return "%.1f %s" % (scaled, unit)

    raise ValueError('number too large')
652
653
654##
655# End copied content
656##
class ColMeta:
    """ Describes one column.

    sortable -- whether the column may be selected for sorting.
    width    -- number of characters given to the column.
    """
    def __init__(self, sortable, width):
        (self.sortable, self.width) = (sortable, width)
662
663
class RowMeta:
    """ Describes how one cell of a row is rendered.

    label -- text kept for the cell.
    fmt   -- function stored to produce the cell text.
    """
    def __init__(self, label, fmt):
        (self.label, self.fmt) = (label, fmt)
669
670
def fmt_packet(obj, width):
    """ Return obj.packets as text, right justified to width. """
    text = str(obj.packets)
    return text.rjust(width)
674
675
def fmt_count(obj, width):
    """ Return obj.count as text, right justified to width. """
    text = str(obj.count)
    return text.rjust(width)
679
680
def fmt_avg(obj, width):
    """ Return obj.average, truncated to an integer, as text that is
    right justified to width.
    """
    whole = int(obj.average)
    return str(whole).rjust(width)
684
685
def fmt_field(obj, width):
    """ Return obj.field left justified to width.

    A field longer than width is cut short and ends with an ellipsis so
    that it is clear that content is missing.
    """
    text = obj.field
    if len(obj.field) <= width:
        return text.ljust(width)

    marker = " ... "
    shortened = text[:(width - len(marker))] + marker
    return shortened.ljust(width)
696
697
def fmt_bytes(obj, width):
    """ Return obj.bytes as text, right justified to width.

    The plain number is used while it fits in width; otherwise the value
    is shown in the form produced by approximate_size().
    """
    plain = str(obj.bytes)
    if len(plain) <= width:
        text = plain
    else:
        text = approximate_size(obj.bytes)
    return text.rjust(width)
705
706
def title_center(value, width):
    """ Return value in upper case, centered within width. """
    shouted = value.upper()
    return shouted.center(width)
710
711
def title_rjust(value, width):
    """ Return value in upper case, right justified to width. """
    shouted = value.upper()
    return shouted.rjust(width)
715
716
def column_picker(order, obj):
    """ Return the value of the column selected by order.

    order -- 1 selects count, 2 packets, 3 bytes and 4 average.
    obj   -- object carrying those attributes.

    Raises ValueError for any other order.
    """
    names = ("count", "packets", "bytes", "average")
    for (position, name) in enumerate(names):
        if order == position + 1:
            # Only the selected attribute is read.
            return getattr(obj, name)
    raise ValueError("order outside of range %s" % order)
729
730
class Render:
    """ Renders flow data.

    The two FIELD_SELECT variables should be set to the actual field minus
    1. During construction, field_type_toggle() increments the selection
    and initializes the title of the first column.
    """
    FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]

    # Incremented once during construction: 7 ends up on the last entry of
    # FLOW_FIELDS ("all") and -1 on the first one.
    FIELD_SELECT_SCRIPT = 7
    FIELD_SELECT_TOP = -1

    def __init__(self, console_width, field_select):
        """ Calculate column widths taking into account changes in format.

        console_width -- number of characters available per line.
        field_select  -- index of the initial flow field minus 1; see
                         FIELD_SELECT_SCRIPT and FIELD_SELECT_TOP.
        """
        self._start_time = datetime.datetime.now()

        # The order of the entries dictates the order of the columns. The
        # first column holds the flow field; its 0 width is a place holder
        # that console_width_set() replaces.
        value_cols = [ColMeta(True, Columns.VALUE_WIDTH) for _ in range(4)]
        self._cols = [ColMeta(False, 0)] + value_cols
        self._console_width = console_width
        self.console_width_set(console_width)

        # Row above the titles that marks the column used for sorting.
        self._descs = [RowMeta("", title_rjust) for _ in self._cols]
        self._column_sort_select = 0
        self.column_select_event()

        value_titles = (Columns.COUNT, Columns.PACKETS,
                        Columns.BYTES, Columns.AVERAGE)
        self._titles = [RowMeta(Columns.FIELDS, title_center)]
        self._titles += [RowMeta(name, title_rjust) for name in value_titles]

        cell_formats = (fmt_field, fmt_count, fmt_packet, fmt_bytes, fmt_avg)
        self._datas = [RowMeta(None, cell) for cell in cell_formats]

        # Field types that can be shown in the field column; the keyword
        # all implies all fields.
        self._field_types = Render.FLOW_FIELDS

        # field_type_toggle() moves on to the requested field and sets
        # the title of the first column.
        self._field_type_select = field_select
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Move on to the next field type, wrapping around at the end,
        and show it in the title of the first column.
        """
        self._field_type_select += 1
        if (self._field_type_select >= len(self._field_types)):
            self._field_type_select = 0
        selected = self._field_type_select_get()
        self._titles[0].label = Columns.FIELDS + " (%s)" % selected

    def column_select_event(self):
        """ Move the DESC marker to the next sortable column. """
        total = len(self._cols)

        self._descs[self._column_sort_select].label = ""
        # At most one full round over the columns.
        for _ in range(total):
            self._column_sort_select = (self._column_sort_select + 1) % total
            if (self._cols[self._column_sort_select].sortable):
                break
        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width.

        The value columns keep their fixed width and one space separates
        neighbouring columns. Whatever remains of console_width is given
        to the first (flow field) column, whose content is truncated when
        it does not fit.
        """
        self._console_width = console_width

        separators = len(self._cols) - 1
        # Take the first column out of the sum before measuring the rest.
        self._cols[0].width = 0
        reserved = sum([col.width for col in self._cols]) + separators
        self._cols[0].width = console_width - reserved

    def format(self, flow_db):
        """ Return the list of text lines that present flow_db.

        The output consists of a title line, a status line, the sort
        marker row, the column title row and one row per flow field.
        """
        lines = ["Flow Summary".center(self._console_width)]

        # Status line.
        status = " Total: %(flow_total)s errors: %(flow_errors)s " % \
            flow_db.flow_stats_get()
        if (flow_db.accumulate_get()):
            status += "Accumulate: on "
        else:
            status += "Accumulate: off "

        elapsed = datetime.datetime.now() - self._start_time
        status += "Duration: %s " % str(elapsed)
        lines.append(status.ljust(self._console_width))

        # Two header rows: the sort marker first, then the titles.
        for header in (self._descs, self._titles):
            cells = [meta.fmt(meta.label, col.width)
                     for (meta, col) in zip(header, self._cols)]
            lines.append(" ".join(cells))

        # Data rows.
        entries = flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select)
        for entry in entries:
            cells = [meta.fmt(entry, col.width)
                     for (meta, col) in zip(self._datas, self._cols)]
            lines.append(" ".join(cells))

        return lines
890
891
def curses_screen_begin():
    """ Begin curses screen control and return the screen object.

    The terminal is put in cbreak mode with echo turned off, and keypad
    handling is enabled on the screen.
    """
    screen = curses.initscr()
    curses.cbreak()
    curses.noecho()
    screen.keypad(1)
    return screen
899
900
def curses_screen_end(stdscr):
    """ End curses screen control.

    stdscr -- the screen object; its keypad handling is turned off.

    Cbreak mode is left and echo is turned back on before curses is
    shut down.
    """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()
907
908
909class FlowDB:
910 """ Implements live vs accumulate mode.
911
912 Flows are stored as key value pairs. The key consists of the content
913 prior to stat fields. The value portion consists of stats in a dictionary
914 form.
915
916 @ \todo future add filtering here.
917 """
918 def __init__(self, accumulate):
919 self._accumulate = accumulate
920 self._error_count = 0
921 # Values are (stats, last update time.)
922 # The last update time is used for aging.
923 self._flow_lock = threading.Lock()
924 # This dictionary holds individual flows.
925 self._flows = {}
926 # This dictionary holds aggregate of flow fields.
927 self._fields = {}
928
929 def accumulate_get(self):
930 """ Return the current accumulate state. """
931 return self._accumulate
932
933 def accumulate_toggle(self):
934 """ toggle accumulate flow behavior. """
935 self._accumulate = not self._accumulate
936
937 def begin(self):
938 """ Indicate the beginning of processing flow content.
939 if accumulate is false clear current set of flows. """
940
941 if (not self._accumulate):
942 self._flow_lock.acquire()
943 try:
944 self._flows.clear()
945 finally:
946 self._flow_lock.release()
947 self._fields.clear()
948
949 def flow_line_add(self, line):
950 """ Split a line from a ovs-dpctl dump-flow into key and stats.
951 The order of the content in the flow should be:
952 - flow content
953 - stats for the flow
954 - actions
955
956 This method also assumes that the dump flow output does not
957 change order of fields of the same flow.
958 """
959
960 line = line.rstrip("\n")
961 (fields, stats, _) = flow_line_split(line)
962
963 try:
964 fields_dict = elements_to_dict(fields)
965
966 if (len(fields_dict) == 0):
967 raise ValueError("flow fields are missing %s", line)
968
969 stats_dict = elements_to_dict(stats)
970 if (len(stats_dict) == 0):
971 raise ValueError("statistics are missing %s.", line)
972
973 ##
974 # In accumulate mode, the Flow database can reach 10,000's of
975 # persistent flows. The interaction of the script with this many
976 # flows is too slow. Instead, delta are sent to the flow_db
977 # database allow incremental changes to be done in O(m) time
978 # where m is the current flow list, instead of iterating over
979 # all flows in O(n) time where n is the entire history of flows.
980 key = ",".join(fields)
981
982 self._flow_lock.acquire()
983 try:
984 (stats_old_dict, _) = self._flows.get(key, (None, None))
985 finally:
986 self._flow_lock.release()
987
988 self.flow_event(fields_dict, stats_old_dict, stats_dict)
989
990 except ValueError, arg:
991 logging.error(arg)
992 self._error_count += 1
993 raise
994
995 self._flow_lock.acquire()
996 try:
997 self._flows[key] = (stats_dict, datetime.datetime.now())
998 finally:
999 self._flow_lock.release()
1000
1001 def decay(self, decayTimeInSeconds):
1002 """ Decay content. """
1003 now = datetime.datetime.now()
1004 for (key, value) in self._flows.items():
1005 (stats_dict, updateTime) = value
1006 delta = now - updateTime
1007
1008 if (delta.seconds > decayTimeInSeconds):
1009 self._flow_lock.acquire()
1010 try:
1011 del self._flows[key]
1012
1013 fields_dict = elements_to_dict(flow_line_iter(key))
1014 matches = flow_aggregate(fields_dict, stats_dict)
1015 for match in matches:
1016 self.field_dec(match)
1017
1018 finally:
1019 self._flow_lock.release()
1020
1021 def flow_stats_get(self):
1022 """ Return statistics in a form of a dictionary. """
1023 rc = None
1024 self._flow_lock.acquire()
1025 try:
1026 rc = {"flow_total": len(self._flows),
1027 "flow_errors": self._error_count}
1028 finally:
1029 self._flow_lock.release()
1030 return rc
1031
1032 def field_types_get(self):
1033 """ Return the set of types stored in the singleton. """
1034 types = set((ii.field_type for ii in self._fields.values()))
1035 return types
1036
1037 def field_add(self, data):
1038 """ Collect dump-flow data to sum number of times item appears. """
1039 current = self._fields.get(repr(data), None)
1040 if (current is None):
1041 current = copy.copy(data)
1042 else:
1043 current += data
1044 self._fields[repr(current)] = current
1045
1046 def field_dec(self, data):
1047 """ Collect dump-flow data to sum number of times item appears. """
1048 current = self._fields.get(repr(data), None)
1049 if (current is None):
1050 raise ValueError("decrementing field missing %s" % repr(data))
1051
1052 current -= data
1053 self._fields[repr(current)] = current
1054 if (current.count == 0):
1055 del self._fields[repr(current)]
1056
def field_values_in_order(self, field_type_select, column_order):
    """ Return the stored fields ordered by the picked column, maximum
    first.  Unless field_type_select is "all" only fields of that type
    are returned. """
    entries = self._fields.values()
    if (field_type_select != "all"):
        # Narrow the list down to the requested field type.
        entries = [entry for entry in entries
                   if (entry.field_type == field_type_select)]
    # Sort ascending on the picked column and then reverse, which keeps
    # the ordering of equal keys identical to sort() + reverse().
    ranked = sorted(((column_picker(column_order, entry), entry)
                     for entry in entries),
                    key=operator.itemgetter(0))
    ranked.reverse()
    return [entry for (_, entry) in ranked]
1069
def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
    """ Fold a new sample of a flow into the aggregated field data.

    stats_old_dict is None for a flow that has not been seen before.
    """
    if (stats_old_dict is None):
        # Brand new flow: everything it reports is added.
        for match in flow_aggregate(fields_dict, stats_new_dict):
            self.field_add(match)
        return

    # To avoid reprocessing every flow at every sample period, a change
    # in the packet count is what marks a flow as having new data.
    old_packets = int(stats_old_dict.get("packets", 0))
    new_packets = int(stats_new_dict.get("packets", 0))
    if (old_packets == new_packets):
        # Same data as the previous sample; nothing to do.
        return

    old_bytes = stats_old_dict.get("bytes", 0)
    # The previous totals are taken off each match before it is added,
    # so only the delta is accumulated.  If the old packet count is
    # larger than the new one, packets and bytes end up decremented.
    for match in flow_aggregate(fields_dict, stats_new_dict):
        match.decrement(old_packets, int(old_bytes), 1)
        self.field_add(match)
1098
1099
class DecayThread(threading.Thread):
    """ Periodically call flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Prepare the decay thread (the caller starts it).

        flow_db is the object whose decay() method is called.
        interval is the decay age in seconds; values below one second
        are raised to one second.
        """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        # Poll at a tenth of the decay interval, at most once a second.
        # Float division is required: integer division would turn any
        # interval below 10 into a zero timeout and the worker would
        # spin without ever sleeping.
        self._min_interval = min(1, self._interval / 10.0)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        # Daemon thread so that it never blocks interpreter exit.
        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while (self._running):
            # The event is only set by stop(), so this is an
            # interruptible sleep.
            self._event.wait(self._min_interval)
            if (self._running):
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the worker thread time to terminate but not too long.
        # This thread is a daemon so the application will terminate if
        # we timeout during the join. This is just a cleaner way to
        # release resources.  join() raises RuntimeError on a thread that
        # was never started, so only join a live thread.
        if (self.is_alive()):
            self.join(2.0)
1132
1133
def flow_top_command(stdscr, render, flow_db):
    """ Read one key press in top mode, act on it and return it. """
    key = stdscr.getch()
    if (key == ord('h')):
        # Halt output: keep polling until a real key arrives.  Any
        # character will restart sampling.
        key = -1
        while (key == -1):
            key = stdscr.getch()

    if (key == ord('s')):
        # Toggle which column sorts data in descending order.
        render.column_select_event()
    elif (key == ord('a')):
        flow_db.accumulate_toggle()
    elif (key == ord('f')):
        render.field_type_toggle()
    # Any other key, including space, simply triggers a resample.

    return key
1157
1158
def decay_timer_start(flow_db, accumulateDecay):
    """ Start and return a DecayThread when accumulateDecay is greater
    than zero; otherwise return None. """
    if not (accumulateDecay > 0):
        return None

    timer = DecayThread(flow_db, accumulateDecay)
    timer.start()
    return timer
1167
1168
def flows_top(args):
    """ handles top like behavior when --script is not specified.

    Repeatedly samples the flow input, renders it on the curses screen
    and handles key presses until 'q' is pressed, reading the input
    fails with OSError, or the user interrupts.  The last rendered
    screen is printed again after curses has been shut down.
    """

    flow_db = FlowDB(args.accumulate)
    render = Render(0, Render.FIELD_SELECT_TOP)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            # getch() gives up after args.delay, which paces the sampling.
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError as arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                # Leave the bottom line of the console unused.
                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                # Materialize the pairs: they are walked once to draw the
                # screen and once more after the loop to repeat the output.
                lines = list(zip(line_count, line_output[:output_height]))

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the decay thread on every exit path, not only the clean one.
        if (decay_timer):
            decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print(line)
1223
1224
def flows_script(args):
    """ handles --script option.

    Reads flows from stdin when args.flowFiles is None, otherwise from
    each file named in args.flowFiles, then prints the rendered summary.
    """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        # Line buffered: an unbuffered (0) text stream is rejected by
        # Python 3's open(), line buffering is accepted everywhere.
        ihdl = os.fdopen(sys.stdin.fileno(), 'r', 1)
        try:
            flow_db = flows_read(ihdl, flow_db)
        finally:
            ihdl.close()
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            with open(flowFile, "r") as ihdl:
                flow_db = flows_read(ihdl, flow_db)

    (_, console_width) = get_terminal_size()
    render = Render(console_width, Render.FIELD_SELECT_SCRIPT)

    for line in render.format(flow_db):
        print(line)
1252
1253
def main():
    """ Return 0 on success or 1 when interrupted from the keyboard.

    Algorithm
    Processing ovs-dpctl dump-flow content takes four stages:
    1. Retrieve the current input.
    2. Store it in FlowDB and maintain history.
    3. Iterate over FlowDB, aggregating stats for each flow field.
    4. Present the data.

    Retrieving the current input is currently trivial: ovs-dpctl
    dump-flow is called.  Future versions will have more elaborate means
    of collecting dump-flow content.  FlowDB returns all data in the form
    of a hierarchical dictionary.  Input will vary.

    In accumulate mode flows are not purged from the FlowDB manager;
    at the very least the latest statistics are kept.  For live output
    the FlowDB is purged prior to sampling data.

    Aggregating results requires identifying the flow fields to aggregate
    out of the flow and summing their stats.

    """
    args = args_get()

    try:
        # args.top selects interactive top mode over script mode.
        handler = flows_top if (args.top) else flows_script
        handler(args)
    except KeyboardInterrupt:
        return 1
    return 0
1288
1289if __name__ == '__main__':
1290 sys.exit(main())
1291elif __name__ == 'ovs-dpctl-top':
1292 # pylint: disable-msg=R0915
1293
1294 ##
1295 # Test case beyond this point.
1296 # pylint: disable-msg=R0904
1297 class TestsuiteFlowParse(unittest.TestCase):
1298 """
1299 parse flow into hierarchy of dictionaries.
1300 """
def test_flow_parse(self):
    """ test_flow_parse: a flow line is parsed into a dictionary. """
    # Both samples share everything except the value of "used".
    template = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                "udp(src=61252,dst=5355), packets:1, bytes:92, "
                "used:%s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                "38,41,44,47,50,53,56,59,62,65")

    (fields, stats, _) = flow_line_split(template % "0.703s")
    parsed = elements_to_dict(fields + stats)
    self.assertEqual(parsed["eth"]["src"], "00:50:56:b4:4e:f8")
    self.assertEqual(parsed["eth"]["dst"], "33:33:00:01:00:03")
    self.assertEqual(parsed["ipv6"]["src"],
                     "fe80::55bf:fe42:bc96:2812")
    self.assertEqual(parsed["ipv6"]["dst"], "ff02::1:3")
    self.assertEqual(parsed["packets"], "1")
    self.assertEqual(parsed["bytes"], "92")

    # A negative "used" value has to survive parsing as well.
    (fields, stats, _) = flow_line_split(template % "-0.703s")
    parsed = elements_to_dict(fields + stats)
    self.assertEqual(parsed["used"], "-0.703s")
    self.assertEqual(parsed["packets"], "1")
    self.assertEqual(parsed["bytes"], "92")
1334
def test_flow_sum(self):
    """ test_flow_sum: adding the same flow twice doubles the sums. """
    line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
           "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
           "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
           "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
           "udp(src=61252,dst=5355), packets:2, bytes:92, "\
           "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
           "38,41,44,47,50,53,56,59,62,65"

    (fields, stats, _) = flow_line_split(line)
    stats_dict = elements_to_dict(stats)
    fields_dict = elements_to_dict(fields)
    expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]

    flow_db = FlowDB(False)
    # Round 1 is the simple case of one line; round 2 adds the same
    # line again just to see the counts go up.
    for round_number in (1, 2):
        for match in flow_aggregate(fields_dict, stats_dict):
            flow_db.field_add(match)

        flow_types = flow_db.field_types_get()
        self.assertEqual(sorted(flow_types), sorted(expected_flow_types))

        sum_value = flow_db.field_values_in_order("all", 1)
        self.assertEqual(len(sum_value), 5)
        self.assertEqual(sum_value[0].packets, 2 * round_number)
        self.assertEqual(sum_value[0].count, round_number)
        self.assertEqual(sum_value[0].bytes, 92 * round_number)
1385
def test_assoc_list(self):
    """ test_assoc_list: Columns.assoc_list reports each column of a
    summed field. """
    line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
           "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
           "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
           "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
           "udp(src=61252,dst=5355), packets:2, bytes:92, "\
           "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
           "38,41,44,47,50,53,56,59,62,65"

    valid_flows = [
        'eth_type(0x86dd)',
        'udp(dst=5355)',
        'in_port(4)',
        'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
        'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
        ]

    # Expected value of every column other than "fields".
    expected = {"packets": 2, "count": 1, "average": 46.0, "bytes": 92}

    (fields, stats, _) = flow_line_split(line)
    stats_dict = elements_to_dict(stats)
    fields_dict = elements_to_dict(fields)

    ##
    # Test simple case of one line.
    flow_db = FlowDB(False)
    matches = flow_aggregate(fields_dict, stats_dict)
    for match in matches:
        flow_db.field_add(match)

    for sum_value in flow_db.field_values_in_order("all", 1):
        for item in Columns.assoc_list(sum_value):
            if (item[0] == "fields"):
                self.assertTrue(item[1] in valid_flows)
            elif (item[0] in expected):
                self.assertEqual(item[1], expected[item[0]])
            else:
                # Format the message; passing the name as a second
                # argument would leave the "%s" unexpanded.
                raise ValueError("unknown %s" % item[0])
1430
def test_human_format(self):
    """ test_human_format: approximate_size renders readable sizes. """
    one_mib = 1024 * 1024
    cases = [
        (0.0, "0.0 KiB"),
        (1024, "1.0 KiB"),
        (one_mib, "1.0 MiB"),
        (one_mib + 100000, "1.1 MiB"),
        ((1024 * one_mib) + 100000000, "1.1 GiB"),
        ]

    for (size, text) in cases:
        self.assertEqual(approximate_size(size), text)
1441
def test_flow_line_split(self):
    """ Splitting a flow line is not trivial.
    There is no clear delimiter. Comma is used liberally."""
    line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
            "dst=33:33:00:01:00:03),eth_type(0x86dd),"
            "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
            "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
            "udp(src=61252,dst=5355), packets:2, bytes:92, "
            "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
            "38,41,44,47,50,53,56,59,62,65")

    (fields, stats, actions) = flow_line_split(line)

    self.assertEqual(
        fields,
        ["in_port(4)",
         "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
         "eth_type(0x86dd)",
         "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
         "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
         "udp(src=61252,dst=5355)"])
    self.assertEqual(stats, ["packets:2", "bytes:92", "used:0.703s"])
    self.assertEqual(actions,
                     "actions:3,8,11,14,17,20,23,26,29,32,35,"
                     "38,41,44,47,50,53,56,59,62,65")
1468
def test_accumulate_decay(self):
    """ test_accumulate_decay: test accumulated decay. """
    flow_line = ("in_port(1),eth(src=00:50:56:4f:dc:3b,"
                 "dst=ff:ff:ff:ff:ff:ff),"
                 "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                 "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                 "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                 "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                 "packets:1, bytes:120, used:0.004s, actions:1")

    flow_db = FlowDB(True)

    def flow_total():
        """ Number of flows currently held by flow_db. """
        return flow_db.flow_stats_get()["flow_total"]

    flow_db.begin()
    flow_db.flow_line_add(flow_line)

    # Let the flow age past the one second limit, then decay it.
    time.sleep(4)
    self.assertEqual(flow_total(), 1)
    flow_db.decay(1)
    self.assertEqual(flow_total(), 0)

    # A fresh flow must survive a 30 second limit.
    flow_db.flow_line_add(flow_line)
    self.assertEqual(flow_total(), 1)
    flow_db.decay(30)
    self.assertEqual(flow_total(), 1)

    # The background timer must decay the flow on its own.
    flow_db.flow_line_add(flow_line)
    self.assertEqual(flow_total(), 1)
    timer = decay_timer_start(flow_db, 2)
    time.sleep(10)
    self.assertEqual(flow_total(), 0)
    timer.stop()
1501
def test_accumulate(self):
    """ test_accumulate test that FlowDB supports accumulate. """

    # Expected statistics are (packets, bytes, count); None means the
    # port is not checked after that sample.
    port2 = (2, 126, 1)
    samples = [
        # Test one flow exist.
        ("in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
         (1, 120, 1), None),
        # Test two different flows exist.
        ("in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
         (1, 120, 1), port2),
        # Test first flow increments packets.
        ("in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
         (2, 240, 1), port2),
        # Test third flow but with the same in_port(1) as the first flow.
        ("in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
         (3, 360, 2), port2),
        # Third flow has changes.
        ("in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
         (4, 480, 2), port2),
        # First flow reset.
        ("in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
         (3, 360, 2), port2),
        ]

    def check_port(sum_values, port, expected):
        """ Assert exactly one entry exists for port with the expected
        (packets, bytes, count). """
        in_ports = [ii for ii in sum_values if (repr(ii) == port)]
        self.assertEqual(len(in_ports), 1)
        (packets, byte_count, count) = expected
        self.assertEqual(in_ports[0].packets, packets)
        self.assertEqual(in_ports[0].bytes, byte_count)
        self.assertEqual(in_ports[0].count, count)

    # Turn on accumulate.
    flow_db = FlowDB(True)

    for (line, expected_port1, expected_port2) in samples:
        # Each begin() simulates another sample period.
        flow_db.begin()
        flow_db.flow_line_add(line)
        sum_values = flow_db.field_values_in_order("all", 1)

        check_port(sum_values, "in_port(1)", expected_port1)
        if (expected_port2 is not None):
            check_port(sum_values, "in_port(2)", expected_port2)
1629
def test_parse_character_errors(self):
    """ test_parsing errors.
    The flow parser is purposely loose. It is not designed to validate
    input, merely to pull out what it can, but there are situations
    in which a parse error can be detected.
    """

    bad_lines = ["complete garbage",
                 "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
                 "dst=33:33:00:00:00:66),"
                 "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                 "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                 "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
                 "packets:2,bytes:5026,actions:1"]

    flow_db = FlowDB(False)
    flow_db.begin()
    for bad_line in bad_lines:
        try:
            flow_db.flow_line_add(bad_line)
        except ValueError:
            # We want an exception. That is how we know we have
            # correctly found a simple parsing error. We are not
            # looking to validate flow output just catch simple issues.
            pass
        else:
            self.assertTrue(False)
1656
def test_tunnel_parsing(self):
    """ test_tunnel_parsing test parse flows with tunnel. """
    tunnel_line = ("tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                   "tos=0x0,ttl=64,flags(key)),in_port(1),"
                   "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                   "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                   "actions:userspace(pid=4294962691,slow_path(cfm))")

    flow_db = FlowDB(False)
    flow_db.begin()
    flow_db.flow_line_add(tunnel_line)

    # The in_port after the tunnel() element must be aggregated once.
    found = [entry for entry in flow_db.field_values_in_order("all", 1)
             if (repr(entry) == "in_port(1)")]
    self.assertEqual(len(found), 1)
    port = found[0]
    self.assertEqual(port.packets, 6)
    self.assertEqual(port.bytes, 534)
    self.assertEqual(port.count, 1)
1675
def test_flow_multiple_paren(self):
    """ test_flow_multiple_paren: nested parentheses stay in one
    element. """
    expected = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
                "in_port(2)"]
    elements = flow_line_iter(
        "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)")
    self.assertEqual(expected, elements)
1683
def test_to_network(self):
    """ test_to_network test ipv4_to_network and ipv6_to_network. """
    # Each case is (address with optional mask, expected network).
    ipv4_cases = (
        ("192.168.0.1", "192.168.0.1"),
        ("192.168.0.1/255.255.255.255", "192.168.0.1"),
        ("192.168.0.1/255.255.255.0", "192.168.0.0"),
        ("192.168.0.1/255.255.0.0", "192.168.0.0"),
        ("192.168.0.1/255.0.0.0", "192.0.0.0"),
        ("192.168.0.1/0.0.0.0", "0.0.0.0"),
        ("10.24.106.230/255.255.255.255", "10.24.106.230"),
        ("10.24.106.230/255.255.255.0", "10.24.106.0"),
        ("10.24.106.0/255.255.255.0", "10.24.106.0"),
        ("10.24.106.0/255.255.252.0", "10.24.104.0"),
        )

    ipv6_cases = (
        ("1::192:168:0:1", "1::192:168:0:1"),
        ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
        ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
        ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
        ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
        ("1::192:168:0:1/1::0:0:0:0", "1::"),
        ("1::192:168:0:1/::", "::"),
        )

    for (address, network) in ipv4_cases:
        self.assertEqual(ipv4_to_network(address), network)

    for (address, network) in ipv6_cases:
        self.assertEqual(ipv6_to_network(address), network)
3a2742a1
MH
1714
def test_ui(self):
    """ test_ui: test expected ui behavior. """
    #pylint: disable=W0212
    # Each render mode paired with the field type it selects initially.
    renders = [
        (Render(80, Render.FIELD_SELECT_TOP), "in_port"),
        (Render(80, Render.FIELD_SELECT_SCRIPT), "all"),
        ]
    for (render, selected) in renders:
        self.assertEqual(render._field_type_select_get(), selected)
    #pylint: enable=W0212