#! @PYTHON@
#
# Copyright (c) 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# The approximate_size code was copied from
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
# which is licensed under "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
# used under a Creative Commons Attribution-Share-Alike license:
# http://creativecommons.org/licenses/by-sa/3.0/
#
#

"""Top like behavior for ovs-dpctl dump-flows output.

This program summarizes ovs-dpctl flow content by aggregating the number
of packets, total bytes and occurrence of the following fields:

  - Datapath in_port

  - Ethernet type

  - Source and destination MAC addresses

  - IP protocol

  - Source and destination IPv4 addresses

  - Source and destination IPv6 addresses

  - UDP and TCP destination port

  - Tunnel source and destination addresses


Output shows five values:
  - FIELDS: the flow fields, for example in_port(1).

  - PACKETS: the total number of packets containing the flow field.

  - BYTES: the total number of bytes containing the flow field. If units
  are not present then values are in bytes.

  - AVERAGE: the average packet size (BYTES/PACKET).

  - COUNT: the number of lines in the dump-flow output that contain the
  flow field.

Top Behavior

While in top mode, the default behavior, the following single character
commands are supported:

  a - toggles top between accumulate and live mode. Accumulate mode is
  described below.

  s - toggles which column is used to sort content in decreasing order. A
  DESC title is placed over the column.

  _ - a space indicates to collect dump-flow content again.

  h - halt output. Any character will restart sampling.

  f - cycle through flow fields. The initial field is in_port.

  q - quit.

Accumulate Mode

There are two supported modes: live and accumulate. The default is live.
The parameter --accumulate or the 'a' character in top mode enables the
latter. In live mode, recent dump-flow content is presented.
Accumulate mode, in contrast, keeps track of the prior historical
information until the flow is reset, not when the flow is purged. Reset
flows are detected when the packet count for a flow has decreased from
its previous sample. There is one caveat: so that the system does not
eventually run out of memory, any flows that have not been refreshed
within the accumulate-decay period are purged. The goal here is to free
the memory of flows that are not active. Statistics are not decremented.
Their purpose is to reflect the overall history of the flow fields.


Debugging Errors

Parsing errors are counted and displayed in the status line at the beginning
of the output. Use the --verbose option with --script to see what output
was not parsed, like this:
$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose

Error messages will identify content that failed to parse.


Access Remote Hosts

The --host must follow the format user@hostname. This script simply calls
'ssh user@hostname' without checking for login credentials, therefore public
keys should be installed on the system identified by hostname, such as:

$ ssh-copy-id user@hostname

Consult the ssh-copy-id man pages for more details.


Expected usage

$ ovs-dpctl-top

or to run as a script:
$ ovs-dpctl dump-flows > dump-flows.log
$ ovs-dpctl-top --script --flow-file dump-flows.log

"""

# pylint: disable-msg=C0103
# pylint: disable-msg=C0302
# pylint: disable-msg=R0902
# pylint: disable-msg=R0903
# pylint: disable-msg=R0904
# pylint: disable-msg=R0912
# pylint: disable-msg=R0913
# pylint: disable-msg=R0914

import sys
import os
try:
    ##
    # argparse is not installed on older Python distributions.
    # ovs ships with a version in the directory mentioned below.
    import argparse
except ImportError:
    sys.path.append(os.path.join("@pkgdatadir@", "python"))
    import argparse
import logging
import re
import unittest
import copy
import curses
import operator
import subprocess
import fcntl
import struct
import termios
import datetime
import threading
import time
import socket


##
# The following two definitions provide the necessary netaddr functionality.
# The Python netaddr module is not part of the core installation. Packaging
# netaddr would be involved and seems inappropriate given that only two
# methods were used.
def ipv4_to_network(ip_str):
    """ Calculate the network given an ipv4/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack('!HH', network_n[0], network_n[1]))
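
##
# Illustrative examples (not part of the original source; the behavior
# shown matches test_to_network at the bottom of this file):
#   ipv4_to_network("192.168.0.1/255.255.255.0") => "192.168.0.0"
#   ipv4_to_network("192.168.0.1")               => "192.168.0.1"
##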

def ipv6_to_network(ip_str):
    """ Calculate the network given an ipv6/mask value.
    If a mask is not present simply return ip_str.
    """
    pack_length = '!HHHHHHHH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET6, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length,
                           socket.inet_pton(socket.AF_INET6, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET6,
                            struct.pack(pack_length,
                                        network_n[0], network_n[1],
                                        network_n[2], network_n[3],
                                        network_n[4], network_n[5],
                                        network_n[6], network_n[7]))
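
##
# Illustrative examples (not part of the original source; the behavior
# shown matches test_to_network at the bottom of this file):
#   ipv6_to_network("1::192:168:0:1/1::ffff:ffff:0:0") => "1::192:168:0:0"
#   ipv6_to_network("1::192:168:0:1/::")               => "::"
##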

##
# columns displayed
##
class Columns:
    """ Holds column specific content.
    Titles need to be less than 8 characters.
    """
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return an association list. """
        return [(Columns.FIELDS, repr(obj)),
                (Columns.PACKETS, obj.packets),
                (Columns.BYTES, obj.bytes),
                (Columns.COUNT, obj.count),
                (Columns.AVERAGE, obj.average),
                ]


def element_eth_get(field_type, element, stats_dict):
    """ Extract eth frame src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"

    element = fmt % (field_type, element["src"], element["dst"])
    return SumData(field_type, element, stats_dict["packets"],
                   stats_dict["bytes"], element)


def element_ipv4_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv4_to_network(element["src"]),
                         ipv4_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_tunnel_get(field_type, element, stats_dict):
    """ Extract src and dst from a tunnel."""
    return element_ipv4_get(field_type, element, stats_dict)


def element_ipv6_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""

    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv6_to_network(element["src"]),
                         ipv6_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_dst_port_get(field_type, element, stats_dict):
    """ Extract the dst port from a dump-flow element."""
    element_key = "%s(dst=%s)" % (field_type, element["dst"])
    return SumData(field_type, element_key, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_passthrough_get(field_type, element, stats_dict):
    """ Pass the element through unmodified."""
    element_key = "%s(%s)" % (field_type, element)
    return SumData(field_type, element_key,
                   stats_dict["packets"], stats_dict["bytes"], element_key)

# pylint: disable-msg=R0903
class OutputFormat:
    """ Holds field_type and function to extract element value. """
    def __init__(self, field_type, elements, generator):
        self.field_type = field_type
        self.elements = elements
        self.generator = generator


##
# The order below is important. The initial flow field depends on whether
# --script or top mode is used. In top mode, the expected behavior, in_port
# flow fields are shown first. A future feature will allow users to
# filter output by selecting a row. Filtering by in_port is a natural
# filtering starting point.
#
# In script mode, all fields are shown. The expectation is that users could
# filter output by piping through grep.
#
# In top mode, the default flow field is in_port. In --script mode,
# the default flow field is all.
#
# All is added to the end of the OUTPUT_FORMAT list.
##
OUTPUT_FORMAT = [
    OutputFormat("in_port", (), element_passthrough_get),
    OutputFormat("eth", ("src", "dst"), element_eth_get),
    OutputFormat("eth_type", (), element_passthrough_get),
    OutputFormat("ipv4", ("src", "dst"), element_ipv4_get),
    OutputFormat("ipv6", ("src", "dst"), element_ipv6_get),
    OutputFormat("udp", ("src", "dst"), element_dst_port_get),
    OutputFormat("tcp", ("src", "dst"), element_dst_port_get),
    OutputFormat("tunnel", ("src", "dst"), element_tunnel_get),
    ]
##


ELEMENT_KEY = {
    "udp": "udp.dst",
    "tcp": "tcp.dst"
    }
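
##
# Illustrative example (not part of the original source): for a dump-flow
# element such as in_port(4), the in_port entry above uses
# element_passthrough_get, producing a SumData keyed "in_port(4)"; an
# eth(src=...,dst=...) element uses element_eth_get, which requires both
# the "src" and "dst" sub-elements listed in OutputFormat.elements.
##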


def top_input_get(args):
    """ Return subprocess stdout."""
    cmd = []
    if (args.host):
        cmd += ["ssh", args.host]
    cmd += ["ovs-dpctl", "dump-flows"]

    return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE).stdout


def args_get():
    """ Read program parameters and handle any necessary validation. """

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-top
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows; see "
                             "Access Remote Hosts for more information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                        "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args
###
# Code to parse a single line in dump-flow
###
# key(values)
FIELDS_CMPND = re.compile("([\w]+)\((.+)\)")
# key:value
FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)")


def flow_line_iter(line):
    """ Iterate over flow dump elements.
    Return a list of the top-level elements, split on ',' outside of
    parentheses.
    """
    # Split by ',' except when inside a '()'. The actions element is not
    # split properly, but we don't need it.
    rc = []

    element = ""
    paren_count = 0

    for ch in line:
        if (ch == '('):
            paren_count += 1
        elif (ch == ')'):
            paren_count -= 1

        if (ch == ' '):
            # ignore white space.
            continue
        elif ((ch == ',') and (paren_count == 0)):
            rc.append(element)
            element = ""
        else:
            element += ch

    if (paren_count):
        raise ValueError(line)
    else:
        if (len(element) > 0):
            rc.append(element)
        return rc
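
##
# Illustrative example (not part of the original source; the behavior
# shown matches test_flow_multiple_paren below). Commas inside
# parentheses do not split:
#   flow_line_iter("tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)")
# returns
#   ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))", "in_port(2)"]
##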

def flow_line_compound_parse(compound):
    """ Parse a compound element,
    for example
    src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is in
    eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
    """
    result = {}
    for element in flow_line_iter(compound):
        match = FIELDS_CMPND_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value

        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

    if (len(result.keys()) == 0):
        return compound
    return result


def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple,
    where fields and stats are lists.
    This function relies on the following ovs-dpctl dump-flow
    output characteristics:
    1. The dump-flow line consists of a list of frame fields, a list of
       stats and actions.
    2. The list of frame fields, each stat and the actions field are
       delimited by ', '.
    3. All other non-stat fields are not delimited by ', '.

    """

    results = re.split(', ', line)

    (field, stats, action) = (results[0], results[1:-1], results[-1])

    fields = flow_line_iter(field)
    return (fields, stats, action)
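
##
# Illustrative example (not part of the original source; see
# test_flow_line_split below): for a dump-flow line ending in
# ", packets:2, bytes:92, used:0.703s, actions:...", the split yields
# fields=["in_port(4)", ...], stats=["packets:2", "bytes:92",
# "used:0.703s"] and actions="actions:...".
##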

def elements_to_dict(elements):
    """ Convert line elements to a hierarchy of dictionaries. """
    result = {}
    for element in elements:
        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

        match = FIELDS_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value
        else:
            raise ValueError("can't parse >%s<" % element)
    return result
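
##
# Illustrative example (not part of the original source; see
# test_flow_parse below):
#   elements_to_dict(["eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
#                     "packets:1"])
# returns
#   {"eth": {"src": "00:50:56:b4:4e:f8", "dst": "33:33:00:01:00:03"},
#    "packets": "1"}
##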

# pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add two objects. """

        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract two objects. """

        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    def __getattr__(self, name):
        """ Handle average. """
        if (name == "average"):
            if (self.packets == 0):
                return float(0.0)
            else:
                return float(self.bytes) / float(self.packets)
        raise AttributeError(name)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
def flow_aggregate(fields_dict, stats_dict):
    """ Search for content in a line.
    Passed the flow portion of the dump-flows output plus the current stats
    consisting of packets, bytes, etc.
    """
    result = []

    for output_format in OUTPUT_FORMAT:
        field = fields_dict.get(output_format.field_type, None)
        if (field and all(k in field for k in output_format.elements)):
            obj = output_format.generator(output_format.field_type,
                                          field, stats_dict)
            result.append(obj)

    return result
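
##
# Illustrative example (not part of the original source; see test_flow_sum
# below): a line containing in_port, eth, eth_type, ipv6 and udp fields
# with packets:2, bytes:92 aggregates into five SumData objects, one per
# matching OUTPUT_FORMAT entry, each carrying packets=2 and bytes=92.
##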

def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert it into flow_db. """

    done = False
    while (not done):
        line = ihdl.readline()
        if (len(line) == 0):
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError, arg:
            logging.error(arg)

    return flow_db


def get_terminal_size():
    """
    Return the column width and height of the terminal.
    """
    for fd_io in [0, 1, 2]:
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               '1234'))
        except IOError:
            result = None
            continue

        if (result is None or result == (0, 0)):
            # Maybe we can't get the width. In that case assume (25, 80).
            result = (25, 80)

        return result

    # Every file descriptor failed; assume the same default.
    return (25, 80)
##
# Content derived from:
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
##
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                if False, use multiples of 1000

    Returns: string

    """
    size = float(size)
    if size < 0:
        raise ValueError('number must be non-negative')

    if (a_kilobyte_is_1024_bytes):
        multiple = 1024
    else:
        multiple = 1000
    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if size < multiple:
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')
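
##
# Illustrative examples (not part of the original source; see
# test_human_format below):
#   approximate_size(1024)        => "1.0 KiB"
#   approximate_size(1024 * 1024) => "1.0 MiB"
##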

##
# End copied content
##
class ColMeta:
    """ Concepts about columns. """
    def __init__(self, sortable, width):
        self.sortable = sortable
        self.width = width


class RowMeta:
    """ How to render rows. """
    def __init__(self, label, fmt):
        self.label = label
        self.fmt = fmt


def fmt_packet(obj, width):
    """ Provide a string for packets that is appropriate for output."""
    return str(obj.packets).rjust(width)


def fmt_count(obj, width):
    """ Provide a string for count that is appropriate for output."""
    return str(obj.count).rjust(width)


def fmt_avg(obj, width):
    """ Provide a string for average that is appropriate for output."""
    return str(int(obj.average)).rjust(width)


def fmt_field(obj, width):
    """ Truncate really long flow fields and insert ellipses to help make
    it clear.
    """

    ellipses = " ... "
    value = obj.field
    if (len(obj.field) > width):
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)


def fmt_bytes(obj, width):
    """ Provide a string for bytes that is appropriate for output."""
    if (len(str(obj.bytes)) <= width):
        value = str(obj.bytes)
    else:
        value = approximate_size(obj.bytes)
    return value.rjust(width)


def title_center(value, width):
    """ Center a column title."""
    return value.upper().center(width)


def title_rjust(value, width):
    """ Right justify a column title. """
    return value.upper().rjust(width)


def column_picker(order, obj):
    """ Return the column as specified by order. """
    if (order == 1):
        return obj.count
    elif (order == 2):
        return obj.packets
    elif (order == 3):
        return obj.bytes
    elif (order == 4):
        return obj.average
    else:
        raise ValueError("order outside of range %s" % order)

class Render:
    """ Renders flow data.

    The two FIELD_SELECT variables should be set to the actual field minus
    1. During construction, an internal method increments and initializes
    this object.
    """
    FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]

    FIELD_SELECT_SCRIPT = 7
    FIELD_SELECT_TOP = -1

    def __init__(self, console_width, field_select):
        """ Calculate column widths taking into account changes in format."""

        self._start_time = datetime.datetime.now()

        self._cols = [ColMeta(False, 0),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH)]
        self._console_width = console_width
        self.console_width_set(console_width)

        # Order in this array dictates the order of the columns.
        # The 0 width for the first entry is a placeholder; it is
        # dynamically calculated. The first column is special. We need a
        # way to indicate which fields are presented.
        self._descs = [RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust)]
        self._column_sort_select = 0
        self.column_select_event()

        self._titles = [
            RowMeta(Columns.FIELDS, title_center),
            RowMeta(Columns.COUNT, title_rjust),
            RowMeta(Columns.PACKETS, title_rjust),
            RowMeta(Columns.BYTES, title_rjust),
            RowMeta(Columns.AVERAGE, title_rjust)
            ]

        self._datas = [
            RowMeta(None, fmt_field),
            RowMeta(None, fmt_count),
            RowMeta(None, fmt_packet),
            RowMeta(None, fmt_bytes),
            RowMeta(None, fmt_avg)
            ]

        ##
        # _field_types holds which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = Render.FLOW_FIELDS

        ##
        # The default is to show all field types.
        ##
        self._field_type_select = field_select
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Toggle which field types to show. """
        self._field_type_select += 1
        if (self._field_type_select >= len(self._field_types)):
            self._field_type_select = 0
        value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
        self._titles[0].label = value

    def column_select_event(self):
        """ Handles column select toggle. """

        self._descs[self._column_sort_select].label = ""
        for _ in range(len(self._cols)):
            self._column_sort_select += 1
            if (self._column_sort_select >= len(self._cols)):
                self._column_sort_select = 0

            # Now look for the next sortable column
            if (self._cols[self._column_sort_select].sortable):
                break
        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width. """
        self._console_width = console_width

        spaces = len(self._cols) - 1
        ##
        # Calculating column width can be tedious but important. The
        # flow field value can be long. The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts. Give the
        # remaining space to the flow column. When numbers get large,
        # transition output to output generated by approximate_size, which
        # limits output to ###.# XiB, in other words 9 characters.
        ##
        # At this point, we know the maximum length values. We may
        # truncate the flow column to get everything to fit.
        self._cols[0].width = 0
        values_max_length = sum([ii.width for ii in self._cols]) + spaces
        flow_max_length = console_width - values_max_length
        self._cols[0].width = flow_max_length

    def format(self, flow_db):
        """ Show flows based on the --script parameter."""

        rc = []
        ##
        # Top output consists of
        # Title
        # Column title (2 rows)
        # data
        # statistics and status

        ##
        # Title
        ##
        rc.append("Flow Summary".center(self._console_width))

        stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
                flow_db.flow_stats_get()
        accumulate = flow_db.accumulate_get()
        if (accumulate):
            stats += "Accumulate: on "
        else:
            stats += "Accumulate: off "

        duration = datetime.datetime.now() - self._start_time
        stats += "Duration: %s " % str(duration)
        rc.append(stats.ljust(self._console_width))

        ##
        # 2 rows for columns.
        ##
        # Indicate which column is in descending order.
        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._descs, self._cols)]))

        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._titles, self._cols)]))

        ##
        # Data.
        ##
        for dd in flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select):
            rc.append(" ".join([ii.fmt(dd, col.width)
                                for (ii, col) in zip(self._datas,
                                                     self._cols)]))

        return rc

def curses_screen_begin():
    """ begin curses screen control. """
    stdscr = curses.initscr()
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(1)
    return stdscr


def curses_screen_end(stdscr):
    """ end curses screen control. """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()

class FlowDB:
    """ Implements live vs accumulate mode.

    Flows are stored as key value pairs. The key consists of the content
    prior to stat fields. The value portion consists of stats in a dictionary
    form.

    @ \todo future add filtering here.
    """
    def __init__(self, accumulate):
        self._accumulate = accumulate
        self._error_count = 0
        # Values are (stats, last update time.)
        # The last update time is used for aging.
        self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        self._flows = {}
        # This dictionary holds aggregates of flow fields.
        self._fields = {}

    def accumulate_get(self):
        """ Return the current accumulate state. """
        return self._accumulate

    def accumulate_toggle(self):
        """ Toggle accumulate flow behavior. """
        self._accumulate = not self._accumulate

    def begin(self):
        """ Indicate the beginning of processing flow content.
        If accumulate is false, clear the current set of flows. """

        if (not self._accumulate):
            self._flow_lock.acquire()
            try:
                self._flows.clear()
            finally:
                self._flow_lock.release()
            self._fields.clear()

    def flow_line_add(self, line):
        """ Split a line from ovs-dpctl dump-flows into key and stats.
        The order of the content in the flow should be:
        - flow content
        - stats for the flow
        - actions

        This method also assumes that the dump-flow output does not
        change the order of fields of the same flow.
        """

        line = line.rstrip("\n")
        (fields, stats, _) = flow_line_split(line)

        try:
            fields_dict = elements_to_dict(fields)

            if (len(fields_dict) == 0):
                raise ValueError("flow fields are missing %s" % line)

            stats_dict = elements_to_dict(stats)
            if (not all(k in stats_dict for k in ("packets", "bytes"))):
                raise ValueError("statistics are missing %s" % line)

            ##
            # In accumulate mode, the flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, deltas are sent to the flow_db
            # database, allowing incremental changes to be done in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
            key = ",".join(fields)

            self._flow_lock.acquire()
            try:
                (stats_old_dict, _) = self._flows.get(key, (None, None))
            finally:
                self._flow_lock.release()

            self.flow_event(fields_dict, stats_old_dict, stats_dict)

        except ValueError, arg:
            logging.error(arg)
            self._error_count += 1
            raise

        self._flow_lock.acquire()
        try:
            self._flows[key] = (stats_dict, datetime.datetime.now())
        finally:
            self._flow_lock.release()

    def decay(self, decayTimeInSeconds):
        """ Decay content. """
        now = datetime.datetime.now()
        for (key, value) in self._flows.items():
            (stats_dict, updateTime) = value
            delta = now - updateTime

            if (delta.seconds > decayTimeInSeconds):
                self._flow_lock.acquire()
                try:
                    del self._flows[key]

                    fields_dict = elements_to_dict(flow_line_iter(key))
                    matches = flow_aggregate(fields_dict, stats_dict)
                    for match in matches:
                        self.field_dec(match)

                finally:
                    self._flow_lock.release()

    def flow_stats_get(self):
        """ Return statistics in the form of a dictionary. """
        rc = None
        self._flow_lock.acquire()
        try:
            rc = {"flow_total": len(self._flows),
                  "flow_errors": self._error_count}
        finally:
            self._flow_lock.release()
        return rc

    def field_types_get(self):
        """ Return the set of types stored in the singleton. """
        types = set((ii.field_type for ii in self._fields.values()))
        return types

    def field_add(self, data):
        """ Collect dump-flow data to sum the number of times an item
        appears. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current

    def field_dec(self, data):
        """ Decrement a previously collected flow field aggregate. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            raise ValueError("decrementing field missing %s" % repr(data))

        current -= data
        self._fields[repr(current)] = current
        if (current.count == 0):
            del self._fields[repr(current)]

    def field_values_in_order(self, field_type_select, column_order):
        """ Return a list of items in order, maximum first. """
        values = self._fields.values()
        if (field_type_select != "all"):
            # If a field type other than "all" then reduce the list.
            values = [ii for ii in values
                      if (ii.field_type == field_type_select)]
        values = [(column_picker(column_order, ii), ii) for ii in values]
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        values = [ii[1] for ii in values]
        return values

    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
        """ Receives new flow information. """

        # In order to avoid processing every flow at every sample
        # period, the change in a flow's packet count is used to determine
        # the delta in the flow statistics. This delta is used in the call
        # to self.decrement prior to self.field_add.

        if (stats_old_dict is None):
            # This is a new flow.
            matches = flow_aggregate(fields_dict, stats_new_dict)
            for match in matches:
                self.field_add(match)
        else:
            old_packets = int(stats_old_dict.get("packets", 0))
            new_packets = int(stats_new_dict.get("packets", 0))
            if (old_packets == new_packets):
                # ignore. same data.
                pass
            else:
                old_bytes = stats_old_dict.get("bytes", 0)
                # old_packets != new_packets
                # if old_packets > new_packets then we end up decrementing
                # packets and bytes.
                matches = flow_aggregate(fields_dict, stats_new_dict)
                for match in matches:
                    match.decrement(int(old_packets), int(old_bytes), 1)
                    self.field_add(match)
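
##
# Illustrative walk-through (not part of the original source; see
# test_accumulate below): in accumulate mode, a flow first sampled with
# packets:1, bytes:120 contributes (1, 120) to its flow fields. If the
# next sample of the same flow shows packets:2, bytes:240, only the
# delta (1, 120) is applied, for totals of (2, 240). If the packet count
# later drops below the previous sample, the flow was reset and the
# negative delta reduces the aggregate accordingly.
##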

class DecayThread(threading.Thread):
    """ Periodically call the flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Start decay thread. """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        self._min_interval = min(1, interval / 10)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while(self._running):
            self._event.wait(self._min_interval)
            if (self._running):
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the calling thread time to terminate but not too long.
        # This thread is a daemon so the application will terminate if
        # we timeout during the join. This is just a cleaner way to
        # release resources.
        self.join(2.0)

def flow_top_command(stdscr, render, flow_db):
    """ Handle input while in top mode. """
    ch = stdscr.getch()
    ##
    # Any character will restart sampling.
    if (ch == ord('h')):
        # halt output.
        ch = stdscr.getch()
        while (ch == -1):
            ch = stdscr.getch()

    if (ch == ord('s')):
        # toggle which column sorts data in descending order.
        render.column_select_event()
    elif (ch == ord('a')):
        flow_db.accumulate_toggle()
    elif (ch == ord('f')):
        render.field_type_toggle()
    elif (ch == ord(' ')):
        # resample
        pass

    return ch


def decay_timer_start(flow_db, accumulateDecay):
    """ If accumulateDecay is greater than zero then start the timer. """
    if (accumulateDecay > 0):
        decay_timer = DecayThread(flow_db, accumulateDecay)
        decay_timer.start()
        return decay_timer
    else:
        return None

def flows_top(args):
    """ Handles top-like behavior when --script is not specified. """

    flow_db = FlowDB(args.accumulate)
    render = Render(0, Render.FIELD_SELECT_TOP)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            #stdscr.nodelay(1)
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError, arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                lines = zip(line_count, line_output[:output_height])

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    if (decay_timer):
        decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print line

def flows_script(args):
    """ Handles the --script option. """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
        try:
            flow_db = flows_read(ihdl, flow_db)
        finally:
            ihdl.close()
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            ihdl = open(flowFile, "r")
            try:
                flow_db = flows_read(ihdl, flow_db)
            finally:
                ihdl.close()

    (_, console_width) = get_terminal_size()
    render = Render(console_width, Render.FIELD_SELECT_SCRIPT)

    for line in render.format(flow_db):
        print line

def main():
    """ Return 0 on success or 1 on failure.

    Algorithm
    There are four stages to processing ovs-dpctl dump-flow content.
    1. Retrieve current input
    2. Store in FlowDB and maintain history
    3. Iterate over FlowDB, aggregating stats for each flow field
    4. Present data

    Retrieving current input is currently trivial: ovs-dpctl dump-flows
    is called. Future versions will have more elaborate means for collecting
    dump-flow content. FlowDB returns all data in the form of a hierarchical
    dictionary. Input will vary.

    In the case of accumulate mode, flows are not purged from the FlowDB
    manager. Instead, at the very least, merely the latest statistics are
    kept. In the case of live output, the FlowDB is purged prior to sampling
    data.

    Aggregating results requires identifying flow fields to aggregate out
    of the flow and summing stats.

    """
    args = args_get()

    try:
        if (args.top):
            flows_top(args)
        else:
            flows_script(args)
    except KeyboardInterrupt:
        return 1
    return 0

if __name__ == '__main__':
    sys.exit(main())
elif __name__ == 'ovs-dpctl-top':
    # pylint: disable-msg=R0915

    ##
    # Test cases beyond this point.
    # pylint: disable-msg=R0904
    class TestsuiteFlowParse(unittest.TestCase):
        """
        Parse flows into a hierarchy of dictionaries.
        """
        def test_flow_parse(self):
            """ test_flow_parse. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
            self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
            self.assertEqual(flow_dict["ipv6"]["src"],
                             "fe80::55bf:fe42:bc96:2812")
            self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
                   "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["used"], "-0.703s")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

        def test_flow_sum(self):
            """ test_flow_sum. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)
            ##
            # Test the simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            expected_flow_types = ["eth", "eth_type", "udp", "in_port",
                                   "ipv6"]
            self.assert_(len(flow_types) == len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            for flow_type in flow_types:
                sum_value = flow_db.field_values_in_order("all", 1)
                self.assert_(len(sum_value) == 5)
                self.assert_(sum_value[0].packets == 2)
                self.assert_(sum_value[0].count == 1)
                self.assert_(sum_value[0].bytes == 92)

            ##
            # Add the line again just to see counts go up.
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            self.assert_(len(flow_types) == len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            for flow_type in flow_types:
                sum_value = flow_db.field_values_in_order("all", 1)
                self.assert_(len(sum_value) == 5)
                self.assert_(sum_value[0].packets == 4)
                self.assert_(sum_value[0].count == 2)
                self.assert_(sum_value[0].bytes == 2 * 92)

        def test_assoc_list(self):
            """ test_assoc_list. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            valid_flows = [
                'eth_type(0x86dd)',
                'udp(dst=5355)',
                'in_port(4)',
                'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
                'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
                ]

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)

            ##
            # Test the simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            for sum_value in flow_db.field_values_in_order("all", 1):
                assoc_list = Columns.assoc_list(sum_value)
                for item in assoc_list:
                    if (item[0] == "fields"):
                        self.assertTrue(item[1] in valid_flows)
                    elif (item[0] == "packets"):
                        self.assertTrue(item[1] == 2)
                    elif (item[0] == "count"):
                        self.assertTrue(item[1] == 1)
                    elif (item[0] == "average"):
                        self.assertTrue(item[1] == 46.0)
                    elif (item[0] == "bytes"):
                        self.assertTrue(item[1] == 92)
                    else:
                        raise ValueError("unknown %s" % item[0])

        def test_human_format(self):
            """ test_human_format. """

            self.assertEqual(approximate_size(0.0), "0.0 KiB")
            self.assertEqual(approximate_size(1024), "1.0 KiB")
            self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
            self.assertEqual(approximate_size((1024 * 1024) + 100000),
                             "1.1 MiB")
            value = (1024 * 1024 * 1024) + 100000000
            self.assertEqual(approximate_size(value), "1.1 GiB")

        def test_flow_line_split(self):
            """ Splitting a flow line is not trivial.
            There is no clear delimiter. Comma is used liberally."""
            expected_fields = ["in_port(4)",
                               "eth(src=00:50:56:b4:4e:f8,"
                               "dst=33:33:00:01:00:03)",
                               "eth_type(0x86dd)",
                               "ipv6(src=fe80::55bf:fe42:bc96:2812,"
                               "dst=ff02::1:3,"
                               "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
                               "udp(src=61252,dst=5355)"]
            expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
            expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
                               "38,41,44,47,50,53,56,59,62,65"

            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, actions) = flow_line_split(line)

            self.assertEqual(fields, expected_fields)
            self.assertEqual(stats, expected_stats)
            self.assertEqual(actions, expected_actions)

        def test_accumulate_decay(self):
            """ test_accumulate_decay: test accumulated decay. """
            lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                     "packets:1, bytes:120, used:0.004s, actions:1"]

            flow_db = FlowDB(True)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])

            # Make sure we decay.
            time.sleep(4)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(1)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(30)
            # Should not be deleted.
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            timer = decay_timer_start(flow_db, 2)
            time.sleep(10)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
            timer.stop()

        def test_accumulate(self):
            """ test_accumulate: test that FlowDB supports accumulate. """

            lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                     "packets:1, bytes:120, used:0.004s, actions:1",
                     "in_port(2),"
                     "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
                     "packets:2, bytes:5026, used:0.348s, actions:1",
                     "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
                     "bytes:240, used:0.004s, actions:1"]

            lines = [
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
                "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
                "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                ]

            # Turn on accumulate.
            flow_db = FlowDB(True)
            flow_db.begin()

            flow_db.flow_line_add(lines[0])

            # Test that one flow exists.
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            # Simulate another sample.
            # Test that two different flows exist.
            flow_db.begin()
            flow_db.flow_line_add(lines[1])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Test that the first flow increments packets.
            flow_db.begin()
            flow_db.flow_line_add(lines[2])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 240)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Test a third flow but with the same in_port(1) as the first
            # flow.
            flow_db.begin()
            flow_db.flow_line_add(lines[3])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # The third flow has changes.
            flow_db.begin()
            flow_db.flow_line_add(lines[4])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 4)
            self.assertEqual(in_ports[0].bytes, 480)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # The first flow was reset.
            flow_db.begin()
            flow_db.flow_line_add(lines[5])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

        def test_parse_character_errors(self):
            """ test_parse_character_errors: test parsing errors.
            The flow parser is purposely loose. It is not designed to
            validate input; it merely pulls out what it can, but there are
            situations where a parse error can be detected.
            """

            lines = ["complete garbage",
                     "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
                     "dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
                     "packets:2,bytes:5026,actions:1"]

            flow_db = FlowDB(False)
            flow_db.begin()
            for line in lines:
                try:
                    flow_db.flow_line_add(line)
                except ValueError:
                    # We want an exception. That is how we know we have
                    # correctly found a simple parsing error. We are not
                    # looking to validate flow output, just catch simple
                    # issues.
                    continue
                self.assertTrue(False)

        def test_tunnel_parsing(self):
            """ test_tunnel_parsing: test parsing flows with a tunnel. """
            lines = [
                "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                "tos=0x0,ttl=64,flags(key)),in_port(1),"
                "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                "actions:userspace(pid=4294962691,slow_path(cfm))"
                ]
            flow_db = FlowDB(False)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 6)
            self.assertEqual(in_ports[0].bytes, 534)
            self.assertEqual(in_ports[0].count, 1)

        def test_flow_multiple_paren(self):
            """ test_flow_multiple_paren. """
            line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
            valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
                     "in_port(2)"]
            rc = flow_line_iter(line)
            self.assertEqual(valid, rc)

        def test_to_network(self):
            """ test_to_network: test ipv4_to_network and ipv6_to_network. """
            ipv4s = [
                ("192.168.0.1", "192.168.0.1"),
                ("192.168.0.1/255.255.255.255", "192.168.0.1"),
                ("192.168.0.1/255.255.255.0", "192.168.0.0"),
                ("192.168.0.1/255.255.0.0", "192.168.0.0"),
                ("192.168.0.1/255.0.0.0", "192.0.0.0"),
                ("192.168.0.1/0.0.0.0", "0.0.0.0"),
                ("10.24.106.230/255.255.255.255", "10.24.106.230"),
                ("10.24.106.230/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.252.0", "10.24.104.0")
                ]

            ipv6s = [
                ("1::192:168:0:1", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
                ("1::192:168:0:1/1::0:0:0:0", "1::"),
                ("1::192:168:0:1/::", "::")
                ]

            for (ipv4_test, ipv4_check) in ipv4s:
                self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)

            for (ipv6_test, ipv6_check) in ipv6s:
                self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)

        def test_ui(self):
            """ test_ui: test expected ui behavior. """
            # pylint: disable=W0212
            top_render = Render(80, Render.FIELD_SELECT_TOP)
            script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
            self.assertEqual(top_render._field_type_select_get(), "in_port")
            self.assertEqual(script_render._field_type_select_get(), "all")
            # pylint: enable=W0212