#! @PYTHON3@
#
# Copyright (c) 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# The approximate_size code was copied from
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
# which is licensed under "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
# used under a Creative Commons Attribution-Share-Alike license:
# http://creativecommons.org/licenses/by-sa/3.0/
#
#

26"""Top like behavior for ovs-dpctl dump-flows output.
27
28This program summarizes ovs-dpctl flow content by aggregating the number
29of packets, total bytes and occurrence of the following fields:
30
31 - Datapath in_port
32
33 - Ethernet type
34
35 - Source and destination MAC addresses
36
37 - IP protocol
38
39 - Source and destination IPv4 addresses
40
41 - Source and destination IPv6 addresses
42
43 - UDP and TCP destination port
44
45 - Tunnel source and destination addresses
46
47
48Output shows four values:
49 - FIELDS: the flow fields for example in_port(1).
50
51 - PACKETS: the total number of packets containing the flow field.
52
53 - BYTES: the total number of bytes containing the flow field. If units are
54 not present then values are in bytes.
55
56 - AVERAGE: the average packets size (BYTES/PACKET).
57
58 - COUNT: the number of lines in the dump-flow output contain the flow field.
59
60Top Behavior
61
62While in top mode, the default behavior, the following single character
63commands are supported:
64
65 a - toggles top in accumulate and live mode. Accumulate mode is described
66 below.
67
68 s - toggles which column is used to sort content in decreasing order. A
69 DESC title is placed over the column.
70
71 _ - a space indicating to collect dump-flow content again
72
73 h - halt output. Any character will restart sampling
74
3a2742a1 75 f - cycle through flow fields. The initial field is in_port
14b4c575
MH
76
77 q - q for quit.
78
79Accumulate Mode
80
81There are two supported modes: live and accumulate. The default is live.
82The parameter --accumulate or the 'a' character in top mode enables the
83latter. In live mode, recent dump-flow content is presented.
84Where as accumulate mode keeps track of the prior historical
85information until the flow is reset not when the flow is purged. Reset
86flows are determined when the packet count for a flow has decreased from
87its previous sample. There is one caveat, eventually the system will
88run out of memory if, after the accumulate-decay period any flows that
89have not been refreshed are purged. The goal here is to free memory
90of flows that are not active. Statistics are not decremented. Their purpose
91is to reflect the overall history of the flow fields.
92
93
94Debugging Errors
95
96Parsing errors are counted and displayed in the status line at the beginning
97of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
100
101Error messages will identify content that failed to parse.
102
103
104Access Remote Hosts
105
106The --host must follow the format user@hostname. This script simply calls
107'ssh user@Hostname' without checking for login credentials therefore public
108keys should be installed on the system identified by hostname, such as:
109
110$ ssh-copy-id user@hostname
111
112Consult ssh-copy-id man pages for more details.
113
114
115Expected usage
116
117$ ovs-dpctl-top
118
119or to run as a script:
120$ ovs-dpctl dump-flows > dump-flows.log
121$ ovs-dpctl-top --script --flow-file dump-flows.log
122
123"""

# pylint: disable-msg=C0103
# pylint: disable-msg=C0302
# pylint: disable-msg=R0902
# pylint: disable-msg=R0903
# pylint: disable-msg=R0904
# pylint: disable-msg=R0912
# pylint: disable-msg=R0913
# pylint: disable-msg=R0914

import sys
import os
try:
    ##
    # argparse is not installed on older Python distributions.
    # ovs ships with a version in the directory mentioned below.
    import argparse
except ImportError:
    sys.path.append(os.path.join("@pkgdatadir@", "python"))
    import argparse
import logging
import re
import unittest
import copy
import curses
import operator
import subprocess
import fcntl
import struct
import termios
import datetime
import threading
import time
import socket


##
# The following two definitions provide the necessary netaddr functionality.
# The Python netaddr module is not part of the core installation. Packaging
# netaddr was involved and seems inappropriate given that only two
# methods were used.
def ipv4_to_network(ip_str):
    """ Calculate the network given an ipv4/mask value.
        If a mask is not present simply return ip_str.
    """
    pack_length = '!HH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack('!HH', network_n[0], network_n[1]))


def ipv6_to_network(ip_str):
    """ Calculate the network given an ipv6/mask value.
        If a mask is not present simply return ip_str.
    """
    pack_length = '!HHHHHHHH'
    try:
        (ip, mask) = ip_str.split("/")
    except ValueError:
        # just an ip address, no mask.
        return ip_str

    ip_p = socket.inet_pton(socket.AF_INET6, ip)
    ip_t = struct.unpack(pack_length, ip_p)
    mask_t = struct.unpack(pack_length,
                           socket.inet_pton(socket.AF_INET6, mask))
    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]

    return socket.inet_ntop(socket.AF_INET6,
                            struct.pack(pack_length,
                                        network_n[0], network_n[1],
                                        network_n[2], network_n[3],
                                        network_n[4], network_n[5],
                                        network_n[6], network_n[7]))
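
# A small illustration of the two helpers above (input/output pairs taken
# from test_to_network at the bottom of this file):
#
#   ipv4_to_network("192.168.0.1/255.255.255.0")       -> "192.168.0.0"
#   ipv4_to_network("192.168.0.1")                     -> "192.168.0.1"
#   ipv6_to_network("1::192:168:0:1/1::ffff:ffff:0:0") -> "1::192:168:0:0"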


##
# columns displayed
##
class Columns:
    """ Holds column specific content.
        Titles need to be less than 8 characters.
    """
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return an association list. """
        return [(Columns.FIELDS, repr(obj)),
                (Columns.PACKETS, obj.packets),
                (Columns.BYTES, obj.bytes),
                (Columns.COUNT, obj.count),
                (Columns.AVERAGE, obj.average),
                ]
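
# For a SumData built from the test line in test_assoc_list below, the
# association list looks like:
#   [("fields", "in_port(4)"), ("packets", 2), ("bytes", 92),
#    ("count", 1), ("average", 46.0)]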


def element_eth_get(field_type, element, stats_dict):
    """ Extract eth frame src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"

    element = fmt % (field_type, element["src"], element["dst"])
    return SumData(field_type, element, stats_dict["packets"],
                   stats_dict["bytes"], element)


def element_ipv4_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""
    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv4_to_network(element["src"]),
                         ipv4_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_tunnel_get(field_type, element, stats_dict):
    """ Extract src and dst from a tunnel."""
    return element_ipv4_get(field_type, element, stats_dict)


def element_ipv6_get(field_type, element, stats_dict):
    """ Extract src and dst from a dump-flow element."""

    fmt = "%s(src=%s,dst=%s)"
    element_show = fmt % (field_type, element["src"], element["dst"])

    element_key = fmt % (field_type, ipv6_to_network(element["src"]),
                         ipv6_to_network(element["dst"]))

    return SumData(field_type, element_show, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_dst_port_get(field_type, element, stats_dict):
    """ Extract the dst port from a dump-flow element."""
    element_key = "%s(dst=%s)" % (field_type, element["dst"])
    return SumData(field_type, element_key, stats_dict["packets"],
                   stats_dict["bytes"], element_key)


def element_passthrough_get(field_type, element, stats_dict):
    """ Pass an element through unchanged as both display and key."""
    element_key = "%s(%s)" % (field_type, element)
    return SumData(field_type, element_key,
                   stats_dict["packets"], stats_dict["bytes"], element_key)


# pylint: disable-msg=R0903
class OutputFormat:
    """ Holds field_type and function to extract element value. """
    def __init__(self, field_type, elements, generator):
        self.field_type = field_type
        self.elements = elements
        self.generator = generator


##
# The order below is important. The initial flow field depends on whether
# --script or top mode is used. In top mode, the expected behavior, in_port
# flow fields are shown first. A future feature will allow users to
# filter output by selecting a row. Filtering by in_port is a natural
# filtering starting point.
#
# In script mode, all fields are shown. The expectation is that users could
# filter output by piping through grep.
#
# In top mode, the default flow field is in_port. In --script mode,
# the default flow field is all.
#
# All is added to the end of the OUTPUT_FORMAT list.
##
OUTPUT_FORMAT = [
    OutputFormat("in_port", (), element_passthrough_get),
    OutputFormat("eth", ("src", "dst"), element_eth_get),
    OutputFormat("eth_type", (), element_passthrough_get),
    OutputFormat("ipv4", ("src", "dst"), element_ipv4_get),
    OutputFormat("ipv6", ("src", "dst"), element_ipv6_get),
    OutputFormat("udp", ("src", "dst"), element_dst_port_get),
    OutputFormat("tcp", ("src", "dst"), element_dst_port_get),
    OutputFormat("tunnel", ("src", "dst"), element_tunnel_get),
    ]
##


ELEMENT_KEY = {
    "udp": "udp.dst",
    "tcp": "tcp.dst"
    }


def top_input_get(args):
    """ Return subprocess stdout."""
    cmd = []
    if (args.host):
        cmd += ["ssh", args.host]
    cmd += ["ovs-dpctl", "dump-flows"]

    return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE).stdout


def args_get():
    """ Read program parameters and handle any necessary validation. """

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-top --script
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows; see "
                             "Access Remote Hosts for more information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                        "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args


###
# Code to parse a single line in dump-flow
###
# key(values)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key:value
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")


def flow_line_iter(line):
    """ Iterate over flow dump elements.
        Return a list of top-level elements, split on commas that fall
        outside parentheses.
    """
    # splits by , except for when in a (). The actions element is not
    # split properly but we don't need it.
    rc = []

    element = ""
    paren_count = 0

    for ch in line:
        if (ch == '('):
            paren_count += 1
        elif (ch == ')'):
            paren_count -= 1

        if (ch == ' '):
            # ignore white space.
            continue
        elif ((ch == ',') and (paren_count == 0)):
            rc.append(element)
            element = ""
        else:
            element += ch

    if (paren_count):
        raise ValueError(line)
    else:
        if (len(element) > 0):
            rc.append(element)
    return rc
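
# Commas inside parentheses do not split, as exercised by
# test_flow_multiple_paren below:
#
#   flow_line_iter("tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)")
#   -> ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))", "in_port(2)"]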


def flow_line_compound_parse(compound):
    """ Parse a compound element,
        for example
          src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
        which is in
          eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
    """
    result = {}
    for element in flow_line_iter(compound):
        match = FIELDS_CMPND_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value

        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

    if (len(result.keys()) == 0):
        return compound
    return result


def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple,
        where fields and stats are lists.
        This function relies on the following ovs-dpctl dump-flow
        output characteristics:
        1. The dump flow line consists of a list of frame fields, a list
           of stats and the action.
        2. The list of frame fields, each stat and the action field are
           delimited by ', '.
        3. All other non-stat fields are not delimited by ', '.

    """

    results = re.split(', ', line)

    (field, stats, action) = (results[0], results[1:-1], results[-1])

    fields = flow_line_iter(field)
    return (fields, stats, action)
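
# As exercised by test_flow_line_split below, a dump-flows line such as
#   in_port(4),eth_type(0x86dd), packets:2, bytes:92, used:0.703s, actions:3
# splits into
#   (["in_port(4)", "eth_type(0x86dd)"],
#    ["packets:2", "bytes:92", "used:0.703s"],
#    "actions:3")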


def elements_to_dict(elements):
    """ Convert line to a hierarchy of dictionaries. """
    result = {}
    for element in elements:
        if (element == "eth()"):
            continue
        match = FIELDS_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

        match = FIELDS_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value
        else:
            raise ValueError("can't parse >%s<" % element)
    return result
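
# For example (see test_flow_parse below), the element
#   "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)"
# becomes
#   {"eth": {"src": "00:50:56:b4:4e:f8", "dst": "33:33:00:01:00:03"}}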


# pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
        Holds the flow field and its corresponding count, total packets,
        total bytes and calculates the average.

        __repr__ is used as the key into the SumData singleton.
        __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        # Count is the number of lines in the dump-flow log.
        self.field_type = field_type
        self.field = field
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add two objects. """

        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract two objects. """

        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    def __getattr__(self, name):
        """ Handle average. """
        if (name == "average"):
            if (self.packets == 0):
                return float(0.0)
            else:
                return float(self.bytes) / float(self.packets)
        raise AttributeError(name)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as the key in the FlowDB table. """
        return self.key
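
# A single dump-flows line with "packets:2, bytes:92" produces SumData
# objects with count == 1; adding the same line again via FlowDB.field_add
# doubles packets and bytes and bumps count to 2 (see test_flow_sum below).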


def flow_aggregate(fields_dict, stats_dict):
    """ Search for content in a line.
        Passed the flow portion of the dump-flows output plus the current
        stats consisting of packets, bytes, etc.
    """
    result = []

    for output_format in OUTPUT_FORMAT:
        field = fields_dict.get(output_format.field_type, None)
        if field and all(k in field for k in output_format.elements):
            obj = output_format.generator(output_format.field_type,
                                          field, stats_dict)
            result.append(obj)

    return result
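
# flow_aggregate() returns one SumData per recognized field type present in
# the line. For the IPv6/UDP line used in test_flow_sum below, that is five
# objects: in_port, eth, eth_type, ipv6 and udp.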


def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert into flow_db. """

    done = False
    while (not done):
        line = ihdl.readline()
        if (len(line) == 0):
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError as arg:
            logging.error(arg)

    return flow_db


def get_terminal_size():
    """
    Return the terminal size as (height, width).
    """
    for fd_io in [0, 1, 2]:
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               '1234'))
        except IOError:
            result = None
            continue

        if (result is None or result == (0, 0)):
            # Maybe we can't get the width. In that case assume (25, 80)
            result = (25, 80)

        return result
    # No file descriptor reported a size; fall back to the same default.
    return (25, 80)

##
# Content derived from:
# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
##
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                if False, use multiples of 1000

    Returns: string

    """
    size = float(size)
    if size < 0:
        raise ValueError('number must be non-negative')

    if (a_kilobyte_is_1024_bytes):
        multiple = 1024
    else:
        multiple = 1000
    for suffix in SUFFIXES[multiple]:
        size /= multiple
        if size < multiple:
            return "%.1f %s" % (size, suffix)

    raise ValueError('number too large')

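# A few sample conversions (from test_human_format below):
#
#   approximate_size(1024)        -> "1.0 KiB"
#   approximate_size(1024 * 1024) -> "1.0 MiB"
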
##
# End copied content
##
class ColMeta:
    """ Concepts about columns. """
    def __init__(self, sortable, width):
        self.sortable = sortable
        self.width = width


class RowMeta:
    """ How to render rows. """
    def __init__(self, label, fmt):
        self.label = label
        self.fmt = fmt


def fmt_packet(obj, width):
    """ Provide a string for packets that is appropriate for output."""
    return str(obj.packets).rjust(width)


def fmt_count(obj, width):
    """ Provide a string for count that is appropriate for output."""
    return str(obj.count).rjust(width)


def fmt_avg(obj, width):
    """ Provide a string for average that is appropriate for output."""
    return str(int(obj.average)).rjust(width)


def fmt_field(obj, width):
    """ Truncate really long flows and insert ellipses to help make it
        clear.
    """

    ellipses = " ... "
    value = obj.field
    if (len(obj.field) > width):
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)


def fmt_bytes(obj, width):
    """ Provide a string for bytes that is appropriate for output."""
    if (len(str(obj.bytes)) <= width):
        value = str(obj.bytes)
    else:
        value = approximate_size(obj.bytes)
    return value.rjust(width)


def title_center(value, width):
    """ Center a column title."""
    return value.upper().center(width)


def title_rjust(value, width):
    """ Right justify a column title. """
    return value.upper().rjust(width)


def column_picker(order, obj):
    """ Return the column as specified by order. """
    if (order == 1):
        return obj.count
    elif (order == 2):
        return obj.packets
    elif (order == 3):
        return obj.bytes
    elif (order == 4):
        return obj.average
    else:
        raise ValueError("order outside of range %s" % order)


class Render:
    """ Renders flow data.

        The two FIELD_SELECT variables should be set to the desired initial
        field minus 1. During construction, an internal method increments
        and initializes this object.
    """
    FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]

    FIELD_SELECT_SCRIPT = 7
    FIELD_SELECT_TOP = -1

    def __init__(self, console_width, field_select):
        """ Calculate column widths taking into account changes in format."""

        self._start_time = datetime.datetime.now()

        self._cols = [ColMeta(False, 0),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH),
                      ColMeta(True, Columns.VALUE_WIDTH)]
        self._console_width = console_width
        self.console_width_set(console_width)

        # Order in this array dictates the order of the columns.
        # The 0 width for the first entry is a placeholder. This is
        # dynamically calculated. The first column is special. We need a
        # way to indicate which fields are presented.
        self._descs = [RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust),
                       RowMeta("", title_rjust)]
        self._column_sort_select = 0
        self.column_select_event()

        self._titles = [
            RowMeta(Columns.FIELDS, title_center),
            RowMeta(Columns.COUNT, title_rjust),
            RowMeta(Columns.PACKETS, title_rjust),
            RowMeta(Columns.BYTES, title_rjust),
            RowMeta(Columns.AVERAGE, title_rjust)
        ]

        self._datas = [
            RowMeta(None, fmt_field),
            RowMeta(None, fmt_count),
            RowMeta(None, fmt_packet),
            RowMeta(None, fmt_bytes),
            RowMeta(None, fmt_avg)
        ]

        ##
        # _field_types holds which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = Render.FLOW_FIELDS

        ##
        # The default is to show all field types.
        ##
        self._field_type_select = field_select
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Toggle which field types to show. """
        self._field_type_select += 1
        if (self._field_type_select >= len(self._field_types)):
            self._field_type_select = 0
        value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
        self._titles[0].label = value

    def column_select_event(self):
        """ Handles column select toggle. """

        self._descs[self._column_sort_select].label = ""
        for _ in range(len(self._cols)):
            self._column_sort_select += 1
            if (self._column_sort_select >= len(self._cols)):
                self._column_sort_select = 0

            # Now look for the next sortable column
            if (self._cols[self._column_sort_select].sortable):
                break
        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width. """
        self._console_width = console_width

        spaces = len(self._cols) - 1
        ##
        # Calculating column width can be tedious but important. The
        # flow field value can be long. The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts. Give the
        # remaining space to the flow column. When numbers get large,
        # transition output to that generated by approximate_size, which
        # limits output to ###.# XiB, in other words 9 characters.
        ##
        # At this point, we know the maximum length values. We may
        # truncate the flow column to get everything to fit.
        self._cols[0].width = 0
        values_max_length = sum([ii.width for ii in self._cols]) + spaces
        flow_max_length = console_width - values_max_length
        self._cols[0].width = flow_max_length

    def format(self, flow_db):
        """ Show flows based on the --script parameter."""

        rc = []
        ##
        # Top output consists of
        # Title
        # Column title (2 rows)
        # data
        # statistics and status

        ##
        # Title
        ##
        rc.append("Flow Summary".center(self._console_width))

        stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
                flow_db.flow_stats_get()
        accumulate = flow_db.accumulate_get()
        if (accumulate):
            stats += "Accumulate: on "
        else:
            stats += "Accumulate: off "

        duration = datetime.datetime.now() - self._start_time
        stats += "Duration: %s " % str(duration)
        rc.append(stats.ljust(self._console_width))

        ##
        # 2 rows for columns.
        ##
        # Indicate which column is in descending order.
        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._descs, self._cols)]))

        rc.append(" ".join([ii.fmt(ii.label, col.width)
                            for (ii, col) in zip(self._titles, self._cols)]))

        ##
        # Data.
        ##
        for dd in flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select):
            rc.append(" ".join([ii.fmt(dd, col.width)
                                for (ii, col) in zip(self._datas,
                                                     self._cols)]))

        return rc


def curses_screen_begin():
    """ Begin curses screen control. """
    stdscr = curses.initscr()
    curses.cbreak()
    curses.noecho()
    stdscr.keypad(1)
    return stdscr


def curses_screen_end(stdscr):
    """ End curses screen control. """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()


class FlowDB:
    """ Implements live vs accumulate mode.

        Flows are stored as key value pairs. The key consists of the content
        prior to the stat fields. The value portion consists of stats in
        dictionary form.

        @todo future: add filtering here.
    """
    def __init__(self, accumulate):
        self._accumulate = accumulate
        self._error_count = 0
        # Values are (stats, last update time.)
        # The last update time is used for aging.
        self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        self._flows = {}
        # This dictionary holds aggregates of flow fields.
        self._fields = {}

    def accumulate_get(self):
        """ Return the current accumulate state. """
        return self._accumulate

    def accumulate_toggle(self):
        """ Toggle accumulate flow behavior. """
        self._accumulate = not self._accumulate

    def begin(self):
        """ Indicate the beginning of processing flow content.
            If accumulate is false, clear the current set of flows. """

        if (not self._accumulate):
            self._flow_lock.acquire()
            try:
                self._flows.clear()
            finally:
                self._flow_lock.release()
            self._fields.clear()

    def flow_line_add(self, line):
        """ Split a line from an ovs-dpctl dump-flow into key and stats.
            The order of the content in the flow should be:
            - flow content
            - stats for the flow
            - actions

            This method also assumes that the dump flow output does not
            change the order of fields of the same flow.
        """

        if not isinstance(line, str):
            line = str(line)

        line = line.rstrip("\n")
        (fields, stats, _) = flow_line_split(line)

        try:
            fields_dict = elements_to_dict(fields)

            if (len(fields_dict) == 0):
                raise ValueError("flow fields are missing %s" % line)

            stats_dict = elements_to_dict(stats)
            if not all(k in stats_dict for k in ("packets", "bytes")):
                raise ValueError("statistics are missing %s." % line)

            ##
            # In accumulate mode, the flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, deltas are sent to the flow_db
            # database, allowing incremental changes to be done in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
            key = ",".join(fields)

            self._flow_lock.acquire()
            try:
                (stats_old_dict, _) = self._flows.get(key, (None, None))
            finally:
                self._flow_lock.release()

            self.flow_event(fields_dict, stats_old_dict, stats_dict)

        except ValueError as arg:
            logging.error(arg)
            self._error_count += 1
            raise

        self._flow_lock.acquire()
        try:
            self._flows[key] = (stats_dict, datetime.datetime.now())
        finally:
            self._flow_lock.release()

    def decay(self, decayTimeInSeconds):
        """ Decay content. """
        now = datetime.datetime.now()
        # Take a snapshot of the items; entries may be deleted while
        # iterating.
        for (key, value) in list(self._flows.items()):
            (stats_dict, updateTime) = value
            delta = now - updateTime

            if (delta.seconds > decayTimeInSeconds):
                self._flow_lock.acquire()
                try:
                    del self._flows[key]

                    fields_dict = elements_to_dict(flow_line_iter(key))
                    matches = flow_aggregate(fields_dict, stats_dict)
                    for match in matches:
                        self.field_dec(match)

                finally:
                    self._flow_lock.release()

    def flow_stats_get(self):
        """ Return statistics in the form of a dictionary. """
        rc = None
        self._flow_lock.acquire()
        try:
            rc = {"flow_total": len(self._flows),
                  "flow_errors": self._error_count}
        finally:
            self._flow_lock.release()
        return rc

    def field_types_get(self):
        """ Return the set of types stored in the singleton. """
        types = set((ii.field_type for ii in self._fields.values()))
        return types

    def field_add(self, data):
        """ Collect dump-flow data to sum number of times item appears. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current

    def field_dec(self, data):
        """ Decrement a previously collected dump-flow aggregate. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            raise ValueError("decrementing field missing %s" % repr(data))

        current -= data
        self._fields[repr(current)] = current
        if (current.count == 0):
            del self._fields[repr(current)]

    def field_values_in_order(self, field_type_select, column_order):
        """ Return a list of items in order, maximum first. """
        values = self._fields.values()
        if (field_type_select != "all"):
            # If a field type other than "all" then reduce the list.
            values = [ii for ii in values
                      if (ii.field_type == field_type_select)]
        values = [(column_picker(column_order, ii), ii) for ii in values]
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        values = [ii[1] for ii in values]
        return values

    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
        """ Receives new flow information. """

        # In order to avoid processing every flow at every sample
        # period, changes in a flow's packet count are used to determine the
        # delta in the flow statistics. This delta is used in the call
        # to self.decrement prior to self.field_add.

        if (stats_old_dict is None):
            # This is a new flow
            matches = flow_aggregate(fields_dict, stats_new_dict)
            for match in matches:
                self.field_add(match)
        else:
            old_packets = int(stats_old_dict.get("packets", 0))
            new_packets = int(stats_new_dict.get("packets", 0))
            if (old_packets == new_packets):
                # ignore. same data.
                pass
            else:
                old_bytes = stats_old_dict.get("bytes", 0)
                # old_packets != new_packets
                # if old_packets > new_packets then we end up decrementing
                # packets and bytes.
                matches = flow_aggregate(fields_dict, stats_new_dict)
                for match in matches:
                    match.decrement(int(old_packets), int(old_bytes), 1)
                    self.field_add(match)


class DecayThread(threading.Thread):
    """ Periodically call the flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Start decay thread. """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        self._min_interval = min(1, interval / 10)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while (self._running):
            self._event.wait(self._min_interval)
            if (self._running):
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the calling thread time to terminate but not too long.
        # This thread is a daemon so the application will terminate if
        # we time out during the join. This is just a cleaner way to
        # release resources.
        self.join(2.0)


def flow_top_command(stdscr, render, flow_db):
    """ Handle input while in top mode. """
    ch = stdscr.getch()
    ##
    # Any character will restart sampling.
    if (ch == ord('h')):
        # halt output.
        ch = stdscr.getch()
        while (ch == -1):
            ch = stdscr.getch()

    if (ch == ord('s')):
        # toggle which column sorts data in descending order.
        render.column_select_event()
    elif (ch == ord('a')):
        flow_db.accumulate_toggle()
    elif (ch == ord('f')):
        render.field_type_toggle()
    elif (ch == ord(' ')):
        # resample
        pass

    return ch


def decay_timer_start(flow_db, accumulateDecay):
    """ If accumulateDecay is greater than zero then start the timer. """
    if (accumulateDecay > 0):
        decay_timer = DecayThread(flow_db, accumulateDecay)
        decay_timer.start()
        return decay_timer
    else:
        return None


def flows_top(args):
    """ Handles top-like behavior when --script is not specified. """

    flow_db = FlowDB(args.accumulate)
    render = Render(0, Render.FIELD_SELECT_TOP)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            #stdscr.nodelay(1)
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError as arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                # Materialize the zip iterator so the "repeat output" loop
                # at the end of this function can traverse it again.
                lines = list(zip(line_count, line_output[:output_height]))

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    if (decay_timer):
        decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print(line)


def flows_script(args):
    """ Handles the --script option. """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        # Unbuffered text I/O is not supported on Python 3; use the
        # default buffering.
        ihdl = os.fdopen(sys.stdin.fileno(), 'r')
        try:
            flow_db = flows_read(ihdl, flow_db)
        finally:
            ihdl.close()
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            ihdl = open(flowFile, "r")
            try:
                flow_db = flows_read(ihdl, flow_db)
            finally:
                ihdl.close()

    (_, console_width) = get_terminal_size()
    render = Render(console_width, Render.FIELD_SELECT_SCRIPT)

    for line in render.format(flow_db):
        print(line)


def main():
    """ Return 0 on success or 1 on failure.

        Algorithm
        There are four stages to processing ovs-dpctl dump-flow content.
        1. Retrieve current input.
        2. Store it in FlowDB and maintain history.
        3. Iterate over FlowDB, aggregating stats for each flow field.
        4. Present the data.

        Retrieving current input is currently trivial: ovs-dpctl dump-flows
        is called. Future versions will have more elaborate means for
        collecting dump-flow content. FlowDB returns all data in the form
        of a hierarchical dictionary. Input will vary.

        In the case of accumulate mode, flows are not purged from the FlowDB
        manager; at the very least, the latest statistics are kept. In the
        case of live output, the FlowDB is purged prior to sampling data.

        Aggregating results requires identifying flow fields to aggregate
        out of the flow and summing stats.

    """
    args = args_get()

    try:
        if (args.top):
            flows_top(args)
        else:
            flows_script(args)
    except KeyboardInterrupt:
        return 1
    return 0

if __name__ == '__main__':
    sys.exit(main())
elif __name__ == 'ovs-dpctl-top':
    # pylint: disable-msg=R0915

    ##
    # Test cases beyond this point.
    # pylint: disable-msg=R0904
    class TestsuiteFlowParse(unittest.TestCase):
        """
        Parse flow into hierarchy of dictionaries.
        """
        def test_flow_parse(self):
            """ test_flow_parse. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
            self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
            self.assertEqual(flow_dict["ipv6"]["src"],
                             "fe80::55bf:fe42:bc96:2812")
            self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
                   "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            flow_dict = elements_to_dict(fields + stats)
            self.assertEqual(flow_dict["used"], "-0.703s")
            self.assertEqual(flow_dict["packets"], "1")
            self.assertEqual(flow_dict["bytes"], "92")

        def test_flow_sum(self):
            """ test_flow_sum. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)
            ##
            # Test simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            expected_flow_types = ["eth", "eth_type", "udp", "in_port",
                                   "ipv6"]
            self.assertTrue(len(flow_types) == len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            for flow_type in flow_types:
                sum_value = flow_db.field_values_in_order("all", 1)
                self.assertTrue(len(sum_value) == 5)
                self.assertTrue(sum_value[0].packets == 2)
                self.assertTrue(sum_value[0].count == 1)
                self.assertTrue(sum_value[0].bytes == 92)

            ##
            # Add line again just to see counts go up.
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            flow_types = flow_db.field_types_get()
            self.assertTrue(len(flow_types) == len(expected_flow_types))
            for flow_type in flow_types:
                self.assertTrue(flow_type in expected_flow_types)

            for flow_type in flow_types:
                sum_value = flow_db.field_values_in_order("all", 1)
                self.assertTrue(len(sum_value) == 5)
                self.assertTrue(sum_value[0].packets == 4)
                self.assertTrue(sum_value[0].count == 2)
                self.assertTrue(sum_value[0].bytes == 2 * 92)

        def test_assoc_list(self):
            """ test_assoc_list. """
            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            valid_flows = [
                'eth_type(0x86dd)',
                'udp(dst=5355)',
                'in_port(4)',
                'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
                'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
            ]

            (fields, stats, _) = flow_line_split(line)
            stats_dict = elements_to_dict(stats)
            fields_dict = elements_to_dict(fields)

            ##
            # Test simple case of one line.
            flow_db = FlowDB(False)
            matches = flow_aggregate(fields_dict, stats_dict)
            for match in matches:
                flow_db.field_add(match)

            for sum_value in flow_db.field_values_in_order("all", 1):
                assoc_list = Columns.assoc_list(sum_value)
                for item in assoc_list:
                    if (item[0] == "fields"):
                        self.assertTrue(item[1] in valid_flows)
                    elif (item[0] == "packets"):
                        self.assertTrue(item[1] == 2)
                    elif (item[0] == "count"):
                        self.assertTrue(item[1] == 1)
                    elif (item[0] == "average"):
                        self.assertTrue(item[1] == 46.0)
                    elif (item[0] == "bytes"):
                        self.assertTrue(item[1] == 92)
                    else:
                        raise ValueError("unknown %s" % item[0])

        def test_human_format(self):
            """ test_human_format. """

            self.assertEqual(approximate_size(0.0), "0.0 KiB")
            self.assertEqual(approximate_size(1024), "1.0 KiB")
            self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
            self.assertEqual(approximate_size((1024 * 1024) + 100000),
                             "1.1 MiB")
            value = (1024 * 1024 * 1024) + 100000000
            self.assertEqual(approximate_size(value), "1.1 GiB")

        def test_flow_line_split(self):
            """ Splitting a flow line is not trivial.
                There is no clear delimiter. Comma is used liberally."""
            expected_fields = ["in_port(4)",
                               "eth(src=00:50:56:b4:4e:f8,"
                               "dst=33:33:00:01:00:03)",
                               "eth_type(0x86dd)",
                               "ipv6(src=fe80::55bf:fe42:bc96:2812,"
                               "dst=ff02::1:3,"
                               "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
                               "udp(src=61252,dst=5355)"]
            expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
            expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
                               "38,41,44,47,50,53,56,59,62,65"

            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
                   "38,41,44,47,50,53,56,59,62,65"

            (fields, stats, actions) = flow_line_split(line)

            self.assertEqual(fields, expected_fields)
            self.assertEqual(stats, expected_stats)
            self.assertEqual(actions, expected_actions)

        def test_accumulate_decay(self):
            """ test_accumulate_decay: test accumulated decay. """
            lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                     "packets:1, bytes:120, used:0.004s, actions:1"]

            flow_db = FlowDB(True)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])

            # Make sure we decay
            time.sleep(4)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(1)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            flow_db.decay(30)
            # Should not be deleted.
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)

            flow_db.flow_line_add(lines[0])
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
            timer = decay_timer_start(flow_db, 2)
            time.sleep(10)
            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
            timer.stop()

        def test_accumulate(self):
            """ test_accumulate: test that FlowDB supports accumulate. """

            lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
                     "packets:1, bytes:120, used:0.004s, actions:1",
                     "in_port(2),"
                     "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
                     "packets:2, bytes:5026, used:0.348s, actions:1",
                     "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
                     "dst=ff:ff:ff:ff:ff:ff),"
                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
                     "bytes:240, used:0.004s, actions:1"]

            # The simplified set of flows actually used by this test.
            lines = [
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
                "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
                "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
                "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
            ]

            # Turn on accumulate.
            flow_db = FlowDB(True)
            flow_db.begin()

            flow_db.flow_line_add(lines[0])

            # Test one flow exists.
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            # simulate another sample
            # Test two different flows exist.
            flow_db.begin()
            flow_db.flow_line_add(lines[1])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 1)
            self.assertEqual(in_ports[0].bytes, 120)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Test first flow increments packets.
            flow_db.begin()
            flow_db.flow_line_add(lines[2])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 240)
            self.assertEqual(in_ports[0].count, 1)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Test third flow but with the same in_port(1) as the first flow.
            flow_db.begin()
            flow_db.flow_line_add(lines[3])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # Third flow has changes.
            flow_db.begin()
            flow_db.flow_line_add(lines[4])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 4)
            self.assertEqual(in_ports[0].bytes, 480)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

            # First flow reset.
            flow_db.begin()
            flow_db.flow_line_add(lines[5])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 3)
            self.assertEqual(in_ports[0].bytes, 360)
            self.assertEqual(in_ports[0].count, 2)

            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 2)
            self.assertEqual(in_ports[0].bytes, 126)
            self.assertEqual(in_ports[0].count, 1)

        def test_parse_character_errors(self):
            """ test_parsing errors.
                The flow parser is purposely loose. It is not designed to
                validate input, merely to pull out what it can, but there
                are situations where a parse error can be detected.
            """

            lines = ["complete garbage",
                     "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
                     "dst=33:33:00:00:00:66),"
                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
                     "packets:2,bytes:5026,actions:1"]

            flow_db = FlowDB(False)
            flow_db.begin()
            for line in lines:
                try:
                    flow_db.flow_line_add(line)
                except ValueError:
                    # We want an exception. That is how we know we have
                    # correctly found a simple parsing error. We are not
                    # looking to validate flow output, just catch simple
                    # issues.
                    continue
                self.assertTrue(False)

        def test_tunnel_parsing(self):
            """ test_tunnel_parsing: test parsing flows with a tunnel. """
            lines = [
                "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                "tos=0x0,ttl=64,flags(key)),in_port(1),"
                "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                "actions:userspace(pid=4294962691,slow_path(cfm))"
                ]
            flow_db = FlowDB(False)
            flow_db.begin()
            flow_db.flow_line_add(lines[0])
            sum_values = flow_db.field_values_in_order("all", 1)
            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
            self.assertEqual(len(in_ports), 1)
            self.assertEqual(in_ports[0].packets, 6)
            self.assertEqual(in_ports[0].bytes, 534)
            self.assertEqual(in_ports[0].count, 1)

        def test_flow_multiple_paren(self):
            """ test_flow_multiple_paren. """
            line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
            valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
                     "in_port(2)"]
            rc = flow_line_iter(line)
            self.assertEqual(valid, rc)

        def test_to_network(self):
            """ test_to_network: test ipv4_to_network and ipv6_to_network. """
            ipv4s = [
                ("192.168.0.1", "192.168.0.1"),
                ("192.168.0.1/255.255.255.255", "192.168.0.1"),
                ("192.168.0.1/255.255.255.0", "192.168.0.0"),
                ("192.168.0.1/255.255.0.0", "192.168.0.0"),
                ("192.168.0.1/255.0.0.0", "192.0.0.0"),
                ("192.168.0.1/0.0.0.0", "0.0.0.0"),
                ("10.24.106.230/255.255.255.255", "10.24.106.230"),
                ("10.24.106.230/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.255.0", "10.24.106.0"),
                ("10.24.106.0/255.255.252.0", "10.24.104.0")
                ]

            ipv6s = [
                ("1::192:168:0:1", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
                ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
                ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
                ("1::192:168:0:1/1::0:0:0:0", "1::"),
                ("1::192:168:0:1/::", "::")
                ]

            for (ipv4_test, ipv4_check) in ipv4s:
                self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)

            for (ipv6_test, ipv6_check) in ipv6s:
                self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)

        def test_ui(self):
            """ test_ui: test expected ui behavior. """
            # pylint: disable=W0212
            top_render = Render(80, Render.FIELD_SELECT_TOP)
            script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
            self.assertEqual(top_render._field_type_select_get(), "in_port")
            self.assertEqual(script_render._field_type_select_get(), "all")
            # pylint: enable=W0212