]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
Merge pull request #7963 from volta-networks/feat_pceplib_into_frr_github
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2
3 # Copyright 2017, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 import os
20 import re
21 import sys
22 import time
23 import datetime
24 import json
25 import math
26 import time
27 from lib.topolog import logger
28 from lib.topotest import json_cmp
29 from mininet.net import Mininet
30
31
32 # L utility functions
33 #
34 # These functions are intended to provide support for CI testing within MiniNet
35 # environments.
36
37
class lUtil:
    """Run test commands against network nodes and record pass/fail results.

    Every command and its output is appended to a log file (``fout_name``);
    every recorded result is appended as one line to a summary file
    (``fsum_name``).  Both files are opened lazily on first write and are
    closed by ``closeFiles()``.
    """

    # to be made configurable in the future
    base_script_dir = "."
    base_log_dir = "."
    fout_name = "output.log"
    fsum_name = "summary.txt"
    # Verbosity threshold: log() prints messages whose level is <= l_level and
    # writes to the log file only while l_level > 0.
    l_level = 6
    # Callable invoked by result() after a failure; False means "none".
    CallOnFail = False

    # Result counters, updated by result().
    l_total = 0
    l_pass = 0
    l_fail = 0
    # Name and line counter of the script being run (used in log messages).
    l_filename = ""
    l_line = 0
    # Last regex match objects produced by command(): against the output
    # with newlines joined by spaces (l_last) and against the raw output
    # using re.DOTALL (l_last_nl).
    l_last = None
    l_last_nl = None
    l_dotall_experiment = False

    # Open file objects, or "" while the file has not been opened.
    fout = ""
    fsum = ""
    # Mapping of target name to an object with a cmd() method; "" disables
    # command execution (command() then returns False).
    net = ""

    # Horizontal rule used in the summary header and footer.
    _RULE = "******************************************************************************"

    def log(self, str, level=6):
        """Write ``str`` to the log file and print it if ``level`` allows.

        The log file is opened on first use.  Nothing is written to the file
        when ``l_level`` is 0 or negative.
        """
        if self.l_level > 0:
            if self.fout == "":
                self.fout = open(self.fout_name, "w")
            self.fout.write(str + "\n")
        if level <= self.l_level:
            print(str)

    def summary(self, str):
        """Append ``str`` as one line to the summary file.

        The file is opened on first use, at which point a three-line header
        is written before the first entry.
        """
        if self.fsum == "":
            self.fsum = open(self.fsum_name, "w")
            self.fsum.write(self._RULE + "\n")
            self.fsum.write("Test Target Summary Pass Fail\n")
            self.fsum.write(self._RULE + "\n")
        self.fsum.write(str + "\n")

    def result(self, target, success, str, logstr=None):
        """Record one test result.

        Updates the pass/fail/total counters, logs the result, appends it to
        the summary file and, on failure, invokes ``CallOnFail`` when one is
        set.

        target  -- name of the node the test ran against
        success -- truthy for a pass, falsy for a failure
        str     -- short description placed in the summary line
        logstr  -- optional extra detail written to the log only
        """
        if success:
            p, f = 1, 0
            self.l_pass += 1
            sstr = "PASS"
        else:
            p, f = 0, 1
            self.l_fail += 1
            sstr = "FAIL"
        self.l_total += 1
        if logstr is not None:
            self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
        res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
        self.log("R:" + res)
        self.summary(res)
        # Truthiness (rather than "!= False") so an unset None is never called.
        if f == 1 and self.CallOnFail:
            self.CallOnFail()

    def closeFiles(self):
        """Write the totals footer, close both files and return the footer.

        If the log file is open, the content of the summary file (when that
        file exists) is appended to it before it is closed.
        """
        ret = "%s\nTotal %-4d %-4d %d\n%s" % (
            self._RULE,
            self.l_total,
            self.l_pass,
            self.l_fail,
            self._RULE,
        )
        if self.fsum != "":
            self.fsum.write(ret + "\n")
            self.fsum.close()
            self.fsum = ""
        if self.fout != "":
            if os.path.isfile(self.fsum_name):
                with open(self.fsum_name, "r") as r:
                    self.fout.write(r.read())
            self.fout.close()
            self.fout = ""
        return ret

    def setFilename(self, name):
        """Start a new script: note it in log and summary, reset line count."""
        banner = "FILE: " + name
        self.log(banner)
        self.summary(banner)
        self.l_filename = name
        # The counter used everywhere else is l_line; resetting a separate
        # "line" attribute left the count running across files.
        self.l_line = 0

    def getCallOnFail(self):
        """Return the current failure callback (False when none is set)."""
        return self.CallOnFail

    def setCallOnFail(self, CallOnFail):
        """Set the callback invoked by result() after each failure."""
        self.CallOnFail = CallOnFail

    def strToArray(self, string):
        """Split a script line into tokens.

        Words are separated by whitespace.  Words between a word starting
        with '"' and the next word ending with '"' are joined (with single
        spaces) into one token; a word ending in a backslash is joined with
        the word that follows it.  Quotes and backslashes are kept in the
        token.  An empty line, or one whose first word starts with '#',
        yields an empty list.
        """
        tokens = []
        words = string.split()
        if not words or words[0].startswith("#"):
            return tokens
        # "" -> next word starts a new token
        # '"' -> inside a quoted token
        # "\\" -> previous word ended with a backslash
        pending = ""
        for word in words:
            if pending:
                tokens[-1] += " " + word
            else:
                tokens.append(word)
            if pending == "\\":
                pending = ""
            if word.endswith("\\"):
                pending = "\\"
            elif pending == '"':
                # Stay inside the quotes until the closing word is seen;
                # advancing on every word used to overrun the token list.
                if word.endswith('"'):
                    pending = ""
            elif word.startswith('"') and not (len(word) > 1 and word.endswith('"')):
                # A word such as "x" opens and closes its own quotes.
                pending = '"'
        return tokens

    def execTestFile(self, tstFile):
        """Execute a plain-text test script.

        Lines with six or more tokens are passed to luCommand() (tokens 1-5).
        Shorter lines are logged; "sleep N" sleeps N seconds and
        "include FILE" runs FILE recursively.  Exits the process with
        status 1 when ``tstFile`` is not a regular file.
        """
        if not os.path.isfile(tstFile):
            self.log("unable to read: " + tstFile)
            sys.exit(1)
        with open(tstFile) as f:
            for line in f:
                if len(line) <= 1:
                    continue
                a = self.strToArray(line)
                if len(a) >= 6:
                    luCommand(a[1], a[2], a[3], a[4], a[5])
                    continue
                self.l_line += 1
                self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
                if len(a) >= 2:
                    if a[0] == "sleep":
                        time.sleep(int(a[1]))
                    elif a[0] == "include":
                        self.execTestFile(a[1])

    def command(self, target, command, regexp, op, result, returnJson):
        """Run ``command`` on ``target`` and evaluate its output.

        target     -- key into ``self.net``
        command    -- command string handed to the node's cmd() method
        regexp     -- regular expression to search for, or, for the jsoncmp
                      operations, the expected JSON document as a string
        op         -- "pass"/"fail" record a result depending on whether the
                      regexp matched; "jsoncmp_pass"/"jsoncmp_fail" record a
                      result from json_cmp(); any other value only searches
        result     -- description used when a result is recorded
        returnJson -- when true, the output is parsed as JSON

        Returns False when no network is configured.  Otherwise returns the
        parsed JSON output if it was requested and parsed to a non-None
        value; else the matched text on a regexp match; else the boolean
        success value.
        """
        if op != "wait":
            self.l_line += 1

        json_op = op in ("jsoncmp_pass", "jsoncmp_fail")
        if json_op:
            returnJson = True

        self.log(
            "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
            % (
                time.asctime(),
                self.l_total + 1,
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
            )
        )
        if self.net == "":
            return False
        js = None
        out = self.net[target].cmd(command).rstrip()
        report = out if out else "<no output>"
        if returnJson:
            try:
                js = json.loads(out)
            except (ValueError, TypeError):
                js = None
                self.log(
                    "WARNING: JSON load failed -- confirm command output is in JSON format."
                )
        self.log("COMMAND OUTPUT:%s:" % report)

        # JSON comparison
        if json_op:
            try:
                expect = json.loads(regexp)
            except (ValueError, TypeError):
                expect = None
                self.log(
                    "WARNING: JSON load failed -- confirm regex input is in JSON format."
                )
            json_diff = json_cmp(js, expect)
            if json_diff is not None:
                success = op == "jsoncmp_fail"
                if not success:
                    self.log("JSON DIFF:%s:" % json_diff)
            else:
                success = op != "jsoncmp_fail"
            self.result(target, success, result)
            if js is not None:
                return js
            # Always defined, including when there was no diff and the
            # output did not parse (previously an unbound local).
            return success

        # Experiment: can we achieve the same match behavior via DOTALL
        # without converting newlines to spaces?
        search_nl = re.search(regexp, out, re.DOTALL)
        self.l_last_nl = search_nl
        # Set up for comparison
        group_nl_converted = None
        if search_nl is not None:
            group_nl_converted = " ".join(search_nl.group().splitlines())

        out = " ".join(out.splitlines())
        search = re.search(regexp, out)
        self.l_last = search
        if search is None:
            success = op == "fail"
            ret = success
        else:
            ret = search.group()
            if op != "fail":
                success, level = True, 7
            else:
                success, level = False, 5
            self.log("found:%s:" % ret, level)
            # Experiment: compare matched strings obtained each way
            if self.l_dotall_experiment and (group_nl_converted != ret):
                self.log(
                    "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
                    % (group_nl_converted, ret),
                    9,
                )
        if op in ("pass", "fail"):
            self.result(target, success, result)
        if js is not None:
            return js
        return ret

    def wait(
        self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
    ):
        """Repeat ``command`` until it succeeds or ``wait`` seconds elapse.

        The command is retried every ``wait_time`` seconds, at most
        ceil(wait / wait_time) + 1 times, stopping as soon as command()
        returns anything other than False.  The command is then run once
        more with op "pass" so that a result (annotated with the elapsed
        time) is recorded; the value of that final run is returned.
        """
        self.log(
            "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
            % (
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
                wait,
                wait_time,
            )
        )
        found = False
        n = 0
        startt = time.time()

        # Calculate the number of attempts we are going to perform.
        wait_count = int(math.ceil(wait / wait_time)) + 1

        while wait_count > 0:
            n += 1
            found = self.command(target, command, regexp, op, result, returnJson)
            if found is not False:
                break

            wait_count -= 1
            if wait_count > 0:
                time.sleep(wait_time)

        delta = time.time() - startt
        self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
        found = self.command(
            target,
            command,
            regexp,
            "pass",
            "%s +%4.2f secs" % (result, delta),
            returnJson,
        )
        return found
350
351
# Module-wide lUtil instance used by the lu*() entry calls below; created by
# luStart() and reset to None by luFinish().
LUtil = None
354
355 # entry calls
def luStart(
    baseScriptDir=".",
    baseLogDir=".",
    net="",
    fout="output.log",
    fsum="summary.txt",
    level=None,
):
    """Create the module-wide lUtil instance and configure it.

    baseScriptDir -- directory that luInclude() resolves script names against
    baseLogDir    -- directory in which the log and summary files are created
    net           -- mapping of target name to node object ("" for none)
    fout          -- log file name; empty or None keeps the class default
    fsum          -- summary file name; empty or None keeps the class default
    level         -- verbosity level; None keeps the class default
    """
    global LUtil
    # init class
    LUtil = lUtil()
    LUtil.base_script_dir = baseScriptDir
    LUtil.base_log_dir = baseLogDir
    LUtil.net = net
    # Treat "" and None alike for both names: concatenating None raised a
    # TypeError and an empty summary name produced a directory path.
    if fout:
        LUtil.fout_name = baseLogDir + "/" + fout
    if fsum:
        LUtil.fsum_name = baseLogDir + "/" + fsum
    if level is not None:
        LUtil.l_level = level
    LUtil.l_dotall_experiment = True
378
379
def luCommand(
    target,
    command,
    regexp=".",
    op="none",
    result="",
    time=10,
    returnJson=False,
    wait_time=0.5,
):
    """Run ``command`` on ``target`` through the module-wide lUtil instance.

    With op "wait" the call is forwarded to LUtil.wait(), passing ``time``
    as the maximum wait and ``wait_time`` as the retry interval; any other
    op is forwarded to LUtil.command().  The forwarded call's return value
    is returned unchanged.
    """
    if op == "wait":
        return LUtil.wait(
            target, command, regexp, op, result, time, returnJson, wait_time
        )
    return LUtil.command(target, command, regexp, op, result, returnJson)
396
397
def luLast(usenl=False):
    """Return the match object stored by the most recent regexp search.

    With ``usenl`` true the DOTALL match against the raw output
    (LUtil.l_last_nl) is used, otherwise the match against the
    newline-joined output (LUtil.l_last).  A match that is not None is also
    logged at level 7.
    """
    if usenl:
        match = LUtil.l_last_nl
    else:
        match = LUtil.l_last
    if match is not None:
        LUtil.log("luLast:%s:" % match.group(), 7)
    return match
407
408
def luInclude(filename, CallOnFail=None):
    """Run the test script ``filename`` found under LUtil.base_script_dir.

    Files ending in ".py" are executed as Python source; anything else is
    handed to LUtil.execTestFile().  When ``CallOnFail`` is given it
    replaces the failure callback for the duration of the script and the
    previous callback is restored afterwards, even if the script raises.
    """
    tstFile = LUtil.base_script_dir + "/" + filename
    LUtil.setFilename(filename)
    if CallOnFail is not None:
        oldCallOnFail = LUtil.getCallOnFail()
        LUtil.setCallOnFail(CallOnFail)
    try:
        if filename.endswith(".py"):
            LUtil.log("luInclude: execfile " + tstFile)
            with open(tstFile) as infile:
                # NOTE(review): exec runs the file's content as code with
                # this module's globals; only include trusted scripts.
                exec(infile.read())
        else:
            LUtil.log("luInclude: execTestFile " + tstFile)
            LUtil.execTestFile(tstFile)
    finally:
        # Restore on every exit path so a failing script cannot leave its
        # temporary callback installed.
        if CallOnFail is not None:
            LUtil.setCallOnFail(oldCallOnFail)
424
425
def luFinish():
    """Close the log and summary files and drop the module-wide instance.

    Returns the totals footer produced by LUtil.closeFiles().
    """
    global LUtil
    totals = LUtil.closeFiles()
    # done
    LUtil = None
    return totals
432
433
def luNumFail():
    """Return the number of failures recorded so far (LUtil.l_fail)."""
    failures = LUtil.l_fail
    return failures
436
437
def luNumPass():
    """Return the number of passes recorded so far (LUtil.l_pass)."""
    passes = LUtil.l_pass
    return passes
440
441
def luResult(target, success, str, logstr=None):
    """Record a result through the module-wide instance.

    Forwards all arguments to LUtil.result() and returns whatever that call
    returns.
    """
    outcome = LUtil.result(target, success, str, logstr)
    return outcome
444
445
def luShowResults(prFunction):
    """Pass every line of the summary file to ``prFunction``.

    Each line is stripped of trailing whitespace first.  The file is closed
    even if ``prFunction`` raises.
    """
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            prFunction(line.rstrip())
453
454
def luShowFail():
    """Log as errors all summary lines that do not end in "0".

    Result lines end with their fail count, so lines ending in "0" are
    skipped; every other non-blank line is logged.  When at least one line
    was logged, a pointer to the full log file is logged as well.
    """
    printed = 0
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            # Strip the newline before looking at the last character so blank
            # lines and a final line without a newline are handled correctly.
            text = line.rstrip("\n")
            if not text or text.endswith("0"):
                continue
            printed += 1
            logger.error(line.rstrip())
    if printed > 0:
        logger.error("See %s for details of errors" % LUtil.fout_name)
465
466
# for testing: run every script named on the command line
if __name__ == "__main__":
    _self_path = os.path.abspath(__file__)
    _parent_dir = os.path.dirname(os.path.dirname(_self_path))
    print(_parent_dir + "/lib")
    luStart()
    for arg in sys.argv[1:]:
        luInclude(arg)
    luFinish()
    sys.exit(0)