]> git.proxmox.com Git - mirror_ovs.git/blob - utilities/ovs-dpctl-top.in
Add support for write-actions
[mirror_ovs.git] / utilities / ovs-dpctl-top.in
1 #! @PYTHON@
2 #
3 # Copyright (c) 2013 Nicira, Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 #
18 # The approximate_size code was copied from
19 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20 # which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21 # used under a Creative Commons Attribution-Share-Alike license:
22 # http://creativecommons.org/licenses/by-sa/3.0/
23 #
24 #
25
26 """Top like behavior for ovs-dpctl dump-flows output.
27
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes and occurrence of the following fields:
30
31 - Datapath in_port
32
33 - Ethernet type
34
35 - Source and destination MAC addresses
36
37 - IP protocol
38
39 - Source and destination IPv4 addresses
40
41 - Source and destination IPv6 addresses
42
43 - UDP and TCP destination port
44
45 - Tunnel source and destination addresses
46
47
48 Output shows five values:
49 - FIELDS: the flow fields for example in_port(1).
50
51 - PACKETS: the total number of packets containing the flow field.
52
53 - BYTES: the total number of bytes containing the flow field. If units are
54 not present then values are in bytes.
55
56 - AVERAGE: the average packets size (BYTES/PACKET).
57
58 - COUNT: the number of lines in the dump-flow output that contain the flow field.
59
60 Top Behavior
61
62 While in top mode, the default behavior, the following single character
63 commands are supported:
64
65 a - toggles top in accumulate and live mode. Accumulate mode is described
66 below.
67
68 s - toggles which column is used to sort content in decreasing order. A
69 DESC title is placed over the column.
70
71 _ - a space indicating to collect dump-flow content again
72
73 h - halt output. Any character will restart sampling
74
75 f - cycle through flow fields
76
77 q - q for quit.
78
79 Accumulate Mode
80
81 There are two supported modes: live and accumulate. The default is live.
82 The parameter --accumulate or the 'a' character in top mode enables the
83 latter. In live mode, recent dump-flow content is presented.
84 Whereas accumulate mode keeps track of the prior historical
85 information until the flow is reset, not when the flow is purged. Reset
86 flows are determined when the packet count for a flow has decreased from
87 its previous sample. There is one caveat: eventually the system would
88 run out of memory, so after the accumulate-decay period any flows that
89 have not been refreshed are purged. The goal here is to free memory
90 of flows that are not active. Statistics are not decremented. Their purpose
91 is to reflect the overall history of the flow fields.
92
93
94 Debugging Errors
95
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98 was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
100
101 Error messages will identify content that failed to parse.
102
103
104 Access Remote Hosts
105
106 The --host must follow the format user@hostname. This script simply calls
107 'ssh user@Hostname' without checking for login credentials therefore public
108 keys should be installed on the system identified by hostname, such as:
109
110 $ ssh-copy-id user@hostname
111
112 Consult ssh-copy-id man pages for more details.
113
114
115 Expected usage
116
117 $ ovs-dpctl-top
118
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
122
123 """
124
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
133
134 import sys
135 import os
136 try:
137 ##
138 # Arg parse is not installed on older Python distributions.
139 # ovs ships with a version in the directory mentioned below.
140 import argparse
141 except ImportError:
142 sys.path.append(os.path.join("@pkgdatadir@", "python"))
143 import argparse
144 import logging
145 import re
146 import unittest
147 import copy
148 import curses
149 import operator
150 import subprocess
151 import fcntl
152 import struct
153 import termios
154 import datetime
155 import threading
156 import time
157 import socket
158
159
160 ##
161 # The following two definitions provide the necessary netaddr functionality.
162 # Python netaddr module is not part of the core installation. Packaging
163 # netaddr was involved and seems inappropriate given that only two
164 # methods were used.
def ipv4_to_network(ip_str):
    """ Return the network part of an "address/netmask" ipv4 string.

    The netmask is expected in dotted-quad form, for example
    "10.1.2.3/255.255.0.0".  Any string that does not consist of exactly
    two "/"-separated parts is handed back unchanged.
    """
    parts = ip_str.split("/")
    if (len(parts) != 2):
        # just an ip address, no mask.
        return ip_str
    (address, netmask) = parts

    # Treat the four address bytes as two network-order 16 bit words.
    halves = '!HH'
    address_t = struct.unpack(halves,
                              socket.inet_pton(socket.AF_INET, address))
    netmask_t = struct.unpack(halves,
                              socket.inet_pton(socket.AF_INET, netmask))
    masked = [word & bits for (word, bits) in zip(address_t, netmask_t)]

    return socket.inet_ntop(socket.AF_INET, struct.pack(halves, *masked))
183
184
def ipv6_to_network(ip_str):
    """ Return the network part of an "address/netmask" ipv6 string.

    The netmask is expected in full ipv6 notation, for example
    "fe80::1/ffff:ffff:ffff:ffff::".  Any string that does not consist of
    exactly two "/"-separated parts is handed back unchanged.
    """
    parts = ip_str.split("/")
    if (len(parts) != 2):
        # just an ip address, no mask.
        return ip_str
    (address, netmask) = parts

    # Treat the sixteen address bytes as eight network-order 16 bit words.
    words = '!8H'
    address_t = struct.unpack(words,
                              socket.inet_pton(socket.AF_INET6, address))
    netmask_t = struct.unpack(words,
                              socket.inet_pton(socket.AF_INET6, netmask))
    masked = [word & bits for (word, bits) in zip(address_t, netmask_t)]

    return socket.inet_ntop(socket.AF_INET6, struct.pack(words, *masked))
208
209
210 ##
211 # columns displayed
212 ##
class Columns:
    """ Names of the output columns and the width of the numeric ones.
    Titles need to be less than 8 characters.
    """
    # Width reserved for every numeric column.
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return obj as a list of (column name, value) pairs.

        The fields column carries repr(obj); the remaining columns carry
        the attributes of the same name.
        """
        pairs = [(Columns.FIELDS, repr(obj))]
        pairs.append((Columns.PACKETS, obj.packets))
        pairs.append((Columns.BYTES, obj.bytes))
        pairs.append((Columns.COUNT, obj.count))
        pairs.append((Columns.AVERAGE, obj.average))
        return pairs
236
237
def element_eth_get(field_type, element, stats_dict):
    """ Build the SumData for an ethernet src/dst pair.

    The same "type(src=..,dst=..)" text is used both as the displayed
    field and as the aggregation key.
    """
    label = "%s(src=%s,dst=%s)" % (field_type,
                                   element["src"], element["dst"])
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
245
246
def element_ipv4_get(field_type, element, stats_dict):
    """ Build the SumData for an ipv4 src/dst pair.

    The displayed field keeps the addresses exactly as dumped, while the
    aggregation key uses the masked (network) form of each address.
    """
    template = "%s(src=%s,dst=%s)"
    shown = template % (field_type, element["src"], element["dst"])

    src_net = ipv4_to_network(element["src"])
    dst_net = ipv4_to_network(element["dst"])
    key = template % (field_type, src_net, dst_net)

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
257
258
def element_tunnel_get(field_type, element, stats_dict):
    """ Build the SumData for a tunnel src/dst pair.

    Tunnel endpoints are handled exactly like ipv4 addresses.
    """
    summary = element_ipv4_get(field_type, element, stats_dict)
    return summary
262
263
def element_ipv6_get(field_type, element, stats_dict):
    """ Build the SumData for an ipv6 src/dst pair.

    The displayed field keeps the addresses exactly as dumped, while the
    aggregation key uses the masked (network) form of each address.
    """
    template = "%s(src=%s,dst=%s)"
    shown = template % (field_type, element["src"], element["dst"])

    src_net = ipv6_to_network(element["src"])
    dst_net = ipv6_to_network(element["dst"])
    key = template % (field_type, src_net, dst_net)

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
275
276
def element_dst_port_get(field_type, element, stats_dict):
    """ Build the SumData for the destination port of a dump-flow element.

    Only the "dst" entry of element is used; it serves both as displayed
    field and as aggregation key.
    """
    label = "%s(dst=%s)" % (field_type, element["dst"])
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
282
283
def element_passthrough_get(field_type, element, stats_dict):
    """ Build the SumData for an element that is used as is.

    The element is wrapped as "type(element)", which serves both as
    displayed field and as aggregation key.
    """
    label = "%s(%s)" % (field_type, element)
    packets = stats_dict["packets"]
    flow_bytes = stats_dict["bytes"]
    return SumData(field_type, label, packets, flow_bytes, label)
289
290
291 # pylint: disable-msg=R0903
class OutputFormat:
    """ Pairs a flow field type with the function that turns a parsed
    element of that type into summary data.
    """
    def __init__(self, field_type, generator):
        # generator is called as generator(field_type, element, stats_dict).
        (self.field_type, self.generator) = (field_type, generator)
297
# Field types that are summarized, each with the function that extracts
# its summary data.  The order of this list is the order in which
# flow_aggregate produces its results.
OUTPUT_FORMAT = [
    # Addresses: source and destination pairs.
    OutputFormat("eth", element_eth_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("tunnel", element_tunnel_get),
    # Transport protocols: destination port only.
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    # Plain values that are shown as dumped.
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("in_port", element_passthrough_get),
]
308
309
# Maps a transport protocol field type to the dotted name of its
# destination port entry.
ELEMENT_KEY = dict(udp="udp.dst", tcp="tcp.dst")
314
315
def top_input_get(args):
    """ Start "ovs-dpctl dump-flows" and return the stdout of the child.

    When args.host is set the command is run through ssh on that host.
    stderr of the child is merged into the returned stream.
    """
    dump_cmd = ["ovs-dpctl", "dump-flows"]
    if (args.host):
        dump_cmd = ["ssh", args.host] + dump_cmd

    child = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
    return child.stdout
325
326
def args_get():
    """ Parse the command line and configure logging.

    Returns the argparse namespace with the attributes flowFiles, verbose,
    top, host, accumulate, accumulateDecay and delay.  The logging level
    is set from the --verbose option as a side effect.
    """

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-flows.py
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    # --script switches the curses user interface off, hence store_false.
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    # The two help fragments are concatenated, so the separating space and
    # the section name (as spelled in the module documentation) matter.
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows; see "
                        "the \"Access Remote Hosts\" section for more "
                        "information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                        "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args
370
371 ###
372 # Code to parse a single line in dump-flow
373 ###
# Raw strings keep the regex escapes (\w, \(, \.) out of the reach of
# Python's own string escape processing.
# key(values), for example eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key=value inside a compound, for example src=00:50:56:b4:4e:f8
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
# key:value, for example packets:12
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
379
380
def flow_line_iter(line):
    """ Split a flow dump line into its top level elements.

    Returns a list of strings.  The line is split at every "," that is
    not enclosed in parentheses and all spaces are dropped.  An empty
    trailing element is omitted.

    Raises ValueError, carrying the line, when the count of "(" and ")"
    differs.
    """
    # The actions element is not split properly, but it is not needed.
    pieces = []
    current = []
    depth = 0

    for ch in line:
        if (ch == ' '):
            # ignore white space.
            continue

        if (ch == '('):
            depth += 1
        elif (ch == ')'):
            depth -= 1

        if ((ch == ',') and (depth == 0)):
            pieces.append("".join(current))
            current = []
        else:
            current.append(ch)

    if (depth != 0):
        raise ValueError(line)

    if (current):
        pieces.append("".join(current))
    return pieces
413
414
def flow_line_compound_parse(compound):
    """ Parse the content of a compound element into a dictionary.

    For example
       src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is the content of
       eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)

    key=value parts become string entries, nested key(...) parts become
    nested dictionaries.  When nothing could be parsed the compound
    itself is returned.
    """
    parsed = {}
    for item in flow_line_iter(compound):
        pair = FIELDS_CMPND_ELEMENT.search(item)
        if (pair):
            parsed[pair.group(1)] = pair.group(2)

        # Both patterns are tried on every item, a nested compound is
        # recorded in addition to any key=value found above.
        nested = FIELDS_CMPND.search(item)
        if (nested):
            parsed[nested.group(1)] = \
                flow_line_compound_parse(nested.group(2))

    if (not parsed):
        return compound
    return parsed
440
441
def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple,
    where fields and stats are lists.

    This function relies on the following ovs-dpctl dump-flow output
    characteristics:
    1. The dump flow line consists of a list of frame fields, a list of
       stats and an action.
    2. The list of frame fields, each stat and the action field are
       delimited by ', '.
    3. No other content is delimited by ', '.
    """
    chunks = line.split(', ')

    # The first chunk holds all frame fields, the last one the actions and
    # everything in between is one stat per chunk.
    fields = flow_line_iter(chunks[0])
    return (fields, chunks[1:-1], chunks[-1])
460
461
def elements_to_dict(elements):
    """ Convert a list of elements into a hierarchy of dictionaries.

    key(...) elements map to their parsed content, key:value elements
    map to the value string.  Raises ValueError for an element that
    matches neither form.
    """
    tree = {}
    for item in elements:
        compound = FIELDS_CMPND.search(item)
        if (compound):
            tree[compound.group(1)] = \
                flow_line_compound_parse(compound.group(2))
            continue

        simple = FIELDS_ELEMENT.search(item)
        if (not simple):
            raise ValueError("can't parse >%s<" % item)
        tree[simple.group(1)] = simple.group(2)
    return tree
481
482
483 # pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        """ packets and flow_bytes may be given as numbers or as numeric
        strings, they are converted with int().
        """
        self.field_type = field_type
        self.field = field
        # Count is the number of lines in the dump-flow log.
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add the counters of other to this object.
        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract the counters of other from this object.
        Raises ValueError when the keys differ.
        """
        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    @property
    def average(self):
        """ Average packet size in bytes, 0.0 when there are no packets. """
        if (self.packets == 0):
            return float(0.0)
        return float(self.bytes) / float(self.packets)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
547
548
def flow_aggregate(fields_dict, stats_dict):
    """ Produce the summary data for every known field type of a flow.

    fields_dict holds the parsed flow fields of one dump-flows line and
    stats_dict its current stats (packets, bytes, etc).  Field types
    that are missing from fields_dict, or whose value is empty, are
    skipped.  Results follow the order of OUTPUT_FORMAT.
    """
    # Lazily pair every output format with its value in this flow.
    present = ((fmt, fields_dict.get(fmt.field_type, None))
               for fmt in OUTPUT_FORMAT)

    return [fmt.generator(fmt.field_type, value, stats_dict)
            for (fmt, value) in present if (value)]
564
565
def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert it into flow_db.

    ihdl is read line by line with readline() until it returns an empty
    line.  Each line is passed to flow_db.flow_line_add(); a line that
    raises ValueError is logged and skipped.  Returns flow_db.
    """
    while True:
        line = ihdl.readline()
        if (len(line) == 0):
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError as arg:
            # One bad line must not stop the remaining lines.
            logging.error(arg)

    return flow_db
582
583
def get_terminal_size():
    """ Return the size of the terminal as a tuple of two numbers.

    stdin, stdout and stderr are asked in turn with the TIOCGWINSZ ioctl
    and the first usable answer wins.  When none of them delivers a size
    (25, 80) is assumed.
    NOTE(review): TIOCGWINSZ reports rows first, then columns, which
    matches the (25, 80) default -- confirm against the callers.
    """
    # Four placeholder bytes that the ioctl overwrites with two shorts.
    placeholder = struct.pack('hh', 0, 0)

    for fd_io in [0, 1, 2]:
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               placeholder))
        except IOError:
            # This descriptor is not a terminal, try the next one.
            continue

        if (result != (0, 0)):
            # Stop here, so a later descriptor that is not a terminal
            # cannot throw away the size that was just found.
            return result

    # Maybe we can't get the width. In that case assume (25, 80)
    return (25, 80)
602
603 ##
604 # Content derived from:
605 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
606 ##
# Unit names per multiple, smallest unit first.
SUFFIXES = {
    1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
    1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'],
}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                if False, use multiples of 1000

    Returns: string with one decimal and the unit, the smallest unit used
    is the kilobyte.

    Raises ValueError for a negative size and for a size beyond the
    largest known unit.

    """
    remaining = float(size)
    if remaining < 0:
        raise ValueError('number must be non-negative')

    multiple = 1024 if a_kilobyte_is_1024_bytes else 1000

    # Step up one unit at a time until the value fits below one multiple.
    units = SUFFIXES[multiple]
    index = 0
    while index < len(units):
        remaining /= multiple
        if remaining < multiple:
            return "%.1f %s" % (remaining, units[index])
        index += 1

    raise ValueError('number too large')
636
637
638 ##
639 # End copied content
640 ##
class ColMeta:
    """ Per column settings: whether the column can be chosen for
    sorting and how many characters wide it is.
    """
    def __init__(self, sortable, width):
        (self.sortable, self.width) = (sortable, width)
646
647
class RowMeta:
    """ How to render one cell of a row: the label of the cell and the
    function that formats it.
    """
    def __init__(self, label, fmt):
        # fmt is called as fmt(value, width) and returns the cell text.
        (self.label, self.fmt) = (label, fmt)
653
654
def fmt_packet(obj, width):
    """ Return the packet count of obj, right justified to width."""
    text = str(obj.packets)
    return text.rjust(width)
658
659
def fmt_count(obj, width):
    """ Return the line count of obj, right justified to width."""
    text = str(obj.count)
    return text.rjust(width)
663
664
def fmt_avg(obj, width):
    """ Return the average of obj, truncated to a whole number and right
    justified to width.
    """
    whole = int(obj.average)
    return str(whole).rjust(width)
668
669
def fmt_field(obj, width):
    """ Return the flow field of obj, left justified to width.

    A field that is longer than width is cut and ends in ellipses, to
    make it clear that content is missing.
    """
    ellipses = " ... "
    text = obj.field
    if (len(text) > width):
        keep = width - len(ellipses)
        text = text[:keep] + ellipses
    return text.ljust(width)
680
681
def fmt_bytes(obj, width):
    """ Return the byte count of obj, right justified to width.

    A number that needs more than width characters is replaced by its
    approximate size with a unit.
    """
    text = str(obj.bytes)
    if (len(text) > width):
        text = approximate_size(obj.bytes)
    return text.rjust(width)
689
690
def title_center(value, width):
    """ Return value in upper case, centered within width."""
    shouted = value.upper()
    return shouted.center(width)
694
695
def title_rjust(value, width):
    """ Return value in upper case, right justified to width."""
    shouted = value.upper()
    return shouted.rjust(width)
699
700
def column_picker(order, obj):
    """ Return the value of the column of obj selected by order.

    1 is count, 2 is packets, 3 is bytes and 4 is average.  Any other
    order raises ValueError.
    """
    columns = ((1, "count"), (2, "packets"), (3, "bytes"), (4, "average"))
    for (index, name) in columns:
        if (order == index):
            return getattr(obj, name)

    raise ValueError("order outside of range %s" % order)
713
714
class Render:
    """ Renders flow data as rows of text.

    The output has five columns: the flow field followed by count,
    packets, bytes and average.  The numeric columns have a fixed width,
    the flow field column takes whatever the console width leaves.
    """
    def __init__(self, console_width):
        """ Calculate column widths taking into account changes in format."""

        self._start_time = datetime.datetime.now()

        # The order of the columns in every list below is the order on the
        # screen.  The first column is special: it is not sortable and its
        # 0 width is a place holder that console_width_set replaces.
        numeric_columns = 4
        self._cols = [ColMeta(False, 0)]
        for _ in range(numeric_columns):
            self._cols.append(ColMeta(True, Columns.VALUE_WIDTH))
        self._console_width = console_width
        self.console_width_set(console_width)

        # Row above the titles, it marks the column that is sorted on.
        self._descs = []
        for _ in range(1 + numeric_columns):
            self._descs.append(RowMeta("", title_rjust))
        self._column_sort_select = 0
        self.column_select_event()

        self._titles = [RowMeta(Columns.FIELDS, title_center)]
        for title in (Columns.COUNT, Columns.PACKETS,
                      Columns.BYTES, Columns.AVERAGE):
            self._titles.append(RowMeta(title, title_rjust))

        self._datas = []
        for formatter in (fmt_field, fmt_count, fmt_packet,
                          fmt_bytes, fmt_avg):
            self._datas.append(RowMeta(None, formatter))

        ##
        # _field_types hold which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = ["all"]
        self._field_types.extend([ii.field_type for ii in OUTPUT_FORMAT])

        ##
        # The default is to show all field types.  Starting at -1 makes
        # the first toggle select entry 0, which is "all".
        ##
        self._field_type_select = -1
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return which field type to display. """
        return self._field_types[self._field_type_select]

    def field_type_toggle(self):
        """ Advance to the next field type, wrapping around at the end,
        and show the selection in the title of the field column.
        """
        following = self._field_type_select + 1
        self._field_type_select = following % len(self._field_types)
        selected = self._field_type_select_get()
        self._titles[0].label = Columns.FIELDS + " (%s)" % selected

    def column_select_event(self):
        """ Move the sort marker to the next sortable column. """

        self._descs[self._column_sort_select].label = ""

        # Look at every column at most once, wrapping around at the end.
        total = len(self._cols)
        selected = self._column_sort_select
        for _ in range(total):
            selected = (selected + 1) % total
            if (self._cols[selected].sortable):
                break
        self._column_sort_select = selected

        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the output given the new console_width. """
        self._console_width = console_width

        ##
        # Calculating column width can be tedious but important.  The
        # flow field value can be long.  The goal here is to dedicate
        # fixed column space for packets, bytes, average and counts.  Give
        # the remaining space to the flow column.  When numbers get large
        # transition output to output generated by approximate_size which
        # limits output to ###.# XiB in other words 9 characters.
        ##
        # One space separates neighbouring columns.
        separators = len(self._cols) - 1
        fixed = sum(col.width for col in self._cols[1:]) + separators

        # The flow column gets what is left, it may have to be truncated
        # to get everything to fit.
        self._cols[0].width = console_width - fixed

    def _row(self, cells):
        """ Join one formatted cell per column into a line of output. """
        return " ".join(cells)

    def format(self, flow_db):
        """ Return the output for the content of flow_db as a list of lines.

        The lines are, in order: the title, the statistics and status
        line, two rows of column headers and one row for every flow field.
        """

        lines = ["Flow Summary".center(self._console_width)]

        ##
        # statistics and status
        ##
        status = " Total: %(flow_total)s errors: %(flow_errors)s " % \
            flow_db.flow_stats_get()
        if (flow_db.accumulate_get()):
            status += "Accumulate: on "
        else:
            status += "Accumulate: off "

        elapsed = datetime.datetime.now() - self._start_time
        status += "Duration: %s " % str(elapsed)
        lines.append(status.ljust(self._console_width))

        ##
        # 2 rows for columns: the first indicates which column is in
        # descending order, the second holds the titles.
        ##
        for header in (self._descs, self._titles):
            lines.append(self._row([meta.fmt(meta.label, col.width)
                                    for (meta, col) in zip(header,
                                                           self._cols)]))

        ##
        # Data.
        ##
        ordered = flow_db.field_values_in_order(self._field_type_select_get(),
                                                self._column_sort_select)
        for entry in ordered:
            lines.append(self._row([meta.fmt(entry, col.width)
                                    for (meta, col) in zip(self._datas,
                                                           self._cols)]))

        return lines
864
865
def curses_screen_begin():
    """ Take control of the screen with curses and return the window.

    Input is switched to cbreak mode without echo and the keypad is
    enabled on the returned window.
    """
    window = curses.initscr()
    curses.cbreak()
    curses.noecho()
    window.keypad(1)
    return window
873
874
def curses_screen_end(stdscr):
    """ Give up curses control of the screen.

    Undoes the settings made by curses_screen_begin on stdscr, in the
    order cbreak, keypad, echo, and then ends the curses session.
    """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()

    curses.endwin()
881
882
class FlowDB:
    """ Implements live vs accumulate mode.

    Flows are stored as key value pairs. The key consists of the content
    prior to stat fields. The value portion consists of stats in a dictionary
    form.

    @ \\todo future add filtering here.
    """
    def __init__(self, accumulate):
        self._accumulate = accumulate
        self._error_count = 0
        # Guards self._flows.
        self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        # Values are (stats, last update time.)
        # The last update time is used for aging.
        self._flows = {}
        # This dictionary holds aggregate of flow fields.
        self._fields = {}

    def accumulate_get(self):
        """ Return the current accumulate state. """
        return self._accumulate

    def accumulate_toggle(self):
        """ toggle accumulate flow behavior. """
        self._accumulate = not self._accumulate

    def begin(self):
        """ Indicate the beginning of processing flow content.
        if accumulate is false clear current set of flows. """

        if (not self._accumulate):
            with self._flow_lock:
                self._flows.clear()
            self._fields.clear()

    def flow_line_add(self, line):
        """ Split a line from a ovs-dpctl dump-flow into key and stats.
        The order of the content in the flow should be:
        - flow content
        - stats for the flow
        - actions

        This method also assumes that the dump flow output does not
        change order of fields of the same flow.

        Raises ValueError when the line can not be parsed.  Errors found
        after the line was split are logged and counted before they are
        raised again.
        """

        line = line.rstrip("\n")
        (fields, stats, _) = flow_line_split(line)

        try:
            fields_dict = elements_to_dict(fields)

            if (len(fields_dict) == 0):
                raise ValueError("flow fields are missing %s" % line)

            stats_dict = elements_to_dict(stats)
            if (len(stats_dict) == 0):
                raise ValueError("statistics are missing %s." % line)

            ##
            # In accumulate mode, the Flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, delta are sent to the flow_db
            # database allow incremental changes to be done in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
            key = ",".join(fields)

            with self._flow_lock:
                (stats_old_dict, _) = self._flows.get(key, (None, None))

            self.flow_event(fields_dict, stats_old_dict, stats_dict)

        except ValueError as arg:
            logging.error(arg)
            self._error_count += 1
            raise

        with self._flow_lock:
            self._flows[key] = (stats_dict, datetime.datetime.now())

    def decay(self, decayTimeInSeconds):
        """ Purge flows that were not updated for more than
        decayTimeInSeconds and remove their share from the field
        aggregates.
        """
        now = datetime.datetime.now()

        # Work on a snapshot, the flow table itself changes while flows
        # are purged below and while lines are added by another thread.
        with self._flow_lock:
            snapshot = list(self._flows.items())

        for (key, (stats_dict, updateTime)) in snapshot:
            delta = now - updateTime
            # timedelta.seconds alone leaves out whole days.
            age = delta.days * 86400 + delta.seconds
            if (age <= decayTimeInSeconds):
                continue

            with self._flow_lock:
                if (self._flows.pop(key, None) is None):
                    # Already gone, for example cleared by begin().
                    continue

                fields_dict = elements_to_dict(flow_line_iter(key))
                matches = flow_aggregate(fields_dict, stats_dict)
                for match in matches:
                    self.field_dec(match)

    def flow_stats_get(self):
        """ Return statistics in a form of a dictionary with the keys
        flow_total and flow_errors.
        """
        with self._flow_lock:
            rc = {"flow_total": len(self._flows),
                  "flow_errors": self._error_count}
        return rc

    def field_types_get(self):
        """ Return the set of types stored in the singleton. """
        types = set((ii.field_type for ii in self._fields.values()))
        return types

    def field_add(self, data):
        """ Collect dump-flow data to sum number of times item appears. """
        current = self._fields.get(repr(data), None)
        if (current is None):
            # Keep a copy, the caller's object is not shared.
            current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current

    def field_dec(self, data):
        """ Remove dump-flow data from the sum of the matching item.
        The item is dropped once its count reaches zero.
        Raises ValueError when there is no matching item.
        """
        current = self._fields.get(repr(data), None)
        if (current is None):
            raise ValueError("decrementing field missing %s" % repr(data))

        current -= data
        self._fields[repr(current)] = current
        if (current.count == 0):
            del self._fields[repr(current)]

    def field_values_in_order(self, field_type_select, column_order):
        """ Return a list of items in order maximum first.

        field_type_select limits the items to one field type unless it is
        "all".  column_order selects the column to sort on, see
        column_picker.
        """
        values = self._fields.values()
        if (field_type_select != "all"):
            # If a field type other than "all" then reduce the list.
            values = [ii for ii in values
                      if (ii.field_type == field_type_select)]
        values = [(column_picker(column_order, ii), ii) for ii in values]
        # Sort then reverse, this keeps the established order of items
        # with equal values.
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        values = [ii[1] for ii in values]
        return values

    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
        """ Receives new flow information.

        stats_old_dict holds the stats of the previous sample of the flow
        or None for a flow that was not seen before.
        """

        # In order to avoid processing every flow at every sample
        # period, changes in flow packet count is used to determine the
        # delta in the flow statistics. This delta is used in the call
        # to self.decrement prior to self.field_add

        if (stats_old_dict is None):
            # This is a new flow
            matches = flow_aggregate(fields_dict, stats_new_dict)
            for match in matches:
                self.field_add(match)
            return

        old_packets = int(stats_old_dict.get("packets", 0))
        new_packets = int(stats_new_dict.get("packets", 0))
        if (old_packets == new_packets):
            # ignore. same data.
            return

        old_bytes = stats_old_dict.get("bytes", 0)
        # old_packets != new_packets
        # if old_packets > new_packets then we end up decrementing
        # packets and bytes.
        matches = flow_aggregate(fields_dict, stats_new_dict)
        for match in matches:
            # The count is decremented by one as well, the flow line was
            # counted when it was first seen.
            match.decrement(int(old_packets), int(old_bytes), 1)
            self.field_add(match)
1072
1073
class DecayThread(threading.Thread):
    """ Periodically call flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Prepare the decay thread; the caller starts it with start().

        flow_db is the object whose decay() method is called and interval
        is the value handed to decay() on every call.
        """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        # Poll at a tenth of the interval, capped at one second.  Float
        # division is required: integer division made this 0 for any
        # interval below 10, which turned run() into a busy loop.
        self._min_interval = min(1.0, interval / 10.0)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while self._running:
            # Sleeps for the poll period, or returns early once stop()
            # sets the event.
            self._event.wait(self._min_interval)
            if self._running:
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the calling thread time to terminate but not too long.
        # this thread is a daemon so the application will terminate if
        # we timeout during the join. This is just a cleaner way to
        # release resources.
        self.join(2.0)
1106
1107
def flow_top_command(stdscr, render, flow_db):
    """ Read one key press in top mode, act on it and return it.

    's' selects the sort column, 'a' toggles accumulate mode and 'f'
    toggles the field type.  'h' halts output until another key arrives;
    that key is then handled like any other.  Every other key, space
    included, simply causes a resample.
    """
    key = stdscr.getch()
    if key == ord('h'):
        # Output halted: keep polling until a real key is pressed
        # (getch yields -1 while nothing is available).
        key = stdscr.getch()
        while key == -1:
            key = stdscr.getch()

    # Handlers are named rather than bound so a method is only looked up
    # when its key was actually pressed.
    dispatch = {
        ord('s'): (render, "column_select_event"),
        ord('a'): (flow_db, "accumulate_toggle"),
        ord('f'): (render, "field_type_toggle"),
    }
    if key in dispatch:
        (target, method_name) = dispatch[key]
        getattr(target, method_name)()

    return key
1131
1132
def decay_timer_start(flow_db, accumulateDecay):
    """ Start and return a DecayThread when accumulateDecay is positive.

    Returns None, without starting anything, for any other value.
    """
    if not (accumulateDecay > 0):
        return None

    timer = DecayThread(flow_db, accumulateDecay)
    timer.start()
    return timer
1141
1142
def flows_top(args):
    """ Handle top like behavior when --script is not specified.

    Repeatedly reads flows, renders them and redraws the curses screen
    until 'q' is pressed or Ctrl-C interrupts the loop.  An OSError while
    reading the input is logged as critical and ends the loop.  Once the
    screen has been restored the last rendered lines are printed again.

    args supplies accumulate, accumulateDecay and delay, and is passed
    on to top_input_get.
    """

    flow_db = FlowDB(args.accumulate)
    render = Render(0)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    lines = []

    try:
        stdscr = curses_screen_begin()
        try:
            ch = 'X'
            stdscr.timeout(args.delay)

            while (ch != ord('q')):
                flow_db.begin()

                try:
                    ihdl = top_input_get(args)
                    try:
                        flows_read(ihdl, flow_db)
                    finally:
                        ihdl.close()
                except OSError as arg:
                    logging.critical(arg)
                    break

                (console_height, console_width) = stdscr.getmaxyx()
                render.console_width_set(console_width)

                # The bottom row of the console is left unused.
                output_height = console_height - 1
                line_count = range(output_height)
                line_output = render.format(flow_db)
                # Materialized as a list because it is walked twice: to
                # draw the screen here and to repeat the output on exit.
                lines = list(zip(line_count, line_output[:output_height]))

                stdscr.erase()
                for (count, line) in lines:
                    stdscr.addstr(count, 0, line[:console_width])
                stdscr.refresh()

                ch = flow_top_command(stdscr, render, flow_db)

        finally:
            curses_screen_end(stdscr)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the timer even when an unexpected error is propagating.
        if (decay_timer):
            decay_timer.stop()

    # repeat output
    for (_, line) in lines:
        # Single argument call form: same output from the print statement
        # and from the print function.
        print(line)
1197
1198
def flows_script(args):
    """ Handle the --script option.

    Flows are read from stdin when args.flowFiles is None, otherwise from
    each file named in args.flowFiles in turn.  The rendered summary is
    then printed, formatted for the width of the terminal.
    """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    if (args.flowFiles is None):
        logging.info("reading flows from stdin")
        # NOTE(review): unbuffered text mode (buffering=0) is accepted by
        # Python 2 but rejected by Python 3 - confirm before porting.
        with os.fdopen(sys.stdin.fileno(), 'r', 0) as ihdl:
            flow_db = flows_read(ihdl, flow_db)
    else:
        for flowFile in args.flowFiles:
            logging.info("reading flows from %s", flowFile)
            with open(flowFile, "r") as ihdl:
                flow_db = flows_read(ihdl, flow_db)

    (_, console_width) = get_terminal_size()
    render = Render(console_width)

    for line in render.format(flow_db):
        # Single argument call form: same output from the print statement
        # and from the print function.
        print(line)
1226
1227
def main():
    """ Return 0 on success or 1 when interrupted from the keyboard.

    Algorithm
    There are four stages to processing ovs-dpctl dump-flow content.
    1. Retrieve the current input.
    2. Store it in FlowDB and maintain history.
    3. Iterate over FlowDB, aggregating stats for each flow field.
    4. Present the data.

    Retrieving the current input is currently trivial: ovs-dpctl dump-flow
    is called. Future versions will have more elaborate means for
    collecting dump-flow content. FlowDB returns all data in the form of a
    hierarchical dictionary. Input will vary.

    In accumulate mode, flows are not purged from the FlowDB manager;
    at the very least the latest statistics are kept. For live output,
    the FlowDB is purged prior to sampling data.

    Aggregating results requires identifying the flow fields to aggregate
    out of each flow and summing their stats.

    """
    args = args_get()

    try:
        # --script selects the one shot report, otherwise run top mode.
        handler = flows_top if args.top else flows_script
        handler(args)
    except KeyboardInterrupt:
        return 1
    return 0
1262
1263 if __name__ == '__main__':
1264 sys.exit(main())
1265 elif __name__ == 'ovs-dpctl-top':
1266 # pylint: disable-msg=R0915
1267
1268 ##
1269 # Test case beyond this point.
1270 # pylint: disable-msg=R0904
1271 class TestsuiteFlowParse(unittest.TestCase):
1272 """
1273 parse flow into hierarchy of dictionaries.
1274 """
def test_flow_parse(self):
    """ Parse flow lines into nested dictionaries and spot check them.

    The second line is the first one with a negative "used" value.
    """
    positive_used = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                     "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                     "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                     "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                     "udp(src=61252,dst=5355), packets:1, bytes:92, "
                     "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                     "38,41,44,47,50,53,56,59,62,65")

    (match_fields, match_stats, _) = flow_line_split(positive_used)
    parsed = elements_to_dict(match_fields + match_stats)
    self.assertEqual(parsed["eth"]["src"], "00:50:56:b4:4e:f8")
    self.assertEqual(parsed["eth"]["dst"], "33:33:00:01:00:03")
    self.assertEqual(parsed["ipv6"]["src"], "fe80::55bf:fe42:bc96:2812")
    self.assertEqual(parsed["ipv6"]["dst"], "ff02::1:3")
    self.assertEqual(parsed["packets"], "1")
    self.assertEqual(parsed["bytes"], "92")

    negative_used = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                     "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                     "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                     "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                     "udp(src=61252,dst=5355), packets:1, bytes:92, "
                     "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                     "38,41,44,47,50,53,56,59,62,65")

    (match_fields, match_stats, _) = flow_line_split(negative_used)
    parsed = elements_to_dict(match_fields + match_stats)
    self.assertEqual(parsed["used"], "-0.703s")
    self.assertEqual(parsed["packets"], "1")
    self.assertEqual(parsed["bytes"], "92")
1308
def test_flow_sum(self):
    """ Adding the same flow line repeatedly sums its statistics.

    The line is added twice; after each addition the set of field types
    must be unchanged while packets, bytes and count of the first
    ordered entry grow with the number of additions.
    """
    line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
           "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
           "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
           "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
           "udp(src=61252,dst=5355), packets:2, bytes:92, "\
           "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
           "38,41,44,47,50,53,56,59,62,65"

    (fields, stats, _) = flow_line_split(line)
    stats_dict = elements_to_dict(stats)
    fields_dict = elements_to_dict(fields)
    expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]

    flow_db = FlowDB(False)
    # First pass is the simple case of one line, the second pass adds
    # the line again just to see the counts go up.
    for additions in (1, 2):
        for match in flow_aggregate(fields_dict, stats_dict):
            flow_db.field_add(match)

        flow_types = flow_db.field_types_get()
        self.assertEqual(len(flow_types), len(expected_flow_types))
        for flow_type in flow_types:
            self.assertTrue(flow_type in expected_flow_types)

        sum_value = flow_db.field_values_in_order("all", 1)
        self.assertEqual(len(sum_value), 5)
        self.assertEqual(sum_value[0].packets, 2 * additions)
        self.assertEqual(sum_value[0].count, additions)
        self.assertEqual(sum_value[0].bytes, 92 * additions)
1359
def test_assoc_list(self):
    """ Columns.assoc_list reports the expected pairs for each field.

    Every pair must carry one of the names "fields", "packets", "count",
    "average" or "bytes"; any other name raises ValueError.
    """
    line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
           "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
           "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
           "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
           "udp(src=61252,dst=5355), packets:2, bytes:92, "\
           "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
           "38,41,44,47,50,53,56,59,62,65"

    valid_flows = [
        'eth_type(0x86dd)',
        'udp(dst=5355)',
        'in_port(4)',
        'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
        'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
    ]
    # Statistics expected for every field of the single line.
    expected_stats = {
        "packets": 2,
        "count": 1,
        "average": 46.0,
        "bytes": 92,
    }

    (fields, stats, _) = flow_line_split(line)
    stats_dict = elements_to_dict(stats)
    fields_dict = elements_to_dict(fields)

    ##
    # Test simple case of one line.
    flow_db = FlowDB(False)
    matches = flow_aggregate(fields_dict, stats_dict)
    for match in matches:
        flow_db.field_add(match)

    for sum_value in flow_db.field_values_in_order("all", 1):
        assoc_list = Columns.assoc_list(sum_value)
        for item in assoc_list:
            name = item[0]
            if (name == "fields"):
                self.assertTrue(item[1] in valid_flows)
            elif (name in expected_stats):
                self.assertEqual(item[1], expected_stats[name])
            else:
                # Format the message; passing the name as a second
                # argument would leave "%s" unexpanded.
                raise ValueError("unknown %s" % name)
1404
def test_human_format(self):
    """ approximate_size renders sizes with KiB, MiB and GiB suffixes. """
    cases = [
        (0.0, "0.0 KiB"),
        (1024, "1.0 KiB"),
        (1024 * 1024, "1.0 MiB"),
        ((1024 * 1024) + 100000, "1.1 MiB"),
        ((1024 * 1024 * 1024) + 100000000, "1.1 GiB"),
    ]
    for (size, rendered) in cases:
        self.assertEqual(approximate_size(size), rendered)
1415
def test_flow_line_split(self):
    """ A flow line splits into match fields, statistics and actions.

    Splitting is not trivial: there is no clear delimiter and the comma
    is used liberally, including inside parentheses.
    """
    flow_line = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
                 "dst=33:33:00:01:00:03),eth_type(0x86dd),"
                 "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
                 "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
                 "udp(src=61252,dst=5355), packets:2, bytes:92, "
                 "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"
                 "38,41,44,47,50,53,56,59,62,65")

    want_fields = [
        "in_port(4)",
        "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
        "eth_type(0x86dd)",
        ("ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
         "label=0,proto=17,tclass=0,hlimit=1,frag=no)"),
        "udp(src=61252,dst=5355)",
    ]
    want_stats = ["packets:2", "bytes:92", "used:0.703s"]
    want_actions = ("actions:3,8,11,14,17,20,23,26,29,32,35,"
                    "38,41,44,47,50,53,56,59,62,65")

    (got_fields, got_stats, got_actions) = flow_line_split(flow_line)

    self.assertEqual(got_fields, want_fields)
    self.assertEqual(got_stats, want_stats)
    self.assertEqual(got_actions, want_actions)
1442
def test_accumulate_decay(self):
    """ test_accumulate_decay: test accumulated decay.

    Covers decay() called directly, with a short and a long interval,
    and decay driven by the thread from decay_timer_start.
    """
    lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
             "dst=ff:ff:ff:ff:ff:ff),"
             "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
             "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
             "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
             "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
             "packets:1, bytes:120, used:0.004s, actions:1"]

    flow_db = FlowDB(True)
    flow_db.begin()
    flow_db.flow_line_add(lines[0])

    # Make sure we decay
    time.sleep(4)
    self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
    flow_db.decay(1)
    self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)

    flow_db.flow_line_add(lines[0])
    self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
    flow_db.decay(30)
    # Should not be deleted.
    self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)

    flow_db.flow_line_add(lines[0])
    self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
    timer = decay_timer_start(flow_db, 2)
    try:
        time.sleep(10)
        self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
    finally:
        # Stop the thread even when the assertion above fails so it
        # does not keep running behind the remaining tests.
        timer.stop()
1475
def test_accumulate(self):
    """ test_accumulate test that FlowDB supports accumulate.

    Each sample is one flow line fed to the database after begin().
    After every sample the listed in_port entries must appear exactly
    once with the given packets, bytes and count.
    """
    # in_port(2) is sampled once and must stay unchanged afterwards.
    port_2 = ("in_port(2)", 2, 126, 1)

    # (flow line, [(field, packets, bytes, count), ...])
    samples = [
        # Test one flow exist.
        ("in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
         [("in_port(1)", 1, 120, 1)]),
        # Test two different flows exist.
        ("in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
         [("in_port(1)", 1, 120, 1), port_2]),
        # Test first flow increments packets.
        ("in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
         [("in_port(1)", 2, 240, 1), port_2]),
        # Test third flow but with the same in_port(1) as the first flow.
        ("in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
         [("in_port(1)", 3, 360, 2), port_2]),
        # Third flow has changes.
        ("in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
         [("in_port(1)", 4, 480, 2), port_2]),
        # First flow reset.
        ("in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
         [("in_port(1)", 3, 360, 2), port_2]),
    ]

    # Turn on accumulate.
    flow_db = FlowDB(True)

    for (line, expectations) in samples:
        # simulate another sample
        flow_db.begin()
        flow_db.flow_line_add(line)
        sum_values = flow_db.field_values_in_order("all", 1)

        for (field, packets, byte_total, count) in expectations:
            found = [ii for ii in sum_values if (repr(ii) == field)]
            self.assertEqual(len(found), 1)
            self.assertEqual(found[0].packets, packets)
            self.assertEqual(found[0].bytes, byte_total)
            self.assertEqual(found[0].count, count)
1603
def test_parse_character_errors(self):
    """ test_parsing errors.
    The flow parser is purposely loose. It is not designed to validate
    input, merely to pull out what it can, but there are situations in
    which a parse error can be detected.
    """

    lines = ["complete garbage",
             "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
             "dst=33:33:00:00:00:66),"
             "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
             "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
             "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
             "packets:2,bytes:5026,actions:1"]

    flow_db = FlowDB(False)
    flow_db.begin()
    for line in lines:
        # We want an exception. That is how we know we have correctly
        # found a simple parsing error. We are not looking to validate
        # flow output, just catch simple issues.
        self.assertRaises(ValueError, flow_db.flow_line_add, line)
1630
def test_tunnel_parsing(self):
    """ A flow that starts with a tunnel() element is still summed. """
    tunnel_flow = ("tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
                   "tos=0x0,ttl=64,flags(key)),in_port(1),"
                   "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
                   "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
                   "actions:userspace(pid=4294962691,slow_path(cfm))")

    flow_db = FlowDB(False)
    flow_db.begin()
    flow_db.flow_line_add(tunnel_flow)

    ordered = flow_db.field_values_in_order("all", 1)
    port_entries = [entry for entry in ordered
                    if repr(entry) == "in_port(1)"]
    self.assertEqual(len(port_entries), 1)

    port_entry = port_entries[0]
    self.assertEqual(port_entry.packets, 6)
    self.assertEqual(port_entry.bytes, 534)
    self.assertEqual(port_entry.count, 1)
1649
def test_flow_multiple_paren(self):
    """ flow_line_iter keeps an element with nested parentheses whole. """
    nested = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
    expected_elements = [
        "tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
        "in_port(2)",
    ]
    self.assertEqual(expected_elements, flow_line_iter(nested))
1657
def test_to_network(self):
    """ ipv4_to_network and ipv6_to_network apply an optional mask.

    Each case pairs an address, optionally followed by "/mask", with the
    network address expected back.
    """
    ipv4_cases = [
        ("192.168.0.1", "192.168.0.1"),
        ("192.168.0.1/255.255.255.255", "192.168.0.1"),
        ("192.168.0.1/255.255.255.0", "192.168.0.0"),
        ("192.168.0.1/255.255.0.0", "192.168.0.0"),
        ("192.168.0.1/255.0.0.0", "192.0.0.0"),
        ("192.168.0.1/0.0.0.0", "0.0.0.0"),
        ("10.24.106.230/255.255.255.255", "10.24.106.230"),
        ("10.24.106.230/255.255.255.0", "10.24.106.0"),
        ("10.24.106.0/255.255.255.0", "10.24.106.0"),
        ("10.24.106.0/255.255.252.0", "10.24.104.0"),
    ]

    ipv6_cases = [
        ("1::192:168:0:1", "1::192:168:0:1"),
        ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
        ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
        ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
        ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
        ("1::192:168:0:1/1::0:0:0:0", "1::"),
        ("1::192:168:0:1/::", "::"),
    ]

    # All IPv4 cases are checked before any IPv6 case.
    for (address, network) in ipv4_cases:
        self.assertEqual(ipv4_to_network(address), network)

    for (address, network) in ipv6_cases:
        self.assertEqual(ipv6_to_network(address), network)